# Chapter 9: Building Reliable Data Lakes with Apache Spark
Christoph Windheuser    
May, 2022   
Python examples of chapter 9 (page 265 ff) in the book *Learning Spark*

To use Delta Lake functionality in PySpark, add the following line to the `spark-defaults.conf` file in the `$SPARK_HOME/conf` directory:

```spark.jars.packages io.delta:delta-core_2.12:1.1.0```

To add more JAR packages, separate them with commas, **without spaces**.
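As a minimal sketch of the comma rule, the helper below joins Maven coordinates into a single `spark.jars.packages` line. The function name `packages_line` is hypothetical, and the second coordinate is only an illustrative extra package that this chapter does not need.

```python
def packages_line(coordinates):
    """Build a spark-defaults.conf line from Maven coordinates (group:artifact:version)."""
    cleaned = [c.strip() for c in coordinates if c.strip()]
    for c in cleaned:
        # A space or comma inside a single coordinate would break the list.
        if " " in c or "," in c:
            raise ValueError(f"invalid coordinate: {c!r}")
    # Key and value are separated by one space; the values by commas only.
    return "spark.jars.packages " + ",".join(cleaned)


line = packages_line([
    "io.delta:delta-core_2.12:1.1.0",
    "org.apache.hadoop:hadoop-aws:3.3.1",  # illustrative second package (assumption)
])
print(line)
```

The printed line can be pasted into `spark-defaults.conf` as is.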

If `$SPARK_HOME/conf` contains no `spark-defaults.conf` file but does contain `spark-defaults.conf.template`, copy the template to `spark-defaults.conf` and make the changes in the new file.
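The copy step can also be scripted. The following is a sketch only: the helper name `ensure_spark_defaults` is hypothetical, and creating an empty file when no template exists is an assumption, not something the text above prescribes. The demonstration runs in a throwaway directory rather than the real `$SPARK_HOME/conf`.

```python
import shutil
import tempfile
from pathlib import Path


def ensure_spark_defaults(conf_dir):
    """Return the path to spark-defaults.conf, creating it from the template if needed."""
    conf_dir = Path(conf_dir)
    target = conf_dir / "spark-defaults.conf"
    template = conf_dir / "spark-defaults.conf.template"
    if not target.exists():
        if template.exists():
            shutil.copyfile(template, target)  # the template itself stays untouched
        else:
            target.touch()  # assumption: start from an empty file if there is no template
    return target


# Demonstration on a temporary directory that stands in for $SPARK_HOME/conf.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "spark-defaults.conf.template").write_text("# Example template\n")
    conf_file = ensure_spark_defaults(tmp)
    with conf_file.open("a") as f:
        f.write("spark.jars.packages io.delta:delta-core_2.12:1.1.0\n")
    contents = conf_file.read_text()

print(contents)
```

For a real installation, the directory passed in would be `Path(os.environ["SPARK_HOME"]) / "conf"`, assuming `SPARK_HOME` is set.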


In [1]:
# Import the required PySpark and Delta Lake libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from delta import *


## Run Delta Lake from Jupyter Notebooks:
See Stack Overflow: https://stackoverflow.com/questions/57740693/how-to-refer-deltalake-tables-in-jupyter-notebook-using-pyspark

# Create a SparkSession.
# This requires internet access; if executed offline, an error is thrown.

builder = (
    SparkSession.builder
    .appName("Chapter_9")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [2]:
# Create the SparkSession with the Delta Lake SQL extension and catalog
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
    .getOrCreate()
)


In [3]:
from delta.tables import *

## Loading Data into a Delta Lake Table
Page 275 ff

In [35]:
sourcePath = "../DB_Spark/LearningSparkV2/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"

deltaPath  = "/tmp/spark/loans_delta"

(spark.read.format("parquet").load(sourcePath)
    .write.format("delta").save(deltaPath))


In [36]:
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")


In [37]:
# Count the rows:
spark.sql("SELECT count(*) FROM loans_delta").show()

+--------+
|count(1)|
+--------+
|   14705|
+--------+



In [38]:
# Show the first 5 rows of the table:
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()

+-------+-----------+---------+----------+
|loan_id|funded_amnt|paid_amnt|addr_state|
+-------+-----------+---------+----------+
|      0|       1000|   182.22|        CA|
|      1|       1000|   361.19|        WA|
|      2|       1000|   176.26|        TX|
|      3|       1000|   1000.0|        OK|
|      4|       1000|   249.98|        PA|
+-------+-----------+---------+----------+



## Enforcing Schema on Write to Prevent Data Corruption
Page 278

When we try to append rows with a different schema (the column `closed` was added) to the existing Delta table, Spark throws the error *A schema mismatch detected*. The error message shows both schemas.

In [39]:
cols  = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [(1111111, 1000, 1000.0, 'TX', True),
        (2222222, 2000, 0.0,    'CA', False)]

loanUpdates = (spark.createDataFrame(items, cols)
              .withColumn("funded_amnt", col("funded_amnt").cast("int")))

loanUpdates.write.format("delta").mode("append").save(deltaPath)


AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 21ee7030-632b-4306-92b7-9ca406724adb).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)


Data schema:
root
-- loan_id: long (nullable = true)
-- funded_amnt: integer (nullable = true)
-- paid_amnt: double (nullable = true)
-- addr_state: string (nullable = true)
-- closed: boolean (nullable = true)
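The difference between the two schemas in the error message can also be spotted before writing. The sketch below is a simplified check, not Delta Lake's own validation logic; the helper name `schema_diff` is hypothetical. It works on lists of `(column, type)` pairs in the form that `DataFrame.dtypes` returns, here typed in by hand from the schemas above.

```python
def schema_diff(table_dtypes, data_dtypes):
    """Compare two lists of (column, type) pairs, as returned by DataFrame.dtypes."""
    table = dict(table_dtypes)
    data = dict(data_dtypes)
    extra = [name for name in data if name not in table]      # only in the new data
    missing = [name for name in table if name not in data]    # only in the table
    changed = [name for name in data if name in table and data[name] != table[name]]
    return {"extra": extra, "missing": missing, "changed": changed}


# Schemas copied from the error message, written the way DataFrame.dtypes reports them.
table_dtypes = [("loan_id", "bigint"), ("funded_amnt", "int"),
                ("paid_amnt", "double"), ("addr_state", "string")]
data_dtypes = table_dtypes + [("closed", "boolean")]

diff = schema_diff(table_dtypes, data_dtypes)
print(diff)
```

With a live session, the two inputs could come from `spark.read.format("delta").load(deltaPath).dtypes` and `loanUpdates.dtypes`.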

         

## Schema Migration
Page 279

To evolve the table schema so that it matches the new data, set the `mergeSchema` option to `"true"` on the write.

In [40]:
(loanUpdates.write.format("delta").mode("append")
    .option("mergeSchema", "true")
    .save(deltaPath))


In [41]:
# Show the first 5 rows of the new table:
loanUpdatesNew = spark.read.format("delta").load(deltaPath)
loanUpdatesNew.show(5)

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        WA|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
+-------+-----------+---------+----------+------+
only showing top 5 rows
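The `null` values in the `closed` column follow from the merge: rows written before the schema change have no value for the new column. The plain-Python model below only illustrates that outcome under simplifying assumptions; it is not how Delta Lake implements schema merging, and the helper names `merge_columns` and `pad_row` are hypothetical.

```python
def merge_columns(table_columns, data_columns):
    """Keep existing columns in order and append columns that only the new data has."""
    return list(table_columns) + [c for c in data_columns if c not in table_columns]


def pad_row(row, merged_columns):
    """Fill columns absent from a row with None (displayed as null by Spark)."""
    return {c: row.get(c) for c in merged_columns}


table_columns = ["loan_id", "funded_amnt", "paid_amnt", "addr_state"]
data_columns = table_columns + ["closed"]
merged = merge_columns(table_columns, data_columns)

# One row from the original table and one of the appended rows.
old_row = {"loan_id": 0, "funded_amnt": 1000, "paid_amnt": 182.22, "addr_state": "CA"}
new_row = {"loan_id": 1111111, "funded_amnt": 1000, "paid_amnt": 1000.0,
           "addr_state": "TX", "closed": True}

print(merged)
print(pad_row(old_row, merged))
print(pad_row(new_row, merged))
```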



## Updating Data
Page 280

Change every address state 'WA' to 'OR':

In [42]:
deltaTable = DeltaTable.forPath(spark, deltaPath)


In [43]:
deltaTable.update("addr_state = 'WA'", {"addr_state": "'OR'"})


In [44]:
# Show the first 5 rows of the new table:
loanUpdatesNew = spark.read.format("delta").load(deltaPath)
loanUpdatesNew.show(5)

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        OR|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
+-------+-----------+---------+----------+------+
only showing top 5 rows
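String values in the `set` dictionary passed to `update` are SQL expressions rather than plain Python strings, which is why `'OR'` carries its own single quotes inside the Python string. The sketch below builds both arguments from plain values. The helper names `sql_literal` and `build_update_args` are hypothetical, and values containing quotes or backslashes are rejected rather than escaped.

```python
def sql_literal(value):
    """Wrap a plain Python string in single quotes so SQL reads it as a string literal."""
    if "'" in value or "\\" in value:
        raise ValueError("quotes and backslashes would need escaping; not handled here")
    return f"'{value}'"


def build_update_args(column, old, new):
    """Return (condition, assignments) for replacing one string value with another."""
    condition = f"{column} = {sql_literal(old)}"
    assignments = {column: sql_literal(new)}
    return condition, assignments


condition, assignments = build_update_args("addr_state", "WA", "OR")
print(condition)
print(assignments)
```

The two results match the arguments used in the cell above and could be passed on as `deltaTable.update(condition, assignments)`.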



## Deleting Data
Page 280

Delete all rows where the loan is fully paid:

In [45]:
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt <= paid_amnt")

In [46]:
# Show the first 5 rows of the new table:
loanUpdatesNew = spark.read.format("delta").load(deltaPath)
loanUpdatesNew.show(5)

+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        OR|  null|
|      2|       1000|   176.26|        TX|  null|
|      4|       1000|   249.98|        PA|  null|
|      5|       1000|    408.6|        CA|  null|
+-------+-----------+---------+----------+------+
only showing top 5 rows
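To see why `loan_id` 3 disappears from the output, the delete predicate can be replayed in plain Python on the five rows shown before the delete. This is only an illustration of the condition `funded_amnt <= paid_amnt`; the function name `is_fully_paid` is hypothetical.

```python
# First rows of the table before the delete: (loan_id, funded_amnt, paid_amnt, addr_state).
rows = [
    (0, 1000, 182.22, "CA"),
    (1, 1000, 361.19, "OR"),
    (2, 1000, 176.26, "TX"),
    (3, 1000, 1000.0, "OK"),
    (4, 1000, 249.98, "PA"),
]


def is_fully_paid(funded_amnt, paid_amnt):
    """Python equivalent of the SQL predicate 'funded_amnt <= paid_amnt'."""
    return funded_amnt <= paid_amnt


# Rows matching the predicate are deleted; all others remain.
remaining = [r for r in rows if not is_fully_paid(r[1], r[2])]
print([r[0] for r in remaining])
```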



## Finally, Delete the Delta Table to Be Able to Create It Again Next Time

In [34]:
# Delete the delta tables on the file system:
!rm /tmp/spark/loans_delta/*.parquet
!rm -rf /tmp/spark/loans_delta/_delta_log
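The `!rm` commands depend on a Unix shell. A portable alternative is sketched below with the standard library; note that, unlike the two commands above, it removes the whole table directory, including any other files left in it. The helper name `remove_table_dir` is hypothetical, and the demonstration runs on a throwaway directory rather than `/tmp/spark/loans_delta`.

```python
import shutil
import tempfile
from pathlib import Path


def remove_table_dir(path):
    """Delete a table directory and everything in it; do nothing if it is absent."""
    path = Path(path)
    if path.exists():
        shutil.rmtree(path)
    return not path.exists()


# Demonstration on a temporary directory that mimics the layout of a Delta table.
demo = Path(tempfile.mkdtemp()) / "loans_delta"
(demo / "_delta_log").mkdir(parents=True)
(demo / "part-00000.snappy.parquet").write_bytes(b"")

removed = remove_table_dir(demo)
shutil.rmtree(demo.parent)  # clean up the temporary parent directory as well
print(removed)
```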
