# Modern Data Lake Storage Layers - Delta Lake

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "io.delta:delta-core_2.12:1.0.0",
        "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"
    }
}

In [None]:
%env S3_BUCKET_NAME=YOUR_S3_BUCKET_NAME

In [None]:
S3_BUCKET_NAME="YOUR_S3_BUCKET_NAME"

## The basics of Delta Lake

Like the other notebooks, we'll create an input DataFrame and save it to S3.

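_NOTE_ that Delta Lake stores the table's schema in its transaction log (the `_delta_log` prefix on S3) alongside the data. By default, an overwrite that changes the schema is rejected, so I've included the `overwriteSchema` option to allow the table to be recreated if necessary.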

In [23]:
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"],
)

# Write a DataFrame as a Delta dataset
inputDF.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).partitionBy("creation_date").save(f"s3://{S3_BUCKET_NAME}/tmp/delta/")


Well, that's pretty straightforward! Let's see what it looks like on S3.

In [24]:
%%sh

aws s3 ls s3://${S3_BUCKET_NAME}/tmp/delta/

                           PRE _delta_log/
                           PRE creation_date=2015-01-01/
                           PRE creation_date=2015-01-02/
2022-02-01 20:40:35          0 _delta_log_$folder$


In [25]:
%%sh

aws s3 ls s3://${S3_BUCKET_NAME}/tmp/delta/ --recursive | tee /tmp/delta_op_001

2022-02-01 20:40:35       2120 tmp/delta/_delta_log/00000000000000000000.json
2022-02-01 20:40:35          0 tmp/delta/_delta_log_$folder$
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00001-329f5740-2d39-4121-a226-96819c7b0e54.c000.snappy.parquet
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00003-56522242-034b-4fe1-ba13-6eb887606c4d.c000.snappy.parquet
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00005-69310258-d7d2-48cd-a20d-a06f87f5ada0.c000.snappy.parquet
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00007-c1079719-8587-42b6-99f7-3c34c9412929.c000.snappy.parquet
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-02/part-00009-719fb0c9-cb8a-4135-86df-32867dc61aac.c000.snappy.parquet
2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-02/part-00011-2348f269-9561-4767-ab3c-634788b0e85d.c000.snappy.parquet


Similar to Hudi, we've got our `.parquet` files in each partition, as well as a JSON commit file under the `_delta_log` prefix. Let's take a quick look at that JSON.

In [26]:
%%sh

aws s3 cp s3://${S3_BUCKET_NAME}/tmp/delta/_delta_log/00000000000000000000.json - | jq '.'

{
  "commitInfo": {
    "timestamp": 1643748034751,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[\"creation_date\"]"
    },
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "6",
      "numOutputBytes": "5250",
      "numOutputRows": "6"
    }
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
{
  "metaData": {
    "id": "e1c7e882-e876-495b-aa0e-751a5ecb9feb",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"creation_date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_update_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
    "partitionColumns": [
      "creation_date"
    ],
    "configuration": {},
    "createdTime": 1643748034259
  }
}
{
  "add": {
    "path": "creatio

OK, so we've got a `commitInfo` entry that describes the commit, including the operation type and the number of files written. There's a `protocol` entry with the minimum reader and writer versions needed to work with the table, a `metaData` entry that includes the schema and partition columns, and then several `add` entries, one for each parquet file created.
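The pretty-printed `jq` output hides one detail: in the file itself, each action sits on its own line as a separate JSON object (newline-delimited JSON). Here's a minimal sketch of summarizing a commit in plain Python, assuming you already have the file's text; the sample lines are a hypothetical, heavily abbreviated stand-in for the real commit above.

```python
import json
from collections import Counter


def summarize_commit(commit_text: str) -> Counter:
    """Count the action types (commitInfo, protocol, metaData, add, ...) in one
    Delta log commit file, which holds one JSON action per line."""
    counts = Counter()
    for line in commit_text.splitlines():
        if not line.strip():
            continue  # ignore blank lines
        action = json.loads(line)
        # Each line is an object whose top-level key names the action type
        counts.update(action.keys())
    return counts


# Hypothetical, abbreviated version of 00000000000000000000.json
sample = "\n".join([
    '{"commitInfo": {"operation": "WRITE"}}',
    '{"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}',
    '{"metaData": {"partitionColumns": ["creation_date"]}}',
    '{"add": {"path": "creation_date=2015-01-01/part-a.parquet"}}',
    '{"add": {"path": "creation_date=2015-01-02/part-b.parquet"}}',
])

print(summarize_commit(sample))
```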

## Updating Data

Alright, let's update one of our rows. Delta Lake provides a `merge` operation that we can use. We'll use the syntax [from the docs](https://docs.delta.io/latest/quick-start.html#update-table-data), which is slightly different from the native Spark DataFrame API because it goes through a `DeltaTable` object.
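To make the semantics of this particular merge concrete, here's a plain-Python model of `whenMatchedUpdate` over lists of dicts. The function name is hypothetical and this only illustrates the logic; Delta Lake itself rewrites whole parquet files rather than updating rows in place.

```python
def merge_when_matched_update(target, source, key, set_columns):
    """Model of MERGE ... WHEN MATCHED THEN UPDATE SET <set_columns>.

    Unmatched target rows pass through unchanged, and unmatched source rows are
    ignored because there is no WHEN NOT MATCHED clause.
    """
    source_by_key = {}
    for row in source:
        source_by_key.setdefault(row[key], []).append(row)

    merged = []
    for row in target:
        matches = source_by_key.get(row[key], [])
        if len(matches) > 1:
            # Delta Lake rejects a merge where several source rows update one target row
            raise ValueError(f"multiple source rows match key {row[key]!r}")
        if matches:
            # Copy only the listed columns from the matching source row
            row = {**row, **{c: matches[0][c] for c in set_columns}}
        merged.append(row)
    return merged


target = [
    {"id": "100", "creation_date": "2015-01-01"},
    {"id": "101", "creation_date": "2015-01-01"},
]
source = [{"id": "100", "creation_date": "2022-01-11"}]

result = merge_when_matched_update(target, source, "id", ["creation_date"])
print(result)
```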

In [27]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

# Create a new DataFrame from the row with id 100, with a different creation_date value
updateDF = inputDF.where("id = 100").withColumn("creation_date", lit("2022-01-11"))

# Load the existing Delta table from S3 as a DeltaTable object
deltaTable = DeltaTable.forPath(spark, f"s3://{S3_BUCKET_NAME}/tmp/delta/")

# For rows whose id matches, overwrite creation_date with the new value
(
    deltaTable.alias("oldData")
    .merge(updateDF.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"creation_date": col("newData.creation_date")})
    .execute()
)

deltaTable.toDF().show()


+---+-------------+--------------------+
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|100|   2022-01-11|2015-01-01T13:51:...|
|101|   2015-01-01|2015-01-01T12:14:...|
|102|   2015-01-01|2015-01-01T13:51:...|
|103|   2015-01-01|2015-01-01T13:51:...|
|105|   2015-01-02|2015-01-01T13:51:...|
|104|   2015-01-02|2015-01-01T12:15:...|
+---+-------------+--------------------+

Cool, looks like that worked! Let's take another peek at S3. We should see one new JSON file in `_delta_log` and one new `.parquet` file.

In [28]:
%%sh

aws s3 ls s3://${S3_BUCKET_NAME}/tmp/delta/ --recursive > /tmp/delta_op_002
diff -u /tmp/delta_op_001 /tmp/delta_op_002 || true

--- /tmp/delta_op_001	2022-02-01 20:40:37.234677808 +0000
+++ /tmp/delta_op_002	2022-02-01 20:40:40.470673699 +0000
@@ -1,4 +1,5 @@
 2022-02-01 20:40:35       2120 tmp/delta/_delta_log/00000000000000000000.json
+2022-02-01 20:40:39       1015 tmp/delta/_delta_log/00000000000000000001.json
 2022-02-01 20:40:35          0 tmp/delta/_delta_log_$folder$
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00001-329f5740-2d39-4121-a226-96819c7b0e54.c000.snappy.parquet
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00003-56522242-034b-4fe1-ba13-6eb887606c4d.c000.snappy.parquet
@@ -6,3 +7,4 @@
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00007-c1079719-8587-42b6-99f7-3c34c9412929.c000.snappy.parquet
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-02/part-00009-719fb0c9-cb8a-4135-86df-32867dc61aac.c000.snappy.parquet
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-02/part-00011-2348f269-9561-

In [29]:
%%sh

aws s3 cp s3://${S3_BUCKET_NAME}/tmp/delta/_delta_log/00000000000000000001.json - | jq '.'

{
  "commitInfo": {
    "timestamp": 1643748038744,
    "operation": "MERGE",
    "operationParameters": {
      "predicate": "(oldData.`id` = newData.`id`)",
      "matchedPredicates": "[{\"actionType\":\"update\"}]",
      "notMatchedPredicates": "[]"
    },
    "readVersion": 0,
    "isBlindAppend": false,
    "operationMetrics": {
      "numTargetRowsCopied": "0",
      "numTargetRowsDeleted": "0",
      "numTargetFilesAdded": "1",
      "executionTimeMs": "850",
      "numTargetRowsInserted": "0",
      "scanTimeMs": "448",
      "numTargetRowsUpdated": "1",
      "numOutputRows": "1",
      "numSourceRows": "1",
      "numTargetFilesRemoved": "1",
      "rewriteTimeMs": "364"
    }
  }
}
{
  "remove": {
    "path": "creation_date=2015-01-01/part-00001-329f5740-2d39-4121-a226-96819c7b0e54.c000.snappy.parquet",
    "deletionTimestamp": 1643748038744,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "creation_date": "2015-01-01"
    },
    "si

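When we look at that JSON file, we can see it recorded our `MERGE` commit: one parquet file was marked as removed (a `remove` action) and another was added (an `add` action), matching `numTargetFilesRemoved` and `numTargetFilesAdded` in the metrics.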

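That `remove` is only a logical delete: the original parquet file is still on S3. Delta Lake reconstructs the current state of the table by replaying these log entries, so a data file belongs to the table if it has been added and not subsequently removed.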
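A minimal sketch of that replay in plain Python, assuming the commits are available as text in version order. It only tracks `add` and `remove` paths; real Delta Lake readers also handle checkpoint files, protocol and metadata actions. The commits below are hypothetical, abbreviated stand-ins for versions 0 and 1.

```python
import json


def active_files(commits):
    """Replay Delta log commits (oldest first) and return the data file paths
    that make up the table: added and not later removed."""
    files = set()
    for commit_text in commits:
        for line in commit_text.splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                # A remove is a tombstone: the file leaves the table but stays on S3
                files.discard(action["remove"]["path"])
    return files


# Hypothetical, abbreviated commits mirroring versions 0 (WRITE) and 1 (MERGE)
v0 = "\n".join([
    '{"commitInfo": {"operation": "WRITE"}}',
    '{"add": {"path": "creation_date=2015-01-01/part-00001.parquet"}}',
    '{"add": {"path": "creation_date=2015-01-01/part-00003.parquet"}}',
])
v1 = "\n".join([
    '{"commitInfo": {"operation": "MERGE"}}',
    '{"remove": {"path": "creation_date=2015-01-01/part-00001.parquet"}}',
    '{"add": {"path": "creation_date=2022-01-11/part-00000.parquet"}}',
])

print(sorted(active_files([v0, v1])))
```

Replaying only `[v0]` gives the table as of version 0, which is essentially what time travel does.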

According to the [Delta Utility](https://docs.delta.io/latest/delta-utility.html#language-python) docs, we can `vacuum` the table to physically delete data files that are no longer referenced by the table, and we can also retrieve the table's history. Vacuum only deletes files older than the retention threshold (7 days by default), so it wouldn't remove anything here yet. Let's look at the history instead.
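Here's a simplified model of the retention rule, assuming we only look at tombstoned files and their `deletionTimestamp` (epoch milliseconds, as in the `remove` action above). The real `vacuum` also considers files that aren't in the log at all; the function name and the second file are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Delta Lake's default VACUUM retention threshold: 7 days
DEFAULT_RETENTION = timedelta(days=7)


def vacuum_candidates(removed, now, retention=DEFAULT_RETENTION):
    """Return paths of removed files whose deletion is older than `retention`.

    `removed` maps a data file path to its deletionTimestamp in epoch ms.
    """
    cutoff_ms = (now - retention).timestamp() * 1000
    return sorted(path for path, ts in removed.items() if ts < cutoff_ms)


removed = {
    # Abbreviated path; timestamp taken from the MERGE commit's remove action
    "creation_date=2015-01-01/part-00001.parquet": 1643748038744,
    # Hypothetical file removed on 2022-01-24
    "hypothetical/old-file.parquet": 1643000000000,
}

now = datetime(2022, 2, 1, 21, 0, tzinfo=timezone.utc)
print(vacuum_candidates(removed, now))                      # ['hypothetical/old-file.parquet']
print(vacuum_candidates(removed, now + timedelta(days=8)))  # both files
```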

In [30]:
deltaTable.history().show(truncate=False)


+-------+-------------------+------+--------+---------+------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                                                                                                     |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                                                                            

## Deleting Data

We can, of course, delete data as well! We'll use the `deltaTable` object we created above.

In [31]:
deltaTable.delete("id = 100")
deltaTable.toDF().show()


+---+-------------+--------------------+
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|101|   2015-01-01|2015-01-01T12:14:...|
|102|   2015-01-01|2015-01-01T13:51:...|
|103|   2015-01-01|2015-01-01T13:51:...|
|105|   2015-01-02|2015-01-01T13:51:...|
|104|   2015-01-02|2015-01-01T12:15:...|
+---+-------------+--------------------+

In [32]:
%%sh

aws s3 ls s3://${S3_BUCKET_NAME}/tmp/delta/ --recursive > /tmp/delta_op_003
diff -u /tmp/delta_op_002 /tmp/delta_op_003 || true

--- /tmp/delta_op_002	2022-02-01 20:40:40.470673699 +0000
+++ /tmp/delta_op_003	2022-02-01 20:40:45.602667185 +0000
@@ -1,5 +1,6 @@
 2022-02-01 20:40:35       2120 tmp/delta/_delta_log/00000000000000000000.json
 2022-02-01 20:40:39       1015 tmp/delta/_delta_log/00000000000000000001.json
+2022-02-01 20:40:44        596 tmp/delta/_delta_log/00000000000000000002.json
 2022-02-01 20:40:35          0 tmp/delta/_delta_log_$folder$
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00001-329f5740-2d39-4121-a226-96819c7b0e54.c000.snappy.parquet
 2022-02-01 20:40:35        875 tmp/delta/creation_date=2015-01-01/part-00003-56522242-034b-4fe1-ba13-6eb887606c4d.c000.snappy.parquet


Interestingly, the only new object on S3 is another JSON file in `_delta_log`. No parquet files were deleted; the log simply records that the file containing row 100 is no longer part of the table.

In [33]:
%%sh

aws s3 cp s3://${S3_BUCKET_NAME}/tmp/delta/_delta_log/00000000000000000002.json - | jq '.'

{
  "commitInfo": {
    "timestamp": 1643748043482,
    "operation": "DELETE",
    "operationParameters": {
      "predicate": "[\"(CAST(`id` AS INT) = 100)\"]"
    },
    "readVersion": 1,
    "isBlindAppend": false,
    "operationMetrics": {
      "numRemovedFiles": "1",
      "numCopiedRows": "0",
      "executionTimeMs": "446",
      "numDeletedRows": "1",
      "scanTimeMs": "293",
      "numAddedFiles": "0",
      "rewriteTimeMs": "153"
    }
  }
}
{
  "remove": {
    "path": "creation_date=2022-01-11/part-00000-15a77ce6-e8e1-4bba-8e04-846a9e095a9b.c000.snappy.parquet",
    "deletionTimestamp": 1643748043482,
    "dataChange": true,
    "extendedFileMetadata": true,
    "partitionValues": {
      "creation_date": "2022-01-11"
    },
    "size": 875
  }
}
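The metrics above (`numRemovedFiles` 1, `numAddedFiles` 0, `numCopiedRows` 0) follow from a copy-on-write delete: any file containing a deleted row is removed from the table, and its surviving rows are rewritten into a new file. After the MERGE, row 100 was alone in its file, so nothing needed copying. A plain-Python model of that bookkeeping; the function, the file names and the `.rewritten` suffix are hypothetical.

```python
def copy_on_write_delete(files, predicate):
    """Model of a copy-on-write DELETE.

    `files` maps a path to its rows (dicts). Returns the removed paths, the new
    files holding surviving rows, and metrics named like Delta's DELETE metrics.
    """
    removed_paths, added_files = [], {}
    copied = deleted = 0
    for path, rows in files.items():
        keep = [r for r in rows if not predicate(r)]
        if len(keep) == len(rows):
            continue  # no matching rows: file is untouched
        removed_paths.append(path)
        deleted += len(rows) - len(keep)
        if keep:
            # Surviving rows are copied into a new (hypothetically named) file
            added_files[path + ".rewritten"] = keep
            copied += len(keep)
    metrics = {
        "numRemovedFiles": len(removed_paths),
        "numAddedFiles": len(added_files),
        "numDeletedRows": deleted,
        "numCopiedRows": copied,
    }
    return removed_paths, added_files, metrics


files = {
    "creation_date=2022-01-11/part-00000.parquet": [{"id": "100"}],
    "creation_date=2015-01-01/part-00003.parquet": [{"id": "101"}],
}
removed_paths, added_files, metrics = copy_on_write_delete(files, lambda r: r["id"] == "100")
print(metrics)
```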


## Snapshots

We can also read the table at a certain point-in-time using [Spark SQL or DataFrames](https://docs.delta.io/latest/delta-batch.html#-deltatimetravel).
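Conceptually, `timestampAsOf` resolves to the latest version committed at or before the given time. A minimal sketch of that lookup, assuming a sorted list of per-version commit times; Delta Lake may derive these times from the commit files' modification times rather than `commitInfo`, and this sketch ignores edge cases such as a timestamp after the latest commit. The function name is hypothetical.

```python
from bisect import bisect_right


def version_as_of(commit_timestamps_ms, ts_ms):
    """Return the latest version whose commit time is <= ts_ms.

    commit_timestamps_ms[v] is the commit time of version v, in ascending order.
    """
    idx = bisect_right(commit_timestamps_ms, ts_ms)  # number of commits at or before ts_ms
    if idx == 0:
        raise ValueError("timestamp is before the first commit")
    return idx - 1


# commitInfo timestamps of versions 0 (WRITE), 1 (MERGE) and 2 (DELETE) from the logs above
commits = [1643748034751, 1643748038744, 1643748043482]

print(version_as_of(commits, 1643748040000))  # 1: after the MERGE, before the DELETE
```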

In [34]:
# Read using a specific timestamp
first_timestamp = deltaTable.history().sort("version").limit(1).collect()[0]['timestamp']

df1 = (
    spark.read.format("delta")
    .option("timestampAsOf", first_timestamp)
    .load(f"s3://{S3_BUCKET_NAME}/tmp/delta/")
)
df1.show()


+---+-------------+--------------------+
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|101|   2015-01-01|2015-01-01T12:14:...|
|100|   2015-01-01|2015-01-01T13:51:...|
|102|   2015-01-01|2015-01-01T13:51:...|
|103|   2015-01-01|2015-01-01T13:51:...|
|105|   2015-01-02|2015-01-01T13:51:...|
|104|   2015-01-02|2015-01-01T12:15:...|
+---+-------------+--------------------+

In [35]:
# Or a specific (0-indexed :)) version
df2 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(f"s3://{S3_BUCKET_NAME}/tmp/delta/")
)
df2.show()


+---+-------------+--------------------+
| id|creation_date|    last_update_time|
+---+-------------+--------------------+
|101|   2015-01-01|2015-01-01T12:14:...|
|100|   2015-01-01|2015-01-01T13:51:...|
|102|   2015-01-01|2015-01-01T13:51:...|
|103|   2015-01-01|2015-01-01T13:51:...|
|105|   2015-01-02|2015-01-01T13:51:...|
|104|   2015-01-02|2015-01-01T12:15:...|
+---+-------------+--------------------+

Sometimes it can be helpful to describe the history of a table. For example, you might want to get the latest version to use in downstream queries.
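Besides `DESCRIBE HISTORY`, `deltaTable.history(1)` returns only the most recent commit. Since commit files are named by their zero-padded version number, the latest version can also be read straight off a `_delta_log` listing. Here's a sketch using the `aws s3 ls` output from earlier; it ignores checkpoint files, and the function name is hypothetical.

```python
import re

# Commit files are named with the version number zero-padded to 20 digits
COMMIT_RE = re.compile(r"_delta_log/(\d{20})\.json$")


def latest_version(keys):
    """Return the highest committed version among S3 keys, or None if there are none."""
    versions = []
    for key in keys:
        match = COMMIT_RE.search(key)
        if match:
            versions.append(int(match.group(1)))
    return max(versions, default=None)


listing = """\
2022-02-01 20:40:35       2120 tmp/delta/_delta_log/00000000000000000000.json
2022-02-01 20:40:39       1015 tmp/delta/_delta_log/00000000000000000001.json
2022-02-01 20:40:44        596 tmp/delta/_delta_log/00000000000000000002.json
2022-02-01 20:40:35          0 tmp/delta/_delta_log_$folder$
"""
# The object key is the last whitespace-separated field of each listing line
keys = [line.split()[-1] for line in listing.splitlines() if line.strip()]
print(latest_version(keys))  # 2
```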

In [36]:
history = spark.sql(f"DESCRIBE HISTORY delta.`s3://{S3_BUCKET_NAME}/tmp/delta/`")

history.show()

latest_version = history.selectExpr("max(version)").collect()[0][0]
print(f"Latest version is: {latest_version}")


+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      2|2022-02-01 20:40:44|  null|    null|   DELETE|{predicate -> ["(...|null|    null|     null|          1|          null|        false|{numRemovedFiles ...|        null|
|      1|2022-02-01 20:40:39|  null|    null|    MERGE|{predicate -> (ol...|null|    null|     null|          0|          null|        false|{numTargetRowsCop...|        null|
|      0|2022-02-01 20:40:35|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|        

## Cleanup

In [37]:
%%sh

# Safety guard: set CLEANUP=true to actually delete the demo table
CLEANUP=false
if [ "$CLEANUP" = "true" ]; then
    aws s3 rm --recursive s3://${S3_BUCKET_NAME}/tmp/delta/
fi