<img src="https://databricks.com/wp-content/uploads/2018/10/spark-summit-19-black.png" />

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Overview

<img width="300px" src="https://docs.delta.io/latest/_static/delta-lake-logo.png">

The core abstraction of Delta Lake is an ACID-compliant Spark table.


* Stored in Parquet format in blob storage
* ACID Transactions
* Snapshot Isolation
* Scalable Metadata management
* Schema Enforcement
* Time Travel

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Scenario

You are a Data Engineer working for a company that processes data collected from many IoT devices. You've been tasked with building an end-to-end pipeline to capture and process this data in near real-time (NRT). You can think of a few creative ways to accomplish this using Apache Spark and open formats such as Parquet, but in the design process you've uncovered some challenges:


* Many small files leading to bad downstream performance
* Frequent changes to business logic, and therefore to the data schema

The frequent changes to business logic are the scariest, as we may need to backfill the data multiple times.

We need a reliable way to update the old data as we are streaming the latest data. Having a pipeline that accommodates maximum flexibility would make our life much easier. Delta Lake's ACID guarantees and unified batch/streaming support make it a good fit.

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Sample Dataset

In [5]:
PARQUET_PATH="/tmp/delta_tutorial/parquet_table"
DELTA_SILVER_PATH="/tmp/delta_tutorial/delta_table"
DELTA_GOLD_PATH="/tmp/delta_tutorial/delta_agg_table"

# Reset Env
dbutils.fs.rm(PARQUET_PATH, True)
dbutils.fs.rm(DELTA_SILVER_PATH, True)
dbutils.fs.rm(DELTA_GOLD_PATH, True)

# Make some configurations small-scale friendly
sql("set spark.sql.shuffle.partitions = 1")
sql("set spark.databricks.delta.snapshotPartitions = 1")

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Using Parquet

##### Create a dataframe to read in our sample dataset

In [8]:
from pyspark.sql.functions import expr
from pyspark.sql.types import *

raw_data = spark.range(100000) \
  .selectExpr("if(id % 2 = 0, 'Open', 'Close') as action") \
  .withColumn("date", expr("cast(concat('2019-04-', cast(rand(5) * 30 as int) + 1) as date)")) \
  .withColumn("device_id", expr("cast(rand(5) * 100 as int)"))

##### Write this out to Parquet

In [10]:
raw_data.write.format("parquet").partitionBy("date").save(PARQUET_PATH)

In [11]:
display(spark.read.format("parquet").load(PARQUET_PATH))

In [12]:
display(spark.read.format("parquet").load(PARQUET_PATH).groupBy("action").count())

##### Append some data to our table

In [14]:
stream_data = spark.readStream.format("rate").option("rowsPerSecond", 100).load() \
  .selectExpr("'Open' as action") \
  .withColumn("date", expr("cast(concat('2019-04-', cast(rand(5) * 30 as int) + 1) as date)")) \
  .withColumn("device_id", expr("cast(rand(5) * 500 as int)"))

In [15]:
stream_data.writeStream.format("parquet").partitionBy("date").outputMode("append") \
  .trigger(processingTime='5 seconds').option('checkpointLocation', PARQUET_PATH + "/_chk").start(PARQUET_PATH)

In [16]:
display(spark.read.format("parquet").load(PARQUET_PATH).groupBy("action").count())

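Uh-oh. What happened to our Close actions?

 - You can't combine streaming and batch writes with the FileSink implementation in Apache Spark. The streaming sink records the files it commits in a `_spark_metadata` log inside the output directory, and once that log exists, readers only see the files listed in it. The batch-written files, which hold all of the Close actions, are ignored.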
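The disappearing rows can be illustrated with a toy model. This is a sketch in plain Python, not Spark's actual code: the function `visible_files` and the file names are hypothetical. It assumes the streaming file sink keeps a log of committed files (the `_spark_metadata` directory) and that a reader which finds this log trusts it over a plain directory listing.

```python
def visible_files(files_on_disk, sink_log=None):
    """Return the data files a reader would scan (toy model, not Spark code)."""
    if sink_log is None:
        # No streaming log: every data file in the directory is read.
        return sorted(files_on_disk)
    # A streaming log exists: only files committed by the stream are read.
    return sorted(f for f in files_on_disk if f in sink_log)


# Hypothetical file names, for illustration only.
batch_files = {"part-0000-batch.parquet", "part-0001-batch.parquet"}  # Open and Close rows
stream_files = {"part-0000-stream.parquet"}                           # Open rows only

before_stream = visible_files(batch_files)
after_stream = visible_files(batch_files | stream_files, sink_log=stream_files)

print(before_stream)
print(after_stream)
```

Once the stream has started, the batch files are still on disk but no longer visible, which is why the Close count vanishes.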

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Using Delta Lake

##### The syntax to create this table is almost identical, except we use the format `delta` instead of `parquet`.

In [21]:
raw_data.write.format("delta").partitionBy("date").save(DELTA_SILVER_PATH)

In [22]:
display(spark.read.format("delta").load(DELTA_SILVER_PATH).groupBy("action").count())

In [23]:
stream_data.writeStream.format("delta").partitionBy("date").outputMode("append") \
  .trigger(processingTime='5 seconds').option('checkpointLocation', DELTA_SILVER_PATH + "/_chk").start(DELTA_SILVER_PATH)

In [24]:
display(spark.read.format("delta").load(DELTA_SILVER_PATH).groupBy("action").count())

##### Using our Delta Lake as a Source

In [26]:
delta_data_stream = spark.readStream \
  .option("maxFilesPerTrigger", "10") \
  .format("delta") \
  .load(DELTA_SILVER_PATH)
  

In [27]:
delta_data_stream.groupBy("action", "date", "device_id") \
  .count() \
  .writeStream \
  .format("delta") \
  .option("checkpointLocation", DELTA_GOLD_PATH + "/_checkpoint") \
  .partitionBy("date") \
  .outputMode("complete") \
  .start(DELTA_GOLD_PATH)

In [28]:
display(spark.read.format("delta").load(DELTA_GOLD_PATH).orderBy("date", "device_id", "action"))

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Schema Evolution

In [30]:
# Now we have more users, so let's add the user_id column to our table
new_data_with_new_col = spark.range(1000) \
  .selectExpr("'Open' as action","cast(concat('2019-04-', cast(rand(5) * 3 as int) + 1) as date) as date") \
  .withColumn("device_id", expr("cast(rand(5) * 100 as int)")) \
  .withColumn("user_id", expr("cast(rand(10) * 100 as int)"))
  
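# Without mergeSchema, this append is expected to fail: schema enforcement rejects the extra user_id column
new_data_with_new_col.write.format("delta").partitionBy("date").mode("append").save(DELTA_SILVER_PATH)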

In [31]:
# Add the mergeSchema option
new_data_with_new_col.write.option("mergeSchema","true").format("delta").partitionBy("date").mode("append").save(DELTA_SILVER_PATH)
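To make the difference between the two writes above concrete, here is a toy model of schema enforcement in plain Python. It is a sketch under simplifying assumptions, not Delta's implementation: `resolve_schema` is a hypothetical helper, schemas are flat name-to-type dicts, and real Delta has further rules (nested types, nullability, and so on).

```python
def resolve_schema(table_schema, incoming_schema, merge_schema=False):
    """Toy model of an append: return the table schema after the write, or raise."""
    new_columns = {name: dtype for name, dtype in incoming_schema.items()
                   if name not in table_schema}
    mismatched = [name for name, dtype in incoming_schema.items()
                  if name in table_schema and table_schema[name] != dtype]
    if mismatched:
        # In this toy model, conflicting types are rejected even with merge_schema.
        raise ValueError(f"type mismatch for columns: {mismatched}")
    if new_columns and not merge_schema:
        # Schema enforcement: unknown columns are rejected by default.
        raise ValueError(f"unexpected columns: {sorted(new_columns)}")
    # Schema evolution: new columns are appended to the table schema.
    return {**table_schema, **new_columns}


table = {"action": "string", "date": "date", "device_id": "int"}
incoming = {**table, "user_id": "int"}

try:
    resolve_schema(table, incoming)
    rejected = False
except ValueError:
    rejected = True

merged = resolve_schema(table, incoming, merge_schema=True)
print(rejected, list(merged))
```

The first call mirrors the plain append, the second mirrors the append with `mergeSchema` set to `true`.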

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) ACID Transactions

Let's take a look at what happened to our streams.

In [34]:
# Let's add the user_id column, changing some old partitions at a time.
# The filter and the replaceWhere predicate must select the same partition:
# Delta rejects the write if any row falls outside replaceWhere.
spark.read.format("delta").load(DELTA_SILVER_PATH) \
  .where("date = '2019-04-01'") \
  .withColumn("user_id", expr("coalesce(user_id, cast(rand(10) * 100 as int))")) \
  .repartition(30, "date") \
  .write.format("delta").mode("overwrite") \
  .option("replaceWhere", "date = '2019-04-01'") \
  .save(DELTA_SILVER_PATH)

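The Delta version used here doesn't support Hive-style dynamic partition overwrites, which silently replace whichever partitions happen to appear in the incoming data and so make mistakes easy. Instead, Delta provides `replaceWhere`: you state which partitions to replace, and the write fails if any row falls outside that predicate.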
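The safety check behind `replaceWhere` can be sketched in plain Python. This is a toy model, not Delta's code: `replace_where`, the row dicts, and the predicate are hypothetical, and it assumes the write is rejected as soon as any incoming row falls outside the stated predicate.

```python
from datetime import date


def replace_where(table_rows, new_rows, predicate):
    """Toy model: replace the rows matching `predicate` with `new_rows`."""
    stray = [row for row in new_rows if not predicate(row)]
    if stray:
        # Refuse to write anything outside the slice we promised to replace.
        raise ValueError(f"{len(stray)} row(s) fall outside the replaceWhere predicate")
    kept = [row for row in table_rows if not predicate(row)]
    return kept + new_rows


table = [
    {"date": date(2019, 4, 1), "device_id": 1, "user_id": None},
    {"date": date(2019, 4, 2), "device_id": 2, "user_id": None},
]


def is_april_first(row):
    return row["date"] == date(2019, 4, 1)


# A correct backfill only touches the 2019-04-01 partition.
backfill = [{"date": date(2019, 4, 1), "device_id": 1, "user_id": 42}]
table_after = replace_where(table, backfill, is_april_first)

# A backfill that leaks a 2019-04-02 row is rejected as a whole.
wrong = backfill + [{"date": date(2019, 4, 2), "device_id": 2, "user_id": 7}]
try:
    replace_where(table, wrong, is_april_first)
    rejected = False
except ValueError:
    rejected = True
```

A dynamic partition overwrite would have accepted the second write and replaced both partitions, which is the kind of mistake the explicit predicate guards against.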

In [36]:
# Let's restart the stream with the addition of user_id
spark.readStream \
  .option("maxFilesPerTrigger", "10") \
  .format("delta") \
  .load(DELTA_SILVER_PATH) \
  .groupBy("action", "date", "device_id", "user_id") \
  .count() \
  .writeStream \
  .format("delta") \
  .option("checkpointLocation", DELTA_GOLD_PATH + "/_checkpoint_new") \
  .option("overwriteSchema", "true") \
  .partitionBy("date") \
  .outputMode("complete") \
  .start(DELTA_GOLD_PATH)

In [37]:
display(spark.read.format("delta").load(DELTA_GOLD_PATH).orderBy("date", "device_id", "action"))

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Time Travel

The transaction log is stored along with the data under the `_delta_log` directory.

In [40]:
print("\n".join([f.name for f in dbutils.fs.ls(DELTA_SILVER_PATH + "/_delta_log") if f.name.endswith('json')]))

In [41]:
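# Pick a version from before the schema change: latest version - 2 in the original run,
# because latest version - 1 added the user_id column. Adjust to match the log listing above.
version_before_schema_change = 10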
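Rather than hardcoding the number, the version can be derived from the log listing. The sketch below is plain Python over a hypothetical list of file names; `commit_versions` is an illustrative helper, and it assumes commit files are named as zero-padded version numbers with a `.json` extension, as in the listing printed above. Which offset corresponds to the schema change depends on how many commits have landed since (the running stream keeps adding them), so the `- 2` only mirrors the comment above.

```python
def commit_versions(file_names):
    """Extract commit version numbers from `_delta_log` file names."""
    versions = []
    for name in file_names:
        stem, _, ext = name.rpartition(".")
        # Commit files look like 00000000000000000010.json;
        # anything else (checkpoints, CRC files, ...) is skipped.
        if ext == "json" and stem.isdigit():
            versions.append(int(stem))
    return sorted(versions)


# Hypothetical listing; in the notebook this would come from
# [f.name for f in dbutils.fs.ls(DELTA_SILVER_PATH + "/_delta_log")]
sample_names = [
    "00000000000000000000.json",
    "00000000000000000001.json",
    "00000000000000000002.json",
    "00000000000000000002.crc",
    "00000000000000000003.json",
    "_last_checkpoint",
]

versions = commit_versions(sample_names)
latest_version = versions[-1]
candidate_version = latest_version - 2
print(versions, latest_version, candidate_version)
```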

In [42]:
display(spark.read.format("delta").load(DELTA_SILVER_PATH))

In [43]:
spark.read.format("delta").load(DELTA_SILVER_PATH).count()

In [44]:
display(spark.read.format("delta").option("versionAsOf", version_before_schema_change).load(DELTA_SILVER_PATH))

In [45]:
spark.read.format("delta").option("versionAsOf", version_before_schema_change).load(DELTA_SILVER_PATH).count()
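Why can an old version still be read? A rough intuition, sketched in plain Python: each commit in `_delta_log` records which data files were added and removed, so replaying the commits up to version N yields the set of files that made up the table at that point. The helper `snapshot_at` and the file paths are hypothetical, and this toy model ignores checkpoints, metadata actions, and everything else a real reader handles.

```python
def snapshot_at(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for v in range(version + 1):
        for action, path in commits[v]:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return live


# Hypothetical commit history: version 2 rewrites the 2019-04-01 partition.
commits = [
    [("add", "date=2019-04-01/part-a.parquet")],
    [("add", "date=2019-04-02/part-b.parquet")],
    [("remove", "date=2019-04-01/part-a.parquet"),
     ("add", "date=2019-04-01/part-c.parquet")],
]

v1 = snapshot_at(commits, 1)
v2 = snapshot_at(commits, 2)
print(sorted(v1))
print(sorted(v2))
```

Because the overwrite only marks `part-a` as removed in the log, a reader asking for version 1 can still find and read it.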

##![Spark Logo Tiny](https://kpistoropen.blob.core.windows.net/collateral/roadshow/logo_spark_tiny.png) Additional Topics & Resources
* <a href="https://docs.delta.io/latest/index.html" target="_blank">Delta Lake Documentation</a>
* <a href="https://delta.io" target="_blank">Delta Lake Website</a>