## Simple UPSERT with Delta Lake

**Prerequisites**:
 
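- Download the sample data linked from [this AWS Big Data blog post](https://aws.amazon.com/blogs/big-data/handle-upsert-data-operations-using-open-source-delta-lake-and-aws-glue/)
- Crawl the data (e.g. with an AWS Glue crawler) so the tables are available in the Data Catalog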


#### Enable Delta Lake Support 

In [None]:
%glue_version 3.0
%%configure
{
  "--datalake-formats": "delta"
}

In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 2880
# %glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the interactive session's SparkContext (or create one) and wrap it
# in a GlueContext; the Spark session used by every later cell comes from it.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
job = Job(glueContext)
spark = glueContext.spark_session

## Read the Full-Load Delta Table Using SQL

In [None]:
%%sql
SELECT * FROM `default`.`sample_delta_sample_delta_table` LIMIT 10

## Upsert Delta Table using Spark DataFrame

In [None]:
BUCKET = "upsert-demo-15092023"  # S3 bucket holding fullload/, cdcload/ and the Delta output

In [None]:
from delta.tables import DeltaTable
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import expr

In [None]:
# Explicit schema for the full-load CSV files; every column is nullable.
schema = StructType([
    StructField("policy_id", IntegerType(), True),
    StructField("expiry_date", DateType(), True),
    StructField("location_name", StringType(), True),
    StructField("state_code", StringType(), True),
    StructField("region_name", StringType(), True),
    StructField("insured_value", IntegerType(), True),
    StructField("business_type", StringType(), True),
    StructField("earthquake_coverage", StringType(), True),
    StructField("flood_coverage", StringType(), True),
])

In [None]:
# read the full load
# Read the full-load CSV files (first line is a header) with the explicit schema above.
sdf = spark.read.csv(f"s3://{BUCKET}/fullload/", schema=schema, header=True)
sdf.printSchema()

In [None]:
# sdf.show(5)

In [None]:
# write data as DELTA TABLE
# Persist the full load as a Delta table, overwriting whatever is at that path.
sdf.write.save(f"s3://{BUCKET}/delta/insurance/", format="delta", mode="overwrite")

In [None]:
# Crawl the Delta table location (e.g. with a Glue crawler) before querying it:
# %%sql 
# SELECT * FROM `default`.`delta_insurance`

In [None]:
# read cdc update data 
# Read the CDC (change) files. No header and no schema are given, so Spark
# names the columns positionally (_c0, _c1, ...) and reads them all as strings.
cdc_df = spark.read.format("csv").load(f"s3://{BUCKET}/cdcload/")

In [None]:
# cdc_df.show(5)  # preview the raw CDC rows (positional _cN columns)

In [None]:
# read fullload to dataframe from existing delta table 
# Open the existing Delta table. Despite its name, delta_df is a DeltaTable
# handle, not a DataFrame; call .toDF() to get a DataFrame.
delta_df = DeltaTable.forPath(sparkSession=spark, path=f"s3://{BUCKET}/delta/insurance/")

In [None]:
# delta_df.toDF().show(5,True)

In [None]:
# UPSERT: CDC rows whose policy_id matches an existing row update that row;
# all other CDC rows are inserted.
# Deletes are NOT handled here. If the feed has no explicit keyword, build
# separate Insert / Update / Delete data sets and apply the deletes on their own.
#
# cdc_df was read without a header, so its columns are positional (_c0, _c1, ...):
# _c1 is the primary key (policy_id) and _c2.._c9 map to the payload columns below.
# NOTE(review): _c0 is not used here -- presumably the CDC operation flag; confirm.
# NOTE(review): MERGE can fail when several CDC rows share one policy_id;
# deduplicate cdc_df first if the feed may hold more than one change per key.
cdc_payload_columns = {
    "expiry_date": "_c2",
    "location_name": "_c3",
    "state_code": "_c4",
    "region_name": "_c5",
    "insured_value": "_c6",
    "business_type": "_c7",
    "earthquake_coverage": "_c8",
    "flood_coverage": "_c9",
}

# Derive both clauses from the single mapping above so they cannot drift apart:
# an insert writes the same payload columns as an update, plus the key column.
update_values = {
    f"prev_df.{target}": col(f"append_df.{source}")
    for target, source in cdc_payload_columns.items()
}
insert_values = {"prev_df.policy_id": col("append_df._c1"), **update_values}

# The merge modifies the Delta table in place. execute() returns None in
# Delta Lake 1.x; newer releases may return a DataFrame of merge metrics.
final_df = (
    delta_df.alias("prev_df")
    .merge(
        source=cdc_df.alias("append_df"),
        condition=expr("prev_df.policy_id = append_df._c1"),  # match on primary key
    )
    .whenMatchedUpdate(set=update_values)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

In [None]:
# read target table 
# Re-open the Delta table so later cells read its post-merge state.
delta_df = DeltaTable.forPath(sparkSession=spark, path=f"s3://{BUCKET}/delta/insurance/")

In [None]:
# Spot-check the merged table: key and expiry date of the first 3 rows.
(
    delta_df.toDF()
    .select(col("policy_id"), col("expiry_date"))
    .show(n=3)
)

## Filter on DataFrame 

`filter` and `where` behave identically (`where` is an alias for `filter`); `where` is used here because it reads like SQL.

In [None]:
# DataFrame view of the Delta table, used by the filter and temp-view cells below.
temp_df = delta_df.toDF()

In [None]:
# Show only the three sample policies, using a Column expression instead of a SQL string.
temp_df.where(col("policy_id").isin(100462, 100463, 100475)).show()

## Query Through a Temporary View

In [None]:
# Register the DataFrame as a temporary view (scoped to this Spark session) so SQL can query it.
temp_df.createOrReplaceTempView(name="temp_view")

In [None]:
# Same policy filter as above, expressed in SQL against the temporary view.
spark.sql(
    "SELECT * FROM temp_view WHERE policy_id IN (100462, 100463, 100475)"
).show()