In [1]:
from pyspark.sql import SparkSession

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1643358166445_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Adding Delta Jar package and configurations to the notebook

Set up configuration with the correct version of Delta Lake that is compatible with Spark or PySpark.

For more information, see [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Deltausing-cluster-spark.html).

You need to configure `spark.jars.packages` according to the Delta version that matches your Spark version.

For example, if you are using Amazon EMR release `emr-6.7.0`, use the first configuration below.

In [None]:
%%configure -f

{
  "conf": {
    "spark.jars.packages": "io.delta:delta-core_2.12:1.2.1",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}

If you are running Amazon EMR releases `emr-6.15.0` or higher, you must also use the following configurations to use fine-grained access control based on Lake Formation with Delta Lake.

In [2]:
%%configure -f

{
  "conf": {
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.1.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1643358166445_0006,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1643358166445_0006,pyspark,idle,Link,Link,✔


In [3]:
# Import only the name this notebook uses instead of `import *`, so it is
# clear where `DeltaTable` comes from and the kernel namespace stays clean.
from delta.tables import DeltaTable

# S3 bucket that holds both Delta tables.
# Replace `learn-deltalake-2022` with your own bucket name from here on.
bucket = "learn-deltalake-2022"
# Key prefixes (folders) inside the bucket for the two tables written below.
hive_table, spark_table = "delta-hive-table", "delta-spark-table"

# Full s3a:// locations of the two tables.
deltaHivePath = f"s3a://{bucket}/{hive_table}/"
deltaSparkPath = f"s3a://{bucket}/{spark_table}/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Defining functions to generate JSON data

In [4]:
from datetime import datetime

def get_json_data(start, count, dest):
    """Build `count` synthetic trip records with ids `start` .. `start + count - 1`.

    Args:
        start: First trip id to generate.
        count: Number of records to generate.
        dest: Non-empty sequence of destination names, cycled through by trip id.

    Returns:
        A list of dicts with the keys `trip_id`, `tstamp`, `route_id` and
        `destination`. All records share one `tstamp`, captured once per call.

    Raises:
        ValueError: If records are requested but `dest` is empty.
    """
    if count > 0 and len(dest) == 0:
        raise ValueError("dest must contain at least one destination")

    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    n_dest = len(dest)
    return [{
        'trip_id': i,
        'tstamp': time_stamp,
        # Ten route ids, 'A'..'J', assigned round-robin by trip id.
        'route_id': chr(65 + (i % 10)),
        # Cycle over however many destinations were supplied rather than
        # assuming the list has exactly ten entries.
        'destination': dest[i % n_dest]
    } for i in range(start, start + count)]

def create_json_df(spark, data):
    """Create a Spark DataFrame from a list of JSON-serializable records.

    Args:
        spark: The active SparkSession.
        data: A list of dicts, e.g. the output of the trip-data generator.

    Returns:
        The DataFrame produced by `spark.read.json`, with the schema inferred
        from the records.
    """
    import json  # local import keeps this cell self-contained

    sc = spark.sparkContext
    # spark.read.json expects an RDD of JSON *strings*. Handing it dicts relies
    # on them being stringified with Python's repr, which stops parsing as soon
    # as a value contains an apostrophe or is None. Serialize each record
    # explicitly (on the executors, via map) so the input is always valid JSON.
    return spark.read.json(sc.parallelize(data).map(json.dumps))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Creating Spark dataframe from trip data

In [5]:
# Ten destination names; the generator assigns them round-robin by trip id.
dest = [
    "Seattle",
    "New York",
    "New Jersey",
    "Los Angeles",
    "Las Vagas",
    "Tucson",
    "Washinton DC",
    "Philadelphia",
    "Miami",
    "San Francisco",
]

# Two million synthetic trips (ids 0 .. 1999999) loaded into a Spark DataFrame.
df1 = create_json_df(
    spark,
    get_json_data(0, 2000000, dest),
)
print(df1.count())
df1.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2000000
+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|      Seattle|       A|      0|2022-01-31 07:24:39|
|     New York|       B|      1|2022-01-31 07:24:39|
|   New Jersey|       C|      2|2022-01-31 07:24:39|
|  Los Angeles|       D|      3|2022-01-31 07:24:39|
|    Las Vagas|       E|      4|2022-01-31 07:24:39|
|       Tucson|       F|      5|2022-01-31 07:24:39|
| Washinton DC|       G|      6|2022-01-31 07:24:39|
| Philadelphia|       H|      7|2022-01-31 07:24:39|
|        Miami|       I|      8|2022-01-31 07:24:39|
|San Francisco|       J|      9|2022-01-31 07:24:39|
|      Seattle|       A|     10|2022-01-31 07:24:39|
|     New York|       B|     11|2022-01-31 07:24:39|
|   New Jersey|       C|     12|2022-01-31 07:24:39|
|  Los Angeles|       D|     13|2022-01-31 07:24:39|
|    Las Vagas|       E|     14|2022-01-31 07:24:39|
|       Tucson|       F|     15|2022-0

In [6]:
# Show the schema Spark inferred from the JSON records.
df1.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- destination: string (nullable = true)
 |-- route_id: string (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- tstamp: string (nullable = true)

### Saving the data frame in Delta format in S3

In [7]:
# Write the trips as a Delta table at deltaHivePath, one partition per
# route_id, replacing whatever is already stored there.
(df1.write
    .mode("overwrite")
    .format("delta")
    .partitionBy("route_id")
    .save(deltaHivePath))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Review written data using Spark SQL temporary view

In [8]:
# Expose the Delta table stored at deltaHivePath as a temporary view for Spark SQL.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")

# Sanity checks: total row count, a small sample, and the highest trip id.
spark.sql("SELECT count(*) FROM temp_trip_table").show()
spark.sql("SELECT * FROM temp_trip_table LIMIT 5").show()
spark.sql("SELECT max(trip_id) FROM temp_trip_table").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
| 2000000|
+--------+

+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|San Francisco|       J| 749569|2022-01-31 05:47:46|
|San Francisco|       J| 749579|2022-01-31 05:47:46|
|San Francisco|       J| 749589|2022-01-31 05:47:46|
|San Francisco|       J| 749599|2022-01-31 05:47:46|
|San Francisco|       J| 749609|2022-01-31 05:47:46|
+-------------+--------+-------+-------------------+

+------------+
|max(trip_id)|
+------------+
|     1999999|
+------------+

### Now let's create manifest files so that this Delta table can be queried from Athena

In [9]:
# Bind a DeltaTable handle to the data at deltaHivePath and generate its
# symlink-format manifest files.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
deltaTable.generate("symlink_format_manifest")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Creating a Spark table in Glue Data Catalog with same data

Before running the following command, replace the S3 location with your own.

In [10]:
%%sql -q
-- The LOCATION below hard-codes the bucket name; keep it in sync with the bucket used by the Python cells.
CREATE DATABASE IF NOT EXISTS my_delta_db LOCATION 's3a://learn-deltalake-2022/delta-spark-table/'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Catalog database and fully-qualified name of the Spark table created next.
# NOTE: `deltaTable` holds a table-name *string* here, whereas other cells bind
# the same name to a DeltaTable object, so the cells must be run in order.
deltaDatabase = "my_delta_db"
deltaTable = f"{deltaDatabase}.delta_spark_table"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Store the same trips at deltaSparkPath and register them in the catalog
# under the name held in `deltaTable`.
(df1.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("route_id")
    .option("path", deltaSparkPath)
    .saveAsTable(deltaTable))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Switch to the new database and list its tables to confirm the table was registered.
spark.sql(f"use {deltaDatabase}")
spark.sql("show tables").show(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-----------------+-----------+
|   database|        tableName|isTemporary|
+-----------+-----------------+-----------+
|my_delta_db|delta_spark_table|      false|
|my_delta_db| delta_trip_table|      false|
|           |  temp_trip_table|       true|
+-----------+-----------------+-----------+

In [14]:
%%spark -c spark -o delta_trip_op

# `delta_trip_op` is rebound by each query below, so after the cell finishes it
# refers to the last result only (the distinct route ids).
# NOTE(review): `-o delta_trip_op` presumably copies that final value to the
# local kernel - confirm against the sparkmagic documentation.

# Total number of rows in the catalog table.
delta_trip_op = spark.sql(f"select count(*) from {deltaTable}")
delta_trip_op.show()

# Highest trip id.
delta_trip_op = spark.sql(f"select max(trip_id) from {deltaTable}")
delta_trip_op.show()

# A ten-row sample.
delta_trip_op = spark.sql(f"select destination, route_id, trip_id, tstamp from {deltaTable} limit 10")
delta_trip_op.show()

# The distinct partition values.
delta_trip_op = spark.sql(f"select distinct route_id from {deltaTable} order by route_id")
delta_trip_op.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
| 2000000|
+--------+

+------------+
|max(trip_id)|
+------------+
|     1999999|
+------------+

+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|San Francisco|       J|1750019|2022-01-31 07:24:39|
|San Francisco|       J|1750029|2022-01-31 07:24:39|
|San Francisco|       J|1750039|2022-01-31 07:24:39|
|San Francisco|       J|1750049|2022-01-31 07:24:39|
|San Francisco|       J|1750059|2022-01-31 07:24:39|
|San Francisco|       J|1750069|2022-01-31 07:24:39|
|San Francisco|       J|1750079|2022-01-31 07:24:39|
|San Francisco|       J|1750089|2022-01-31 07:24:39|
|San Francisco|       J|1750099|2022-01-31 07:24:39|
|San Francisco|       J|1750109|2022-01-31 07:24:39|
+-------------+--------+-------+-------------------+

+--------+
|route_id|
+--------+
|       A|
|       B|
|       C|
|       D|
|       E|
|       F|
|       G|
|       H|
|   

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### From this point forward, only the Athena table will be used for data processing

### Let's INSERT some new rows into the Athena table with schema evolution

In [15]:
from datetime import datetime

def get_json_data_update(start, count, dest, origin):
    """Build `count` synthetic trip records that also carry an `origin` field.

    Args:
        start: First trip id to generate.
        count: Number of records to generate.
        dest: Non-empty sequence of destination names, cycled through by trip id.
        origin: Non-empty sequence of origin names, cycled through by trip id.

    Returns:
        A list of dicts with the keys `trip_id`, `tstamp`, `route_id`,
        `destination` and `origin`. All records share one `tstamp`, captured
        once per call.

    Raises:
        ValueError: If records are requested but `dest` or `origin` is empty.
    """
    if count > 0 and (len(dest) == 0 or len(origin) == 0):
        raise ValueError("dest and origin must each contain at least one entry")

    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    n_dest, n_origin = len(dest), len(origin)
    return [{
        'trip_id': i,
        'tstamp': time_stamp,
        # Ten route ids, 'A'..'J', assigned round-robin by trip id.
        'route_id': chr(65 + (i % 10)),
        # Cycle over however many names were supplied rather than assuming
        # each list has exactly ten entries.
        'destination': dest[i % n_dest],
        'origin': origin[i % n_origin]
    } for i in range(start, start + count)]

def create_json_df_update(spark, data):
    """Create a Spark DataFrame from a list of JSON-serializable records.

    Args:
        spark: The active SparkSession.
        data: A list of dicts, e.g. the output of `get_json_data_update`.

    Returns:
        The DataFrame produced by `spark.read.json`, with the schema inferred
        from the records.
    """
    import json  # local import keeps this cell self-contained

    sc = spark.sparkContext
    # spark.read.json expects an RDD of JSON *strings*. Handing it dicts relies
    # on them being stringified with Python's repr, which stops parsing as soon
    # as a value contains an apostrophe or is None. Serialize each record
    # explicitly (on the executors, via map) so the input is always valid JSON.
    return spark.read.json(sc.parallelize(data).map(json.dumps))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Every new trip goes to Syracuse; the origins cycle through the ten cities.
insert_dest = ['Syracuse'] * 10
insert_origin = [
    "Seattle",
    "New York",
    "New Jersey",
    "Los Angeles",
    "Las Vagas",
    "Tucson",
    "Washinton DC",
    "Philadelphia",
    "Miami",
    "San Francisco",
]

# Twenty new trips (ids 2000000 .. 2000019) that carry the extra `origin` field.
tripUpdates = create_json_df_update(
    spark,
    get_json_data_update(2000000, 20, insert_dest, insert_origin),
)
print(tripUpdates.count())
tripUpdates.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20
+-----------+-------------+--------+-------+-------------------+
|destination|       origin|route_id|trip_id|             tstamp|
+-----------+-------------+--------+-------+-------------------+
|   Syracuse|      Seattle|       A|2000000|2022-01-31 05:49:39|
|   Syracuse|     New York|       B|2000001|2022-01-31 05:49:39|
|   Syracuse|   New Jersey|       C|2000002|2022-01-31 05:49:39|
|   Syracuse|  Los Angeles|       D|2000003|2022-01-31 05:49:39|
|   Syracuse|    Las Vagas|       E|2000004|2022-01-31 05:49:39|
|   Syracuse|       Tucson|       F|2000005|2022-01-31 05:49:39|
|   Syracuse| Washinton DC|       G|2000006|2022-01-31 05:49:39|
|   Syracuse| Philadelphia|       H|2000007|2022-01-31 05:49:39|
|   Syracuse|        Miami|       I|2000008|2022-01-31 05:49:39|
|   Syracuse|San Francisco|       J|2000009|2022-01-31 05:49:39|
|   Syracuse|      Seattle|       A|2000010|2022-01-31 05:49:39|
|   Syracuse|     New York|       B|2000011|2022-01-31 05:49:39|
|   Syracuse|   New Je

In [17]:
# Show the inferred schema of the new rows, which includes the `origin` field.
tripUpdates.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- destination: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- route_id: string (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- tstamp: string (nullable = true)

In [18]:
# Target location of the Delta table that receives the new rows.
deltaHivePath = f"s3a://{bucket}/{hive_table}/"

# Append the new trips; the mergeSchema option lets the write add the
# `origin` column that the existing table does not have yet.
(tripUpdates.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .partitionBy("route_id")
    .save(deltaHivePath))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Now let's query the updated data using Spark SQL

In [19]:
# Re-register the temp view from the current table contents and count the rows.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT count(*) FROM temp_trip_table").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+
|count(1)|
+--------+
| 2000020|
+--------+

In [20]:
# Show the last few original rows followed by the appended ones, ordered by trip id.
spark.sql("SELECT * FROM temp_trip_table WHERE trip_id > 1999996 ORDER BY trip_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------+-------+-------------------+-------------+
|  destination|route_id|trip_id|             tstamp|       origin|
+-------------+--------+-------+-------------------+-------------+
| Philadelphia|       H|1999997|2022-01-31 05:47:46|         null|
|        Miami|       I|1999998|2022-01-31 05:47:46|         null|
|San Francisco|       J|1999999|2022-01-31 05:47:46|         null|
|     Syracuse|       A|2000000|2022-01-31 05:49:39|      Seattle|
|     Syracuse|       B|2000001|2022-01-31 05:49:39|     New York|
|     Syracuse|       C|2000002|2022-01-31 05:49:39|   New Jersey|
|     Syracuse|       D|2000003|2022-01-31 05:49:39|  Los Angeles|
|     Syracuse|       E|2000004|2022-01-31 05:49:39|    Las Vagas|
|     Syracuse|       F|2000005|2022-01-31 05:49:39|       Tucson|
|     Syracuse|       G|2000006|2022-01-31 05:49:39| Washinton DC|
|     Syracuse|       H|2000007|2022-01-31 05:49:39| Philadelphia|
|     Syracuse|       I|2000008|2022-01-31 05:49:39|        Mi

### Now Updating the Manifest files for Athena

In [21]:
# Rebind `deltaTable` to a DeltaTable handle for deltaHivePath and regenerate
# the symlink-format manifest after the append.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
deltaTable.generate("symlink_format_manifest")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Now let's UPDATE some data

In [22]:
# Baseline before the UPDATE: row counts for the two destinations involved.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT count(*) AS Count_Syracuse FROM temp_trip_table WHERE destination = 'Syracuse'").show()
spark.sql("SELECT count(*) AS Count_Philadelphia FROM temp_trip_table WHERE destination = 'Philadelphia'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+
|Count_Syracuse|
+--------------+
|            20|
+--------------+

+------------------+
|Count_Philadelphia|
+------------------+
|            200000|
+------------------+

In [23]:
# Change the destination of every 'Syracuse' row to 'Philadelphia'.
# The new value is written with nested quotes so that it reaches Delta as the
# string literal 'Philadelphia'.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
deltaTable.update("destination = 'Syracuse'", {"destination": "'Philadelphia'"})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Regenerate the symlink-format manifest after the UPDATE.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
deltaTable.generate("symlink_format_manifest")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Same two counts as before the UPDATE, taken from a freshly registered view.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT count(*) AS Count_Syracuse FROM temp_trip_table WHERE destination = 'Syracuse'").show()
spark.sql("SELECT count(*) AS Count_Philadelphia FROM temp_trip_table WHERE destination = 'Philadelphia'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+
|Count_Syracuse|
+--------------+
|             0|
+--------------+

+------------------+
|Count_Philadelphia|
+------------------+
|            200020|
+------------------+

### Now let's DELETE some data

In [26]:
# Baseline before the DELETE: total rows and rows with destination 'New Jersey'.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT count(*) as Count_total FROM temp_trip_table").show()
spark.sql("SELECT count(*) as Count_NJ FROM temp_trip_table WHERE destination = 'New Jersey'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|Count_total|
+-----------+
|    2000020|
+-----------+

+--------+
|Count_NJ|
+--------+
|  200000|
+--------+

In [27]:
# Delete every row whose destination is 'New Jersey'.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
deltaTable.delete("destination = 'New Jersey'")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Regenerate the symlink-format manifest after the DELETE. This reuses the
# `deltaTable` handle bound in the delete cell, so that cell must run first.
deltaTable.generate("symlink_format_manifest")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Same two counts as before the DELETE, taken from a freshly registered view.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT count(*) as Count_total FROM temp_trip_table").show()
spark.sql("SELECT count(*) as Count_NJ FROM temp_trip_table WHERE destination = 'New Jersey'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|Count_total|
+-----------+
|    1800020|
+-----------+

+--------+
|Count_NJ|
+--------+
|       0|
+--------+

### Let's do an UPSERT now

In [30]:
# Find the current highest trip id before preparing the upsert batch.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT max(trip_id) FROM temp_trip_table").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+
|max(trip_id)|
+------------+
|     2000019|
+------------+

In [31]:
# Show the existing rows with trip_id above 2000014; the upsert batch starts at 2000015.
spark.sql("SELECT * FROM temp_trip_table WHERE trip_id > 2000014 ORDER BY trip_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------+-------+-------------------+-------------+
| destination|route_id|trip_id|             tstamp|       origin|
+------------+--------+-------+-------------------+-------------+
|Philadelphia|       F|2000015|2022-01-31 05:49:39|       Tucson|
|Philadelphia|       G|2000016|2022-01-31 05:49:39| Washinton DC|
|Philadelphia|       H|2000017|2022-01-31 05:49:39| Philadelphia|
|Philadelphia|       I|2000018|2022-01-31 05:49:39|        Miami|
|Philadelphia|       J|2000019|2022-01-31 05:49:39|San Francisco|
+------------+--------+-------+-------------------+-------------+

In [32]:
# Ten distinct destinations, all departing from Seattle.
insert_dest = [
    'Avon',
    'Simsbury',
    'Farmington',
    'Windsor',
    'Ellington',
    'Vernon',
    'Winsted',
    'Westport',
    'Fairfield',
    'Stamford',
]
insert_origin = ["Seattle"] * 10

# Ten trips with ids 2000015 .. 2000024: the first five ids already exist in
# the table, the last five are new.
tripUpdates1 = create_json_df_update(
    spark,
    get_json_data_update(2000015, 10, insert_dest, insert_origin),
)
print(tripUpdates1.count())
tripUpdates1.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

10
+-----------+-------+--------+-------+-------------------+
|destination| origin|route_id|trip_id|             tstamp|
+-----------+-------+--------+-------+-------------------+
|     Vernon|Seattle|       F|2000015|2022-01-31 06:32:30|
|    Winsted|Seattle|       G|2000016|2022-01-31 06:32:30|
|   Westport|Seattle|       H|2000017|2022-01-31 06:32:30|
|  Fairfield|Seattle|       I|2000018|2022-01-31 06:32:30|
|   Stamford|Seattle|       J|2000019|2022-01-31 06:32:30|
|       Avon|Seattle|       A|2000020|2022-01-31 06:32:30|
|   Simsbury|Seattle|       B|2000021|2022-01-31 06:32:30|
| Farmington|Seattle|       C|2000022|2022-01-31 06:32:30|
|    Windsor|Seattle|       D|2000023|2022-01-31 06:32:30|
|  Ellington|Seattle|       E|2000024|2022-01-31 06:32:30|
+-----------+-------+--------+-------+-------------------+

In [33]:
# Upsert `tripUpdates1` into the table, matching rows on trip_id: matched rows
# are updated with all source columns, unmatched source rows are inserted.
deltaTable = DeltaTable.forPath(spark, deltaHivePath)
(deltaTable
.alias('t')
.merge(tripUpdates1.alias('s'), 't.trip_id = s.trip_id')
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# Re-register the view and list the rows around the merged id range.
spark.read.format("delta").load(deltaHivePath).createOrReplaceTempView("temp_trip_table")
spark.sql("SELECT * FROM temp_trip_table WHERE trip_id > 2000010 ORDER BY trip_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------+-------+-------------------+-----------+
| destination|route_id|trip_id|             tstamp|     origin|
+------------+--------+-------+-------------------+-----------+
|Philadelphia|       B|2000011|2022-01-31 05:49:39|   New York|
|Philadelphia|       C|2000012|2022-01-31 05:49:39| New Jersey|
|Philadelphia|       D|2000013|2022-01-31 05:49:39|Los Angeles|
|Philadelphia|       E|2000014|2022-01-31 05:49:39|  Las Vagas|
|      Vernon|       F|2000015|2022-01-31 06:32:30|    Seattle|
|     Winsted|       G|2000016|2022-01-31 06:32:30|    Seattle|
|    Westport|       H|2000017|2022-01-31 06:32:30|    Seattle|
|   Fairfield|       I|2000018|2022-01-31 06:32:30|    Seattle|
|    Stamford|       J|2000019|2022-01-31 06:32:30|    Seattle|
|        Avon|       A|2000020|2022-01-31 06:32:30|    Seattle|
|    Simsbury|       B|2000021|2022-01-31 06:32:30|    Seattle|
|  Farmington|       C|2000022|2022-01-31 06:32:30|    Seattle|
|     Windsor|       D|2000023|2022-01-3

### Let's perform some time travel queries

In [35]:
# List up to 100 entries of the table's commit history, untruncated. This
# reuses the `deltaTable` handle bound in the merge cell.
(deltaTable
.history(100)
.select("version", "timestamp", "operation", "operationParameters")
.show(truncate=False))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp          |operation|operationParameters                                                                                                                          |
+-------+-------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------+
|4      |2022-01-31 06:35:13|MERGE    |[predicate -> (t.`trip_id` = s.`trip_id`), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]]|
|3      |2022-01-31 06:16:07|DELETE   |[predicate -> ["(`destination` = 'New Jersey')"]]                                                                                            |
|2      |2022-01-31 06:08:53|UPDATE   |[predicate -> (destination#216 = Syracuse)]        

In [36]:
# Look up the commit timestamp of version 0 from the table history instead of
# hard-coding it: a literal timestamp is only valid for the run that produced
# it and makes this cell fail when the notebook is re-run.
# The CAST is done in Spark SQL so the string is rendered in the session time
# zone. NOTE(review): Delta is expected to parse `timestampAsOf` in that same
# zone - confirm against the Delta time travel documentation.
# Assumes version 0 is still present in the table history.
version0_timestamp = (DeltaTable.forPath(spark, deltaHivePath)
                      .history()
                      .filter("version = 0")
                      .selectExpr("CAST(timestamp AS STRING) AS ts")
                      .first()["ts"])

# Time travel by timestamp: read the table as it was at version 0.
df_v0 = (spark.read
         .format("delta")
         .option("timestampAsOf", version0_timestamp)
         .load(deltaHivePath))
df_v0.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(destination='San Francisco', route_id='J', trip_id=749569, tstamp='2022-01-31 05:47:46'), Row(destination='San Francisco', route_id='J', trip_id=749579, tstamp='2022-01-31 05:47:46'), Row(destination='San Francisco', route_id='J', trip_id=749589, tstamp='2022-01-31 05:47:46'), Row(destination='San Francisco', route_id='J', trip_id=749599, tstamp='2022-01-31 05:47:46'), Row(destination='San Francisco', route_id='J', trip_id=749609, tstamp='2022-01-31 05:47:46')]

In [37]:
# Query the version-0 snapshot for the appended id range (trip_id >= 2000000).
df_v0.createOrReplaceTempView("temp_trip_table_v0")
spark.sql("SELECT * FROM temp_trip_table_v0 WHERE trip_id >= 2000000 ORDER BY trip_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------+-------+------+
|destination|route_id|trip_id|tstamp|
+-----------+--------+-------+------+
+-----------+--------+-------+------+

In [38]:
# Time travel by version number: read the table as of version 2.
df_v2 = (spark.read
         .format("delta")
         .option("versionAsOf", "2")
         .load(deltaHivePath))
df_v2.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(destination='San Francisco', route_id='J', trip_id=749569, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749579, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749589, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749599, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749609, tstamp='2022-01-31 05:47:46', origin=None)]

In [39]:
# Register the version-2 snapshot as a view and list the appended id range.
df_v2.createOrReplaceTempView("temp_trip_table_v2")
spark.sql("SELECT * FROM temp_trip_table_v2 WHERE trip_id >= 2000000 ORDER BY trip_id").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+--------+-------+-------------------+-------------+
| destination|route_id|trip_id|             tstamp|       origin|
+------------+--------+-------+-------------------+-------------+
|Philadelphia|       A|2000000|2022-01-31 05:49:39|      Seattle|
|Philadelphia|       B|2000001|2022-01-31 05:49:39|     New York|
|Philadelphia|       C|2000002|2022-01-31 05:49:39|   New Jersey|
|Philadelphia|       D|2000003|2022-01-31 05:49:39|  Los Angeles|
|Philadelphia|       E|2000004|2022-01-31 05:49:39|    Las Vagas|
|Philadelphia|       F|2000005|2022-01-31 05:49:39|       Tucson|
|Philadelphia|       G|2000006|2022-01-31 05:49:39| Washinton DC|
|Philadelphia|       H|2000007|2022-01-31 05:49:39| Philadelphia|
|Philadelphia|       I|2000008|2022-01-31 05:49:39|        Miami|
|Philadelphia|       J|2000009|2022-01-31 05:49:39|San Francisco|
|Philadelphia|       A|2000010|2022-01-31 05:49:39|      Seattle|
|Philadelphia|       B|2000011|2022-01-31 05:49:39|     New York|
|Philadelp

In [40]:
# Read the table as of version 1.
df_v1 = (spark.read
         .format("delta")
         .option("versionAsOf", "1")
         .load(deltaHivePath))
df_v1.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(destination='San Francisco', route_id='J', trip_id=749569, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749579, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749589, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749599, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749609, tstamp='2022-01-31 05:47:46', origin=None)]

In [41]:
# Read the table as of version 3.
df_v3 = (spark.read
         .format("delta")
         .option("versionAsOf", "3")
         .load(deltaHivePath))
df_v3.take(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(destination='San Francisco', route_id='J', trip_id=749569, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749579, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749589, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749599, tstamp='2022-01-31 05:47:46', origin=None), Row(destination='San Francisco', route_id='J', trip_id=749609, tstamp='2022-01-31 05:47:46', origin=None)]

In [42]:
# Destination counts in the version-1 snapshot.
df_v1.createOrReplaceTempView("temp_trip_table_v1")
spark.sql("SELECT count(*) AS Count_Syracuse FROM temp_trip_table_v1 WHERE destination = 'Syracuse'").show()
spark.sql("SELECT count(*) AS Count_Philadelphia FROM temp_trip_table_v1 WHERE destination = 'Philadelphia'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+
|Count_Syracuse|
+--------------+
|            20|
+--------------+

+------------------+
|Count_Philadelphia|
+------------------+
|            200000|
+------------------+

In [43]:
# The same destination counts in the version-2 snapshot. This uses the
# temp_trip_table_v2 view registered in an earlier cell.
spark.sql("SELECT count(*) AS Count_Syracuse FROM temp_trip_table_v2 WHERE destination = 'Syracuse'").show()
spark.sql("SELECT count(*) AS Count_Philadelphia FROM temp_trip_table_v2 WHERE destination = 'Philadelphia'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+
|Count_Syracuse|
+--------------+
|             0|
+--------------+

+------------------+
|Count_Philadelphia|
+------------------+
|            200020|
+------------------+

In [44]:
# Total rows and 'New Jersey' rows in the version-2 snapshot.
spark.sql("SELECT count(*) as Count_total FROM temp_trip_table_v2").show()
spark.sql("SELECT count(*) as Count_NJ FROM temp_trip_table_v2 WHERE destination = 'New Jersey'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|Count_total|
+-----------+
|    2000020|
+-----------+

+--------+
|Count_NJ|
+--------+
|  200000|
+--------+

In [45]:
# Total rows and 'New Jersey' rows in the version-3 snapshot.
df_v3.createOrReplaceTempView("temp_trip_table_v3")
spark.sql("SELECT count(*) as Count_total FROM temp_trip_table_v3").show()
spark.sql("SELECT count(*) as Count_NJ FROM temp_trip_table_v3 WHERE destination = 'New Jersey'").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+
|Count_total|
+-----------+
|    1800020|
+-----------+

+--------+
|Count_NJ|
+--------+
|       0|
+--------+

## References

- [`io.delta:delta-spark` in maven repository](https://mvnrepository.com/artifact/io.delta/delta-spark_2.13/3.1.0)
- [Use a Delta Lake cluster with Spark in Amazon EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Deltausing-cluster-spark.html)
- [Amazon EMR Release Guide](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html)
  - [Application versions in Amazon EMR 7.x releases](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html)
  - [Application versions in Amazon EMR 6.x releases](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-6.x.html)