## Reference Material

* [Delta Lake Documentation](https://docs.delta.io/latest/index.html#)

## Configuration

In [1]:
%%configure -f
{
  "conf": {
    "spark.jars": "https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.jars.packages": "io.delta:delta-core_2.12:2.1.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.databricks.hive.metastore.glueCatalog.enabled": "true"
  }
}
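
The JAR URL in `spark.jars` and the Maven coordinates in `spark.jars.packages` both encode the same Delta Lake and Scala versions, so they have to change together. The following is a minimal sketch, assuming you would rather generate the JSON body than edit it by hand. The helper name `delta_conf` is hypothetical and is not part of sparkmagic or Delta Lake.

```python
import json


def delta_conf(delta_version="2.1.0", scala_version="2.12"):
    """Build the "conf" mapping for %%configure from a single version pair."""
    artifact = f"delta-core_{scala_version}"
    return {
        "spark.jars": (
            "https://repo1.maven.org/maven2/io/delta/"
            f"{artifact}/{delta_version}/{artifact}-{delta_version}.jar"
        ),
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.jars.packages": f"io.delta:{artifact}:{delta_version}",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.databricks.hive.metastore.glueCatalog.enabled": "true",
    }


# Paste the printed JSON under a "%%configure -f" line.
print(json.dumps({"conf": delta_conf()}, indent=2))
```

With the default arguments the generated values match the configuration cell above.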

Set `s3_bucket_name` to the name of the S3 bucket that the notebook reads from and writes to.

In [2]:
s3_bucket_name = "emr-studio-demo-s3bucket-g3frvpeeuanh"
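
The cells in this notebook rebuild the table location each time by concatenating `"s3://"`, the bucket name, and `"/delta/"`. A minimal sketch of building that string once is shown here. The variable name `delta_table_path` is introduced only for illustration and is not used by the other cells.

```python
s3_bucket_name = "emr-studio-demo-s3bucket-g3frvpeeuanh"

# Build the table location once so that every read and write uses the same path.
delta_table_path = f"s3://{s3_bucket_name}/delta/"
print(delta_table_path)
```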

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | application_1665678843032_0001 | pyspark | idle | Link | Link | | ✔ |


SparkSession available as 'spark'.


In [3]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# getOrCreate() returns the session that is already running when one exists,
# in which case .master("local[*]") does not change where that session runs.
spark = SparkSession \
   .builder \
   .appName("Delta lake") \
   .master("local[*]") \
   .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
   .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
   .getOrCreate()

# Add the Delta Lake JAR as a Python file so that delta.tables can be imported.
spark.sparkContext.addPyFile("https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar")

from delta.tables import *

## Create a DataFrame

In [4]:
data = [
        ("1", "Chris", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("2", "Will", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("3", "Emma", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("4", "John", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("5", "Eric", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("6", "Adam", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", StringType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)
])

inputDF = spark.createDataFrame(data=data,schema=schema)

# inputDF.show()
# inputDF.printSchema()
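
The six rows above differ only in `id` and `name`. A minimal sketch of generating the same list from the names alone, assuming the ids are consecutive and every row shares one creation date and timestamp as they do here:

```python
from datetime import datetime

# Same instant as datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S').
created = datetime(2020, 1, 1)
names = ["Chris", "Will", "Emma", "John", "Eric", "Adam"]

# One tuple per row: (id, name, create_date, last_update_time).
data = [
    (str(row_id), name, "2020-01-01", created)
    for row_id, name in enumerate(names, start=1)
]
```

The resulting `data` list can be passed to `spark.createDataFrame` with the same `schema`.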

## Write to S3 via Delta Lake

In [5]:
inputDF.write.format("delta").save("s3://" + s3_bucket_name + "/delta/")
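
The write above does not set a save mode, so Spark uses its default (`errorifexists`) and the cell fails if it is run again after the table exists. A minimal sketch of a re-runnable write that names the mode explicitly follows. The helper `write_delta` is hypothetical, while `mode` is the standard `DataFrameWriter` method.

```python
def write_delta(df, path, mode="overwrite"):
    """Write a Spark DataFrame to `path` in Delta format with an explicit save mode."""
    df.write.format("delta").mode(mode).save(path)


# Usage in this notebook (assumes inputDF and s3_bucket_name from earlier cells):
# write_delta(inputDF, "s3://" + s3_bucket_name + "/delta/")
```

Overwriting a Delta table records a new table version, so earlier versions stay readable through time travel until their files are cleaned up. Use `mode="append"` instead if the rows should be added to the existing data.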

## Read Data in Delta Lake Format

In [6]:
readDF = spark.read.format("delta").load("s3://" + s3_bucket_name + "/delta/")

readDF.show()

+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+

## Upsert via Delta Lake

In [11]:
data = [
        ("1", "Christopher", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S')),
        ("3", "Emmeline", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", StringType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)
])

updateDF = spark.createDataFrame(data=data,schema=schema)

# updateDF.show()
# updateDF.printSchema()

deltaTableSource = DeltaTable.forPath(spark, "s3://" + s3_bucket_name + "/delta/")

deltaTableSource.alias('source') \
  .merge(
    updateDF.alias('updates'),
    'source.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "name": "updates.name",
      "create_date": "updates.create_date",
      "last_update_time": "updates.last_update_time"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "name": "updates.name",
      "create_date": "updates.create_date",
      "last_update_time": "updates.last_update_time"
    }
  ) \
  .execute()
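
Both clauses above copy every column from `updates` into the column of the same name. For that case the Delta Lake Python API also offers `whenMatchedUpdateAll` and `whenNotMatchedInsertAll`. The following is a minimal sketch, assuming the source and target schemas match. The helper `upsert_by_key` and its `key` parameter are hypothetical names.

```python
def upsert_by_key(delta_table, updates_df, key="id"):
    """Merge updates_df into delta_table, matching rows on a single key column."""
    (
        delta_table.alias("target")
        .merge(updates_df.alias("updates"), f"target.{key} = updates.{key}")
        .whenMatchedUpdateAll()      # matched rows take every column from updates
        .whenNotMatchedInsertAll()   # unmatched update rows are inserted as new rows
        .execute()
    )


# Usage in this notebook (assumes deltaTableSource and updateDF from the cell above):
# upsert_by_key(deltaTableSource, updateDF)
```

The explicit `set` and `values` dictionaries remain the better choice when only some columns should change or when column names differ between the two sides.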

In [12]:
spark.read.format("delta").load("s3://" + s3_bucket_name + "/delta/").show()

+---+-----------+-----------+-------------------+
| id|       name|create_date|   last_update_time|
+---+-----------+-----------+-------------------+
|  4|       John| 2020-01-01|2020-01-01 00:00:00|
|  5|       Eric| 2020-01-01|2020-01-01 00:00:00|
|  6|       Adam| 2020-01-01|2020-01-01 00:00:00|
|  1|Christopher| 2020-01-01|2020-01-02 00:00:00|
|  2|       Will| 2020-01-01|2020-01-01 00:00:00|
|  3|   Emmeline| 2020-01-01|2020-01-02 00:00:00|
+---+-----------+-----------+-------------------+
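
In the result above, rows 1 and 3 carry the new names and timestamps while the other four rows are unchanged. The rows also come back in a different order, which is expected because a DataFrame has no guaranteed row order unless it is sorted. The matching rule can be modelled in plain Python as a sketch of the semantics only. It is not how Delta Lake executes a merge, and the row with id `"7"` is a hypothetical record added here to exercise the insert branch.

```python
def merge_by_id(target_rows, update_rows):
    """Model an upsert: update rows whose id matches, insert the rest."""
    merged = {row["id"]: dict(row) for row in target_rows}
    for row in update_rows:
        # An existing id is replaced (update); a new id is added (insert).
        merged[row["id"]] = dict(row)
    return sorted(merged.values(), key=lambda row: row["id"])


target = [
    {"id": "1", "name": "Chris"},
    {"id": "2", "name": "Will"},
    {"id": "3", "name": "Emma"},
]
updates = [
    {"id": "1", "name": "Christopher"},
    {"id": "3", "name": "Emmeline"},
    {"id": "7", "name": "Nina"},  # hypothetical row, not in the notebook data
]

result = merge_by_id(target, updates)
for row in result:
    print(row["id"], row["name"])
```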

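## Time Travel via Delta Lake

The cells in this section were run before the upsert above (`In [7]` and `In [8]` versus `In [11]` and `In [12]`), so the history below lists only version 0, the initial write.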

In [7]:
deltaTable = DeltaTable.forPath(spark, "s3://" + s3_bucket_name + "/delta/")

deltaTable.history().show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2022-10-13 16:41:41|  null|    null|    WRITE|{mode -> ErrorIfE...|null|    null|     null|       null|  Serializable|         true|{numFiles -> 2, n...|        null|Apache-Spark/3.3....|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
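
The full history output is wide, and several of its columns are truncated in the display above. A minimal sketch that keeps only the columns describing what each version did is given below. The helper `show_history_summary` is a hypothetical name, and the selected columns are taken from the header shown above.

```python
def show_history_summary(delta_table, limit=None):
    """Print the version, time, and operation details from a Delta table's history."""
    history_df = delta_table.history() if limit is None else delta_table.history(limit)
    history_df.select(
        "version", "timestamp", "operation", "operationParameters", "operationMetrics"
    ).show(truncate=False)  # truncate=False prints the full cell contents


# Usage in this notebook (assumes deltaTable from the cell above):
# show_history_summary(deltaTable)
```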

In [8]:
spark.read.format("delta").option("versionAsOf", 0).load("s3://" + s3_bucket_name + "/delta/").show()

+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+
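
Besides `versionAsOf`, the Delta Lake reader accepts a `timestampAsOf` option that selects the table state as of a point in time. A minimal sketch that supports both follows. The helper `read_delta_as_of` and its parameter names are hypothetical.

```python
def read_delta_as_of(spark, path, version=None, timestamp=None):
    """Load a Delta table at a given version number or timestamp string."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)


# Usage in this notebook (assumes spark and s3_bucket_name from earlier cells):
# read_delta_as_of(spark, "s3://" + s3_bucket_name + "/delta/", version=0).show()
```

A timestamp is convenient when you know when the data was correct but not which version number that corresponds to. The `timestamp` column of `history()` lists the commit time of each version.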