# Delta Lake PySpark Quickstart

The Docker image already takes care of the following setup steps from the Delta Lake quickstart (https://docs.delta.io/latest/quick-start.html).

> Note: You do **NOT** need to run the 2 commands under Python Notes.

## Python Notes
```bash
pip install pyspark==<compatible-spark-version>

$SPARK_HOME/bin/pyspark --packages io.delta:<compatible-delta-version> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
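
Outside the Docker image, the same two settings can also be applied when building the session from a plain Python script. The sketch below assumes the `delta-spark` pip package is installed alongside a compatible `pyspark`; `DELTA_CONF`, `build_delta_session`, and the app name are illustrative names, not part of the quickstart.

```python
# The same two settings as the --conf flags above, kept as a plain dict.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_delta_session(app_name="delta-quickstart"):
    """Return a SparkSession with the Delta Lake extensions enabled.

    Imports are deferred so this module can be loaded without Spark installed.
    """
    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in DELTA_CONF.items():
        builder = builder.config(key, value)
    # Adds the Delta Lake Maven package that matches the installed pip package.
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

With this in place, `spark = build_delta_session()` would play the role of the `spark` object that the `pyspark` shell provides automatically.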

## Scala Notes
If you would like to follow the Scala version, open a terminal and follow the Scala instructions, starting with:

```bash
$SPARK_HOME/bin/spark-shell --packages io.delta:<compatible-delta-version> --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```

## Write to and read from a Delta Lake table

### Write a Spark DataFrame to a Delta Lake table

In [1]:
data = spark.range(0, 5)

(data
  .write
  .format("delta")
  .save("/tmp/delta-table")
)

### Read the above Delta Lake table into a Spark DataFrame and display the DataFrame

In [2]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
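
A Delta Lake table on disk is a directory of Parquet data files plus a `_delta_log` transaction log directory. As a quick sanity check after the write, a minimal sketch like the following (plain Python, with the hypothetical helper name `looks_like_delta_table`) tests for that log directory. Inside Spark, `DeltaTable.isDeltaTable(spark, path)` gives a more thorough answer.

```python
import os


def looks_like_delta_table(path):
    """Return True if `path` contains a `_delta_log` directory."""
    return os.path.isdir(os.path.join(path, "_delta_log"))


if __name__ == "__main__":
    # Path used by the cells above; prints False if nothing was written there.
    print(looks_like_delta_table("/tmp/delta-table"))
```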



## Overwrite a Delta Lake table

### Overwrite the Delta Lake table written in the above step

In [3]:
data = spark.range(5, 10)

(data
  .write
  .format("delta")
  .mode("overwrite")
  .save("/tmp/delta-table")
)

### Read the overwritten Delta Lake table into a Spark DataFrame and display the DataFrame

In [4]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+
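
The first write used Spark's default save mode, which fails if the target already exists, so replacing the data requires an explicit `mode("overwrite")`. The sketch below wraps that choice in a small helper; `write_delta` and `VALID_MODES` are hypothetical names, and the mode strings are the standard Spark `DataFrameWriter` save modes.

```python
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_delta(df, path, mode="errorifexists"):
    """Write `df` to `path` as a Delta table using an explicit save mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    df.write.format("delta").mode(mode).save(path)
```

For example, `write_delta(spark.range(5, 10), "/tmp/delta-table", mode="overwrite")` mirrors the overwrite cell above, while `mode="append"` would add the rows to the existing ones instead of replacing them.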



## Showcase `update`, `delete` and `merge` features of Delta Lake and display the corresponding DataFrames

In [5]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
(delta_table
  .update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") }
  )
)
(delta_table
  .toDF()
  .orderBy("id")
  .show()
)

# Delete every even value
(delta_table
  .delete(
    condition = expr("id % 2 == 0")
  )
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)

# Upsert (merge) new data
new_data = spark.range(0, 20)

(delta_table.alias("old_data")
  .merge(
      new_data.alias("new_data"),
      "old_data.id = new_data.id"
      )
  .whenMatchedUpdate(set = { "id": col("new_data.id") })
  .whenNotMatchedInsert(values = { "id": col("new_data.id") })
  .execute()
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)

+---+
| id|
+---+
|  5|
|  7|
|  9|
|106|
|108|
+---+

+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
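
To see why the three tables above contain the values they do, the operations can be modeled on a plain Python list. This is only an illustrative model of the semantics (the function names are hypothetical and no Delta Lake API is involved), starting from the ids 5 to 9 left by the overwrite.

```python
def update_even(ids):
    """Add 100 to every even id, like the update() call."""
    return sorted(i + 100 if i % 2 == 0 else i for i in ids)


def delete_even(ids):
    """Drop every even id, like the delete() call."""
    return sorted(i for i in ids if i % 2 != 0)


def upsert(ids, new_ids):
    """Keep matched ids (updated to the same value) and insert unmatched new ids."""
    return sorted(set(ids) | set(new_ids))


ids = list(range(5, 10))
after_update = update_even(ids)                   # [5, 7, 9, 106, 108]
after_delete = delete_even(after_update)          # [5, 7, 9]
after_merge = upsert(after_delete, range(0, 20))  # 0 through 19
print(after_update, after_delete, after_merge, sep="\n")
```

The set union in `upsert` only works as a model because the ids here are unique and the matched update writes the same value back; a real `merge` can update other columns as well.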



## Time travel feature of Delta Lake

### Find the entire history of the above Delta Lake table

In [6]:
# get the full history of the table
delta_table_history = (DeltaTable
                        .forPath(spark, "/tmp/delta-table")
                        .history()
                      )

(delta_table_history
   .select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo")
   .show()
)

+-------+--------------------+---------+--------------------+--------------------+--------------------+
|version|           timestamp|operation| operationParameters|    operationMetrics|          engineInfo|
+-------+--------------------+---------+--------------------+--------------------+--------------------+
|      4|2023-04-13 06:11:...|    MERGE|{predicate -> (ol...|{numTargetRowsCop...|Apache-Spark/3.3....|
|      3|2023-04-13 06:11:...|   DELETE|{predicate -> ["(...|{numRemovedFiles ...|Apache-Spark/3.3....|
|      2|2023-04-13 06:11:...|   UPDATE|{predicate -> ((i...|{numRemovedFiles ...|Apache-Spark/3.3....|
|      1|2023-04-13 06:11:...|    WRITE|{mode -> Overwrit...|{numFiles -> 4, n...|Apache-Spark/3.3....|
|      0|2023-04-13 06:11:...|    WRITE|{mode -> ErrorIfE...|{numFiles -> 4, n...|Apache-Spark/3.3....|
+-------+--------------------+---------+--------------------+--------------------+--------------------+
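
The history shown above is derived from the table's transaction log: each version corresponds to one zero-padded JSON commit file under `_delta_log`. As a rough sketch, assuming the commit files carry a `commitInfo` action with an `operation` field (as commits written by Spark do), the log can be read with the standard library alone. `read_operations` is a hypothetical helper, not a Delta Lake API.

```python
import json
import os
import re

# Commit files are named with a 20-digit, zero-padded version number.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def read_operations(table_path):
    """Return {version: operation} parsed from the JSON commit files."""
    log_dir = os.path.join(table_path, "_delta_log")
    operations = {}
    for name in sorted(os.listdir(log_dir)):
        match = COMMIT_FILE.match(name)
        if not match:
            continue  # skip checkpoints, checksums, and other files
        with open(os.path.join(log_dir, name)) as f:
            for line in f:  # one JSON action per line
                if not line.strip():
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    operations[int(match.group(1))] = action["commitInfo"].get("operation")
    return operations
```

For the table used here, `read_operations("/tmp/delta-table")` should report the same operation names as the `operation` column, keyed by version.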



### Latest version of the Delta Lake table

In [7]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



### Time travel to version `0` of the Delta Lake table using Delta Lake's history feature

In [8]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 0)
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



### Time travel to version `3` of the Delta Lake table using Delta Lake's history feature

In [None]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 3)
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()
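
The time travel cells differ only in the version number, so the read can be wrapped in a helper. The sketch below is a hypothetical `read_delta_version` function that takes either a version or a timestamp; `versionAsOf` is the option used above, and `timestampAsOf` is the corresponding Delta Lake reader option for time-based lookups. Going by the history table, version `3` is the state right after the `DELETE`, so it should contain the ids 5, 7, and 9.

```python
def read_delta_version(spark, path, version=None, timestamp=None):
    """Load a Delta table at a given version or timestamp (latest if neither is set)."""
    if version is not None and timestamp is not None:
        raise ValueError("pass either version or timestamp, not both")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)
```

A call such as `read_delta_version(spark, "/tmp/delta-table", version=3).orderBy("id").show()` would then replace the longer chained expression.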

## A little bit of Streaming

In [None]:
streaming_df = (spark
                 .readStream
                 .format("rate")
                 .load()
               )

stream = (streaming_df
            .selectExpr("value as id")
            .writeStream
            .format("delta")
            .option("checkpointLocation", "/tmp/checkpoint")
            .start("/tmp/delta-table")
          )

In [None]:
# To view the results of this step, follow the container logs after execution: docker logs --follow <first 4 characters of the container ID>

stream2 = (spark
            .readStream
            .format("delta")
            .load("/tmp/delta-table")
            .writeStream
            .format("console")
            .start()
          )
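
Both streaming queries keep running in the background until they are stopped, and the first one keeps appending rows to `/tmp/delta-table` in the meantime. A minimal cleanup sketch, using the hypothetical helper name `stop_all_streams`, iterates over the session's active queries and stops each one.

```python
def stop_all_streams(spark):
    """Stop every active streaming query on this session and return their ids."""
    stopped = []
    # Copy the list first, since stopping queries changes the active set.
    for query in list(spark.streams.active):
        query.stop()
        stopped.append(query.id)
    return stopped
```

Calling `stop_all_streams(spark)` at the end of the notebook would stop both `stream` and `stream2`; `stream.stop()` does the same for a single query.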