In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/Github/

/content/drive/MyDrive/Github


In [None]:
# !rm -rf .git
# !git clone https://github.com/bst-depractice/spark_play

Cloning into 'spark_play'...
remote: Enumerating objects: 15, done.[K
remote: Counting objects: 100% (15/15), done.[K
remote: Compressing objects: 100% (13/13), done.[K
remote: Total 15 (delta 2), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (15/15), 1.17 MiB | 4.35 MiB/s, done.
Resolving deltas: 100% (2/2), done.


In [3]:
!apt-get -qq update > /tmp/apt.out
!apt-get install -y -qq openjdk-11-jdk-headless

In [4]:
!(wget -q --show-progress -nc https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz)
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [5]:
try:
  import pyspark, findspark, delta, pyngrok
except:
  %pip install -q --upgrade pyspark==3.2.1
  %pip install -q findspark
  %pip install -q delta
  %pip install pyngrok

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.0/199.0 kB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for delta (setup.py) ... [?25l[?25hdone
Collecting pyngrok
  Downloading pyngrok-7.1.6-py3-none-any.whl (22 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.1.6


Pass the config k,v pairs and get a spark session object

In [6]:
import findspark
import pyspark
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/MyDrive/Github/spark-3.2.1-bin-hadoop3.2"

findspark.init()
MAX_MEMORY="8g"
maven_coords = [
    "org.apache.spark:spark-avro_2.12:3.2.1",
    "io.delta:delta-core_2.12:2.0.0rc1",
    "org.xerial:sqlite-jdbc:3.36.0.3",
    "graphframes:graphframes:0.8.2-spark3.2-s_2.12",
    "com.acervera.osm4scala:osm4scala-spark3-shaded_2.12:1.0.8",
]
spark = (pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", ",".join(maven_coords))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .config('spark.ui.port', '4050')
    .enableHiveSupport()
    .getOrCreate()
    )
spark

In [None]:
from pyngrok import ngrok, conf
import getpass

print("Enter your authtoken, which can be copied "
"from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········




 * ngrok tunnel "https://50f0-34-68-94-133.ngrok-free.app" -> "http://127.0.0.1:4040"


Setup is complete. At this point you have started a spark application and able to access the application-ui using the url above.
You can start writing your data transformation code below...

### Add Schema to your Dataframe

StructType, StructField, Spark Datatypes

Spark Datatypes are not the same as Python Datatypes

string vs StringType<br>
int vs IntegerType<br>
double vs DoubleType<br>
date vs DateType<br>

The type classes are in pyspark.sql.types<br>
`from pyspark.sql.types import *`

#### How to create an empty Dataframe for a given schema ?


In [7]:
from pyspark.sql.types import *

from datetime import datetime
schema = StructType(
          [StructField("name", StringType(), False),
           StructField("dob", DateType(), False)
          ]
        )

df = spark.createDataFrame([["ash", datetime.strptime("2020-01-01", "%Y-%m-%d")]], schema = schema)

df.write.mode("overwrite").format("delta").saveAsTable("delta_table")

# Merge into a delta_table using records from dataframe

with pyspark API

In [24]:
from delta.tables import *

schema = StructType(
          [StructField("name", StringType(), False),
           StructField("dob", DateType(), False)
          ]
        )
df = spark.createDataFrame([["ash", datetime.strptime("2010-01-01", "%Y-%m-%d")]], schema = schema)

new_df = spark.createDataFrame([["ash", datetime.strptime("2026-01-01", "%Y-%m-%d")],
                                ["mat", datetime.strptime("9926-01-01", "%Y-%m-%d")]], schema = schema)

df.write.mode("overwrite").format("delta").saveAsTable("my_delta")
new_df.write.mode("overwrite").format("delta").saveAsTable("updates_to_delta")

spark.sql("select * from my_delta").show()

spark.sql("""Merge into my_delta
              using updates_to_delta
              on my_delta.name = updates_to_delta.name
              when matched then
                update set
                  dob = updates_to_delta.dob
              when not matched then
                insert (name, dob)  values (updates_to_delta.name, updates_to_delta.dob)

            """)

spark.sql("select * from my_delta").show()

+----+----------+
|name|       dob|
+----+----------+
| ash|2010-01-01|
+----+----------+

+----+----------+
|name|       dob|
+----+----------+
| ash|2026-01-01|
| mat|9926-01-01|
+----+----------+



# Merge into a delta_table using records from dataframe

with SQL API

In [None]:
from delta.tables import *

schema = StructType(
          [StructField("name", StringType(), False),
           StructField("dob", DateType(), False)
          ]
        )
df = spark.createDataFrame([["ash", datetime.strptime("2010-01-01", "%Y-%m-%d")]], schema = schema)

new_df = spark.createDataFrame([["ash", datetime.strptime("2026-01-01", "%Y-%m-%d")],
                                ["mat", datetime.strptime("9926-01-01", "%Y-%m-%d")]], schema = schema)

df.write.mode("overwrite").format("delta").saveAsTable("my_delta")
new_df.write.mode("overwrite").format("delta").saveAsTable("updates_to_delta")

spark.sql("select * from my_delta").show()

spark.sql("""Merge into my_delta
              using updates_to_delta
              on my_delta.name = updates_to_delta.name
              when matched then
                update set
                  dob = updates_to_delta.dob
              when not matched then
                insert (name, dob)  values (updates_to_delta.name, updates_to_delta.dob)

            """)

spark.sql("select * from my_delta").show()