# Data Lakes on Acid - Delta Demo
![javazone](https://datalandsbyen.norge.no/assets/uploads/files/1654846045873-9019c0fa-4dfb-4849-b897-6b31a2679ed3-image.png)

In [1]:
data_path = 'abfss://home@stjz2022demo.dfs.core.windows.net/oslobysykkel'


# API V1 (2016 - 2018)
Here we read trip data from the "v1" API, which covers the years 2016 to 2018.

The schema here is very simple, with only 4 fields.

In [2]:
v1Data = (spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{data_path}/raw/csv/v1/*.csv")
)

print(f"Total number of records: {v1Data.count()}")
v1Data.printSchema()
display(v1Data.limit(5))

Total number of records: 7747817
root
 |-- Start station: integer (nullable = true)
 |-- Start time: string (nullable = true)
 |-- End station: integer (nullable = true)
 |-- End time: string (nullable = true)



# Legacy Station Ids

The station IDs used in the v1 API have since been changed, so we need to map each legacy ID to its new ID.

First we'll inspect the "dimension table" that provides the mapping.

In [3]:
stationIdDimDf = (spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"{data_path}/raw/csv/legacy_new_station_id_mapping.csv")
)
display(stationIdDimDf.limit(5))


## Map legacy id to a current id

Now we will:
1. update the column names to lower_snake_case
2. add new columns for the year and the month from the start date, so that we can use this for logical partitioning
3. add a new column for the API version ("1")
4. join the v1 data with the dimension table for mapping both start_station_id and end_station_id
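Step 4 behaves like a pair of dictionary lookups. The pure-Python sketch below is not Spark code; the helper `map_legacy_ids` and the ids are hypothetical, and it assumes each legacy id maps to exactly one new id. It shows the inner-join semantics: a trip survives only if *both* its start and end legacy ids have a mapping.

```python
from typing import Dict, List, Tuple

Trip = Tuple[int, int]  # (start_station_id, end_station_id)


def map_legacy_ids(trips: List[Trip], legacy_to_new: Dict[int, int]) -> List[Trip]:
    """Mimic two inner joins against a legacy_id -> new_id mapping table."""
    mapped = []
    for start, end in trips:
        # Inner join semantics: drop the trip if either id has no mapping
        if start in legacy_to_new and end in legacy_to_new:
            mapped.append((legacy_to_new[start], legacy_to_new[end]))
    return mapped


# Hypothetical mapping and trips, for illustration only
legacy_to_new = {157: 377, 283: 420, 210: 508}
trips = [(157, 283), (283, 210), (157, 999)]  # 999 has no mapping

print(map_legacy_ids(trips, legacy_to_new))  # [(377, 420), (420, 508)]
```

If some legacy ids are missing from the mapping table, the inner joins silently shrink the dataset, so it is worth comparing the row count before and after the joins.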


In [4]:
import pyspark.sql.functions as F

# Steps 1-3: rename to lower_snake_case, derive partition columns, tag the API version
v1Data = (v1Data
    .withColumnRenamed("Start station", "start_station_id")
    .withColumnRenamed("End station", "end_station_id")
    .withColumnRenamed("Start time", "started_at")
    .withColumnRenamed("End time", "ended_at")
    .withColumn("start_year", F.year("started_at"))
    .withColumn("start_month", F.month("started_at"))
    .withColumn("api_version", F.lit(1))
)

# Step 4: map both legacy station ids to the new ids.
# Each join swaps the legacy id column for the matching `new_id`.
# Note: an inner join drops any trip whose legacy id has no match in the mapping table.
v1Data = (v1Data
    .join(stationIdDimDf, v1Data.start_station_id == stationIdDimDf.legacy_id, "inner")
    .drop("start_station_id", "legacy_id")
    .withColumnRenamed("new_id", "start_station_id")
    .join(stationIdDimDf, v1Data.end_station_id == stationIdDimDf.legacy_id, "inner")
    .drop("end_station_id", "legacy_id")
    .withColumnRenamed("new_id", "end_station_id")
)

display(v1Data.limit(5))


# Write to Delta Table

Now we'll write the data to a Delta table!

Note that `partitionBy` tells Spark to partition the data on disk by `start_year` and `start_month`,
writing one subdirectory per year and month.
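Partitioned writes use Hive-style directory names of the form `column=value`, and a query that filters on the partition columns only needs to read the matching directories. The plain-Python sketch below illustrates both ideas; the helpers `partition_dir` and `prune` are hypothetical, the start times are made up, and it assumes ISO-8601 timestamp strings. The Parquet file names inside each directory are generated by Spark and are not shown.

```python
from datetime import datetime
from typing import List


def partition_dir(table_path: str, started_at: str) -> str:
    """Hive-style partition directory (column=value) for a trip's start year and month."""
    ts = datetime.fromisoformat(started_at)
    return f"{table_path}/start_year={ts.year}/start_month={ts.month}"


def prune(dirs: List[str], year: int, month: int) -> List[str]:
    """Keep only the directories that match a filter on the partition columns."""
    suffix = f"/start_year={year}/start_month={month}"
    return [d for d in dirs if d.endswith(suffix)]


# Hypothetical start times, for illustration only
starts = ["2016-04-03 06:00:00", "2016-04-20 17:45:00", "2018-10-01 08:30:00"]
dirs = sorted({partition_dir("processed/delta", s) for s in starts})

print(dirs)
# ['processed/delta/start_year=2016/start_month=4', 'processed/delta/start_year=2018/start_month=10']
print(prune(dirs, 2018, 10))
# ['processed/delta/start_year=2018/start_month=10']
```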

In [5]:
(v1Data.write
    .mode("append")
    .partitionBy("start_year", "start_month")
    .format("delta")
    .save(f"{data_path}/processed/delta")
)


# API V2 (2019 - 2022)

1. Read the raw data (CSV format)
2. Extract `start_year` and `start_month`
3. Add `api_version` = 2
4. Review the data

In [6]:
v2Data = (spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{data_path}/raw/csv/v2/*.csv")
)

v2Data = (v2Data
    .withColumn("start_year", F.year("started_at"))
    .withColumn("start_month", F.month("started_at"))
    .withColumn("api_version", F.lit(2))
)

print(f"Total number of records: {v2Data.count()}")
v2Data.printSchema()
display(v2Data.limit(5))

Total number of records: 6233450
root
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_description: string (nullable = true)
 |-- start_station_latitude: double (nullable = true)
 |-- start_station_longitude: double (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_description: string (nullable = true)
 |-- end_station_latitude: double (nullable = true)
 |-- end_station_longitude: double (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- start_month: integer (nullable = true)
 |-- api_version: integer (nullable = false)



## Write v2 to Delta

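Now we append the v2 data to the same Delta table... or do we?

The v2 data has nine columns that the table does not (`duration`, plus the name, description, latitude and longitude of the start and end stations), so Delta's schema enforcement rejects a plain append with a schema-mismatch error. Setting the `mergeSchema` option on the write lets Delta add the new columns to the table schema; the existing v1 rows read `null` in those columns.

In [8]:
(v2Data.write
    .mode("append")
    .option("mergeSchema", "true")  # allow the new v2 columns to be added to the table schema
    .partitionBy("start_year", "start_month")
    .format("delta")
    .save(f"{data_path}/processed/delta")
)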


Alternatively, we can configure the Spark session to _always_ allow schema evolution (drift) when writing to Delta tables:

```
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
```
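To make the enforcement rule concrete, here is a toy model in plain Python. It is not Delta's implementation; the helper `check_append` and the schemas are hypothetical, and Delta's real rules also cover nested columns and a few permitted type changes. New columns are rejected unless merging is enabled, while columns *missing* from the incoming data are allowed (they read as `null`), which is also why the streaming demo later in this notebook can append rows with only seven columns.

```python
from typing import Dict

Schema = Dict[str, str]  # column name -> type name


def check_append(table: Schema, incoming: Schema, merge_schema: bool = False) -> Schema:
    """Toy schema enforcement for an append; returns the resulting table schema."""
    for name, dtype in incoming.items():
        if name in table and table[name] != dtype:
            raise TypeError(f"type mismatch for column {name!r}: {table[name]} vs {dtype}")
    new_cols = [c for c in incoming if c not in table]
    if new_cols and not merge_schema:
        raise ValueError(f"schema mismatch: new columns {new_cols}")
    # Merging appends the new columns; columns missing from `incoming` are simply left as-is
    return {**table, **{c: incoming[c] for c in new_cols}}


# Hypothetical, trimmed-down schemas
v1_table = {"started_at": "string", "start_station_id": "int", "api_version": "int"}
v2_batch = {**v1_table, "duration": "int"}

try:
    check_append(v1_table, v2_batch)
except ValueError as err:
    print(err)  # schema mismatch: new columns ['duration']

merged = check_append(v1_table, v2_batch, merge_schema=True)
print(list(merged))  # ['started_at', 'start_station_id', 'api_version', 'duration']
```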

# Delta Table API

We use the `DeltaTable` API to create a handle named `dt` that points to our Delta table.

In [9]:
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, f"{data_path}/processed/delta")


Let's take a quick look to confirm that both the v1 and v2 data are in the table.

In [20]:
display(dt.toDF().where("api_version = 1").limit(3))
display(dt.toDF().where("api_version = 2").limit(3))


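## Delta History

Every operation that changes a Delta table (an append, a delete, a restore, and so on) is committed to the table's transaction log (`_delta_log`) as a new version. `dt.history()` returns one row per version, including the version number, timestamp and operation.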
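As a rough picture of what `history()` reads, the sketch below models the log in plain Python: each commit is stored as a JSON file in `_delta_log` named after its version, zero-padded to 20 digits. The `Commit` class and the operation list are hypothetical stand-ins for the two writes performed so far in this notebook; the real history has many more fields.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Commit:
    version: int
    operation: str


def log_file_name(version: int) -> str:
    """Commit file for a version: the version number zero-padded to 20 digits."""
    return f"_delta_log/{version:020d}.json"


# Hypothetical: the v1 write, then the v2 append
operations = ["WRITE", "WRITE"]
history: List[Commit] = [Commit(v, op) for v, op in enumerate(operations)]

# Like dt.history(), list the newest version first
for commit in reversed(history):
    print(commit.version, commit.operation, log_file_name(commit.version))
```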
In [11]:
display(dt.history())
total_rows = dt.toDF().count()
print(f"Total rows: {total_rows}")


In [13]:
# Oh no! A sophisticated hacker has infiltrated the notebook
# and executed the delete() command, removing all data!  
dt.delete()


In [14]:
# Count the table again
total_rows = dt.toDF().count()
print(f"Total rows: {total_rows}")


Total rows: 0
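The count is zero, but the data is not gone: a Delta delete commits "remove" actions to the log instead of erasing data files, and the old files stay in storage until they are cleaned up with `VACUUM` (which by default only removes files older than the 7-day retention period). The toy replay below is a simplified model, with hypothetical file names and a hypothetical `active_files` helper, of how a version's contents are derived from the log.

```python
from typing import List, Set, Tuple

Action = Tuple[str, str]  # ("add" or "remove", data file name)


def active_files(log: List[List[Action]], version: int) -> Set[str]:
    """Replay commits 0..version to find the files that make up that table version."""
    files: Set[str] = set()
    for commit in log[: version + 1]:
        for kind, name in commit:
            if kind == "add":
                files.add(name)
            else:
                files.discard(name)
    return files


# Hypothetical log: two appends, then a delete of everything
log = [
    [("add", "part-0.parquet")],
    [("add", "part-1.parquet")],
    [("remove", "part-0.parquet"), ("remove", "part-1.parquet")],
]
storage = {"part-0.parquet", "part-1.parquet"}  # the delete does not touch these files

print(active_files(log, 2))             # set(): the latest version is empty
print(sorted(active_files(log, 1)))     # ['part-0.parquet', 'part-1.parquet']
print(active_files(log, 1) <= storage)  # True: older versions can still be read
```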


## Time to Travel with Time Travel!
![Delorean at Rebel](https://www.kode24.no/images/74140596.jpg?imageId=74140596&width=320&height=190)

First, let's have a peek at the history in the Delta log.

In [15]:
display(dt.history())


Restore the table to version `1` (the state right after the v2 append) and count the rows again.

In [16]:
# Alternatively: dt.restoreToTimestamp
dt.restoreToVersion(1)
print(f"Total rows: {dt.toDF().count()}")


Total rows: 11539308
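Restoring does not rewrite history: it commits a *new* version (operation `RESTORE`) whose contents match the target version, which is why the `DELETE` is still listed when we look at the history again. The snapshot-based sketch below is a simplified model; the `restore_to_version` helper and file names are hypothetical, not the Delta API.

```python
from typing import FrozenSet, List, Tuple

Version = Tuple[str, FrozenSet[str]]  # (operation, data files in that version)


def restore_to_version(versions: List[Version], target: int) -> int:
    """Restore by appending a new version whose files equal those of an older one."""
    _, files = versions[target]
    versions.append(("RESTORE", files))
    return len(versions) - 1  # the new version number


# Hypothetical history: v1 write, v2 append, delete everything
versions: List[Version] = [
    ("WRITE", frozenset({"v1.parquet"})),
    ("WRITE", frozenset({"v1.parquet", "v2.parquet"})),
    ("DELETE", frozenset()),
]

new_version = restore_to_version(versions, 1)
print(new_version, sorted(versions[new_version][1]))  # 3 ['v1.parquet', 'v2.parquet']
print([op for op, _ in versions])  # ['WRITE', 'WRITE', 'DELETE', 'RESTORE']
```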


In [31]:
display(dt.history())


# Structured Streaming demo (if there's time)

"Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine."

The general pattern is to read from a "source", perform any transformations, and write to a "sink".

Here we use the "rate" source to generate fake trips for October 2022, tag them with `api_version = 3`,
and write them to the Delta table sink.
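The source, transform, sink pattern can be pictured as a loop over micro-batches. The plain-Python sketch below is only an analogy, not Structured Streaming itself: `rate_source`, `transform` and `run` are hypothetical helpers, the "source" loosely imitates the rate source's increasing `value` column, and the run is bounded so that it terminates.

```python
import itertools
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, int]


def rate_source(rows_per_batch: int) -> Iterator[List[Row]]:
    """Endless micro-batches of rows with an increasing `value`."""
    counter = itertools.count()
    while True:
        yield [{"value": next(counter)} for _ in range(rows_per_batch)]


def transform(batch: List[Row]) -> List[Row]:
    """Tag every row, as the notebook does with api_version = 3."""
    return [{**row, "api_version": 3} for row in batch]


def run(source: Iterable[List[Row]], sink: List[Row], max_batches: int) -> int:
    """Process a bounded number of micro-batches; return the last batch id."""
    batch_id = -1
    for batch_id, batch in enumerate(itertools.islice(source, max_batches)):
        # A real query also records its progress per batch in the checkpoint location
        sink.extend(transform(batch))
    return batch_id


sink: List[Row] = []
last = run(rate_source(rows_per_batch=10), sink, max_batches=3)
print(last, len(sink), sink[-1])  # 2 30 {'value': 29, 'api_version': 3}
```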

In [33]:
import pyspark.sql.functions as F  # avoids shadowing Python's built-in round()

# Change to True to enable the stream
there_is_still_time = False

if there_is_still_time:
    # Let's generate some fake data as "api_version = 3"
    delta_checkpoint_path = f"{data_path}/processed/deltacheckpoint"

    # The "rate" source emits `timestamp` and `value` columns at a fixed rate
    df = (spark.readStream
        .format("rate")
        .option("rowsPerSecond", 10)
        .load()
        # Random station ids between 0 and 500
        .withColumn("start_station_id", F.round(F.rand() * 500).cast("int"))
        .withColumn("end_station_id", F.round(F.rand() * 500).cast("int"))
        .withColumn("started_at", F.col("timestamp").cast("string"))
        .withColumn("ended_at", F.col("timestamp").cast("string"))
        .withColumn("start_year", F.lit(2022))
        .withColumn("start_month", F.lit(10))
        .withColumn("api_version", F.lit(3))
    )

    df = df.select("start_station_id", "end_station_id", "started_at", "ended_at",
                   "start_year", "start_month", "api_version")

    # Append to the same Delta table; the checkpoint lets the query resume after a failure
    query = (df.writeStream
        .format("delta")
        .option("checkpointLocation", delta_checkpoint_path)
        .start(f"{data_path}/processed/delta")
    )


In [43]:
# Verify that the stream is active
there_is_still_time and query.isActive


False

In [36]:
# Count the fake new "version 3" records that are being streamed in
count = dt.toDF().where("api_version = 3").count()
print(count)


434


## Medallion Architecture

Using Structured Streaming and Delta tables, we can set up an automated data pipeline that processes
data in incremental stages, from a "raw" (AKA "bronze") table, via a cleaned ("silver") table, to a final data product (AKA "gold").

Databricks refers to this as a ["Medallion architecture"](https://www.databricks.com/glossary/medallion-architecture).

![Medallion architecture](https://www.databricks.com/wp-content/uploads/2022/03/delta-lake-medallion-architecture-2.jpeg)
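A minimal sketch of the three layers in plain Python, with made-up records and hypothetical helpers `to_silver` and `to_gold`: bronze keeps the raw strings as they arrived, silver drops incomplete rows and converts types, and gold aggregates trips per year and month, much like the `groupedTrips` aggregation in this notebook. The notebook itself goes straight from the trip table to the aggregate, so the silver step here is only illustrative.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

RawRow = Dict[str, Optional[str]]
CleanRow = Dict[str, int]

# Hypothetical raw records, as they might land in a bronze table
bronze: List[RawRow] = [
    {"started_at": "2022-10-01 08:00:00", "start_station_id": "377"},
    {"started_at": "2022-10-01 08:05:00", "start_station_id": None},  # incomplete
    {"started_at": "2022-10-02 09:00:00", "start_station_id": "420"},
    {"started_at": "2022-09-30 23:59:00", "start_station_id": "377"},
]


def to_silver(rows: List[RawRow]) -> List[CleanRow]:
    """Silver: drop incomplete rows and convert strings to typed values."""
    return [
        {
            "start_year": int(r["started_at"][:4]),
            "start_month": int(r["started_at"][5:7]),
            "start_station_id": int(r["start_station_id"]),
        }
        for r in rows
        if r["start_station_id"] is not None
    ]


def to_gold(rows: List[CleanRow]) -> Dict[Tuple[int, int], int]:
    """Gold: number of trips per (start_year, start_month)."""
    return dict(Counter((r["start_year"], r["start_month"]) for r in rows))


silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {(2022, 10): 2, (2022, 9): 1}
```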

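Note that here the aggregate table is rewritten in full on every trigger (`.outputMode("complete")`).

That is acceptable for a small summary like this one, but you generally would not want to do it for a large result on a frequently-updated stream.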
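The cost difference can be seen with a toy count of rows per trigger. In this plain-Python sketch (hypothetical batches and a hypothetical `complete_mode` helper, not Spark's implementation), complete mode writes every group in the running result each time, even when a trigger only touches one group.

```python
from collections import Counter
from typing import List, Tuple

Key = Tuple[int, int]  # (start_year, start_month)


def complete_mode(batches: List[List[Key]]) -> Tuple[List[int], List[int]]:
    """Per trigger: rows written in complete mode vs groups that actually changed."""
    state: Counter = Counter()
    written, changed = [], []
    for batch in batches:
        state.update(batch)               # running counts across all batches so far
        written.append(len(state))        # the whole result table is output every time
        changed.append(len(set(batch)))   # groups touched by this batch
    return written, changed


# Hypothetical triggers: the first covers history, later ones only October 2022
batches = [[(2016, 4), (2017, 5), (2022, 10)], [(2022, 10)], [(2022, 10), (2022, 10)]]
print(complete_mode(batches))  # ([3, 3, 3], [3, 1, 1])
```

With years of history, the "written" column keeps growing with the number of groups while the "changed" column stays small, which is why complete mode suits small aggregates only.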

In [37]:
if there_is_still_time:
    groupedQuery = (spark.readStream
        .format("delta")
        .load(f"{data_path}/processed/delta")
        .groupBy("start_year", "start_month")
        .count()
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", f"{data_path}/processed/groupedTripsCheckpoint")
        .start(f"{data_path}/processed/groupedTrips")
    )


Let's have a look at the aggregated streaming data for October 2022.

In [39]:
from pyspark.sql.functions import col

if there_is_still_time and groupedQuery.isActive:
    df = (spark.read
        .format("delta")
        .load(f"{data_path}/processed/groupedTrips")
        .where("start_year = 2022 and start_month = 10")
        .orderBy(col("count").desc())
    )
    display(df)


In [41]:
display(dt.history().limit(5))


In [42]:
# The queries only exist if the streaming cells were enabled
if there_is_still_time:
    if query.isActive:
        query.stop()
    if groupedQuery.isActive:
        groupedQuery.stop()
