# Delta Lake Lab 
## Unit 4: CRUD Support

This lab is powered by Dataproc Serverless Spark.

In the previous unit, we -
1. Created an unpartitioned delta table
2. Created a partitioned delta table called loan_db.loans_by_state_delta
3. Studied the files created and their layout in the data lake
4. Learned how to look at delta table details
5. Looked at history (there was none)
6. Created a manifest file
7. Reviewed entries in the Hive metastore

In this unit, we will learn how to -
1. Delete a record and study the delta log
2. Insert a record and study the delta log
3. Update a record and study the delta log
4. Upsert and study the delta log

### 1. Imports

In [None]:
import pandas as pd

from pyspark.sql.functions import month, date_format
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

from delta.tables import *

import warnings
warnings.filterwarnings('ignore')

### 2. Create a Spark session powered by Cloud Dataproc 

In [None]:
spark = SparkSession.builder.appName('Loan Analysis').getOrCreate()
spark

### 3. Declare variables

In [None]:
project_id_output = !gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID = project_id_output[0]
print("PROJECT_ID: ", PROJECT_ID)

In [None]:
project_name_output = !gcloud projects describe $PROJECT_ID | grep name | cut -d':' -f2 | xargs
PROJECT_NAME = project_name_output[0]
print("PROJECT_NAME: ", PROJECT_NAME)

In [None]:
project_number_output = !gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | xargs
PROJECT_NUMBER = project_number_output[0]
print("PROJECT_NUMBER: ", PROJECT_NUMBER)

In [None]:
ACCOUNT_NAME = "YOUR_ACCOUNT_NAME"

In [None]:
DATA_LAKE_ROOT_PATH = f"gs://dll-data-bucket-{PROJECT_NUMBER}-{ACCOUNT_NAME}"
DELTA_LAKE_DIR_ROOT = f"{DATA_LAKE_ROOT_PATH}/delta-consumable"

In [None]:
!gsutil ls -r $DELTA_LAKE_DIR_ROOT

### 4. Delete support

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
spark.sql("SELECT * FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)

In [None]:
spark.sql("DELETE FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
spark.sql("SELECT * FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)

Let's look at the data lake:

In [None]:
# Note how the delete created a new JSON file in the delta log directory
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/_delta_log/* 

Let's look at the delta log:

In [None]:
# This is the original log
!gsutil cat $DELTA_LAKE_DIR_ROOT/_delta_log/00000000000000000000.json 

In [None]:
# Note the delete in this log
!gsutil cat $DELTA_LAKE_DIR_ROOT/_delta_log/00000000000000000001.json 
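
Each commit file in the delta log is newline-delimited JSON, one action per line (for example `commitInfo`, `add`, `remove`). The following is a minimal sketch of how such a file could be summarized in plain Python. The sample commit text, its file names, and the helper name `summarize_commit` are assumptions made for illustration, not output from this lab.

```python
import json
from collections import Counter

# Hypothetical commit contents, shaped like a delta log entry after a DELETE.
# The file names and predicate string are invented for illustration.
SAMPLE_COMMIT = "\n".join([
    json.dumps({"commitInfo": {"operation": "DELETE",
                               "operationParameters": {"predicate": "addr_state = 'IA'"}}}),
    json.dumps({"remove": {"path": "part-00000-aaaa.snappy.parquet", "dataChange": True}}),
    json.dumps({"add": {"path": "part-00000-bbbb.snappy.parquet", "dataChange": True}}),
])


def summarize_commit(text):
    """Return (operation, Counter of action types) for the text of one commit file."""
    operation = None
    actions = Counter()
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        for action_type, payload in entry.items():
            actions[action_type] += 1
            if action_type == "commitInfo":
                operation = payload.get("operation")
    return operation, actions


operation, actions = summarize_commit(SAMPLE_COMMIT)
print(operation, dict(actions))
```

In this notebook, the text could instead come from capturing the `gsutil cat` output into a Python variable and joining its lines with newlines, assuming the same capture pattern used for `project_id_output` earlier.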

### 5. Create (Insert) support

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
spark.sql("INSERT INTO "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta VALUES ('IA',222222)")

In [None]:
spark.sql("SELECT * FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
# Note how the insert created a new Parquet file and, in the delta log, another JSON file
!gsutil ls -r $DELTA_LAKE_DIR_ROOT 

In [None]:
# Let's check for the insert
!gsutil cat $DELTA_LAKE_DIR_ROOT/_delta_log/00000000000000000002.json 
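
The commit files read so far are named after the table version they create: the version number zero-padded to 20 digits, followed by `.json`. A small sketch of that naming rule is below. The helper names and the example bucket are hypothetical and are not part of the lab.

```python
def commit_file_name(version):
    """Name of the JSON commit file for a table version (20 digits, zero-padded)."""
    if version < 0:
        raise ValueError("version must be non-negative")
    return f"{version:020d}.json"


def commit_file_path(table_root, version):
    """Join a table root with the _delta_log commit file for the given version."""
    return f"{table_root.rstrip('/')}/_delta_log/{commit_file_name(version)}"


# 'gs://example-bucket/delta-consumable' is a made-up root used only for illustration.
print(commit_file_name(2))
print(commit_file_path("gs://example-bucket/delta-consumable", 2))
```

With a helper like this, the version to inspect after each operation can be computed rather than typed out by hand.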

### 6. Update support

Let's update a record and see the changes in the delta log directory.

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
spark.sql("UPDATE "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta SET count = 11111 WHERE addr_state='IA'").show(truncate=False)

In [None]:
spark.sql("SELECT * FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

In [None]:
# Note how the update created a new Parquet file and, in the delta log, another JSON file
!gsutil ls -r $DELTA_LAKE_DIR_ROOT 

In [None]:
# Let's check for the update
!gsutil cat $DELTA_LAKE_DIR_ROOT/_delta_log/00000000000000000003.json 
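
One way to read these commits is as a replay: the current table is the set of files that have been added and not later removed. The sketch below replays `add` and `remove` actions in plain Python. It assumes copy-on-write behavior (no deletion vectors), and the commits and file names are invented for illustration.

```python
def live_files(commits):
    """Replay add/remove actions in commit order and return the surviving file paths."""
    files = set()
    for actions in commits:
        for action in actions:
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


# Hypothetical commits, one list of actions per table version.
commits = [
    [{"add": {"path": "part-0001.parquet"}}],      # version 0: initial write
    [{"remove": {"path": "part-0001.parquet"}},    # version 1: delete rewrites the file
     {"add": {"path": "part-0002.parquet"}}],
    [{"add": {"path": "part-0003.parquet"}}],      # version 2: insert adds a file
    [{"remove": {"path": "part-0003.parquet"}},    # version 3: update rewrites the file
     {"add": {"path": "part-0004.parquet"}}],
]

print(sorted(live_files(commits)))
```

A `remove` action only marks a file as no longer part of the table. The Parquet file itself stays in storage until it is vacuumed, which is why the file count from `gsutil ls` can grow even after a delete or update.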

### 7. Upsert support

In [None]:
toBeMergedRows = [('IA', 555), ('CA', 12345), ('IN', 6666)]
toBeMergedColumns = ['addr_state', 'count']
toBeMergedDF = spark.createDataFrame(toBeMergedRows, toBeMergedColumns)
toBeMergedDF.createOrReplaceTempView("to_be_merged_table")
toBeMergedDF.orderBy("addr_state").show(3)

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/delta_consumable/part* | wc -l 

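In [None]:
# Delete 'IA' first so the merge exercises both branches:
# 'IA' will be inserted (not matched), while 'CA' and 'IN' will be updated (matched)
spark.sql("DELETE FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state='IA'").show(truncate=False)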

In [None]:
spark.sql("SELECT addr_state,count FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state in ('IA','CA','IN') ORDER BY addr_state").show(truncate=False)

In [None]:
mergeSQLStatement = "MERGE INTO "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta as d USING to_be_merged_table as m ON (d.addr_state = m.addr_state) WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * "

print(mergeSQLStatement)


In [None]:
spark.sql(mergeSQLStatement).show(truncate=False)

In [None]:
spark.sql("SELECT addr_state,count FROM "+ ACCOUNT_NAME +"_loan_db.loans_by_state_delta WHERE addr_state in ('IA','CA','IN') ORDER BY addr_state").show(truncate=False)
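
The effect of the MERGE statement on these three rows can be sketched in plain Python: a source row whose key exists in the target replaces the target row, and any other source row is inserted. This only illustrates the semantics and is not how Delta Lake executes a merge. The target counts below are hypothetical, and `merge_upsert` is a made-up helper name.

```python
def merge_upsert(target, source):
    """Apply source rows to a copy of target; return (merged, updated keys, inserted keys)."""
    merged = dict(target)
    updated, inserted = [], []
    for state, count in source.items():
        if state in merged:
            updated.append(state)    # WHEN MATCHED THEN UPDATE SET *
        else:
            inserted.append(state)   # WHEN NOT MATCHED THEN INSERT *
        merged[state] = count
    return merged, updated, inserted


# Hypothetical target counts; 'IA' is absent because it was deleted before the merge.
target = {"CA": 100, "IN": 200, "NY": 300}
# Same rows as toBeMergedRows in this section.
source = {"IA": 555, "CA": 12345, "IN": 6666}

merged, updated, inserted = merge_upsert(target, source)
print(merged)
print("updated:", sorted(updated), "inserted:", sorted(inserted))
```

Rows in the target with no matching source key (here the made-up `NY` row) are left untouched, which matches the MERGE statement above since it has no clause for unmatched target rows.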

In [None]:
# Get the file count
!gsutil ls -r $DELTA_LAKE_DIR_ROOT/part* | wc -l

In [None]:
# Note how the merge created a new Parquet file and, in the delta log, another JSON file
!gsutil ls -r $DELTA_LAKE_DIR_ROOT 

In [None]:
# Let's check for the upsert
# The DELETE at the start of this section was committed as version 4, so the MERGE is version 5
!gsutil cat $DELTA_LAKE_DIR_ROOT/_delta_log/00000000000000000005.json

### THIS CONCLUDES THIS UNIT. PROCEED TO THE NEXT NOTEBOOK