# Load data incrementally from Delta table to Snowflake


####  Run this cell to set up and start your interactive session.


In [None]:
%session_id_prefix delta-snowflake-incremental-
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5
%connections snowflake
%%configure
{
    "--datalake-formats": "delta",
    "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "--extra-py-files": "/opt/aws_glue_connectors/selected/datalake/delta-core_2.12-2.1.0.jar"
}

#### Configure your resources

In [None]:
# AWS account and region; used to build the Glue job ARN and the default temporary S3 location.
AWS_ACCOUNT_ID = "123456789101"
REGION = "us-east-1"

# S3 location of the source Delta table whose changes are read.
DELTA_DATASET_PATH = "s3://<Your S3 bucket>/<Your S3 prefix>/delta_incremental/ghcn/"

# Same name as the value passed to the %connections magic in the session setup cell.
SNOWFLAKE_CONNECTION_NAME = "snowflake"
SNOWFLAKE_URL = "YOUR_SNOWFLAKE_URL"
# ID of the Secrets Manager secret holding the Snowflake credentials (keys: sfUser, sfPassword).
SNOWFLAKE_SECRET_ID = "snowflake_credentials"
# Schema used to qualify both the destination table and the staging table in the generated SQL.
SNOWFLAKE_SCHEMA = "public"
SNOWFLAKE_WAREHOUSE_NAME = "YOUR_SNOWFLAKE_WAREHOUSE"
SNOWFLAKE_DATABASE_NAME = "YOUR_SNOWFLAKE_DATABASE"
# Destination table; the staging table name is derived from it by appending "_tmp".
SNOWFLAKE_TABLE_NAME = "ghcn"
# Columns joined on in the MERGE statement to match staged changes to destination rows.
SNOWFLAKE_TABLE_PRIMARY_KEYS = ["ID", "DATE", "ELEMENT"]

#### Initialize SparkSession and GlueContext

In [None]:
import json
import sys
from datetime import datetime, timedelta

import boto3
from botocore.exceptions import ClientError
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from delta.tables import *

# Obtain the SparkContext, then derive the GlueContext and SparkSession from it.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Resolve only the job arguments that are actually present on the command line.
params = [name for name in ('JOB_NAME', 'TempDir') if f'--{name}' in sys.argv]
args = getResolvedOptions(sys.argv, params)

# Fall back to a fixed job name when --JOB_NAME is absent or empty.
job_name = args.get('JOB_NAME') or "delta-ghcn-incremental-load-notebook"

# Fall back to the account/region-specific temporary location when --TempDir is
# absent or empty. Using .get() keeps temp_dir bound in both cases; previously the
# name was only assigned when --TempDir was passed, so the fallback check raised
# NameError otherwise.
temp_dir = args.get('TempDir') or f"s3://aws-glue-assets-{AWS_ACCOUNT_ID}-{REGION}/temporary/"


#### Determine target time range for CDC

In [None]:
glue = boto3.client('glue')

# The end time of the previous run is stored as the 'lastQueryEndTime' tag on the Glue job.
try:
    res = glue.get_tags(ResourceArn=f"arn:aws:glue:{REGION}:{AWS_ACCOUNT_ID}:job/{job_name}")
except Exception as e:
    # Keep the original message, but chain the cause explicitly so the traceback is preserved.
    raise Exception(f"Failed to retrieve lastQueryEndTime tag via get_tags: {e}") from e

if 'Tags' in res and 'lastQueryEndTime' in res['Tags']:
    beginTime = res['Tags']['lastQueryEndTime']
else:
    beginTime = "1970-01-01 00:00:00"  # no tag recorded yet: retrieve all changes

# Move the start forward by one millisecond to exclude the last commit processed at
# the previous run. The addition is done on a parsed datetime rather than by string
# concatenation, because the stored value may already carry fractional seconds
# (e.g. "2023-01-01 12:34:56.789000"), in which case appending ".001" would yield an
# invalid timestamp.
try:
    next_start = datetime.fromisoformat(beginTime) + timedelta(milliseconds=1)
    # %f renders microseconds (6 digits); drop the last 3 to keep millisecond precision.
    beginTime = next_start.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
except ValueError:
    # Tag value is not an ISO-style timestamp: keep the previous behaviour.
    beginTime += ".001"
print(f"beginTime: {beginTime}")


In [None]:
# Open the Delta table stored at the configured path.
deltaTable = DeltaTable.forPath(spark, DELTA_DATASET_PATH)

# Ask for a single history entry and take the value of its "timestamp" column.
latest_commit = deltaTable.history(1).select("timestamp").collect()[0]
lastOperationTimestamp = latest_commit[0]

# That timestamp is the upper bound of the change range processed in this run.
endTime = lastOperationTimestamp
print(f"endTime: {endTime}")

#### Read the change data feed for the target time range

In [None]:
# Load the Delta table in change-feed mode, bounded by the begin/end timestamps
# computed in the previous cells.
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", beginTime)
    .option("endingTimestamp", endTime)
    .load(DELTA_DATASET_PATH)
)

In [None]:
# Print up to 20 rows of the change feed as a quick sanity check.
df.show(n=20)

#### Merge changes into destination table

In [None]:
# Column names of the change feed DataFrame, in schema order; they drive the
# UPDATE SET and INSERT VALUES lists of the generated MERGE statement.
column_names = [field.name for field in df.schema.fields]
print(column_names)

In [None]:
# Staging table that receives the change rows before they are merged.
tmp_table_name = f"{SNOWFLAKE_TABLE_NAME}_tmp"

# An empty key list would produce a MERGE with an empty ON clause; fail before writing anything.
if not SNOWFLAKE_TABLE_PRIMARY_KEYS:
    raise ValueError("SNOWFLAKE_TABLE_PRIMARY_KEYS must contain at least one column to build the MERGE join condition")

# Schema-qualified names of the MERGE target and of the staging (source) table.
target_table = f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE_NAME}"
staging_table = f"{SNOWFLAKE_SCHEMA}.{tmp_table_name}"

# Fragments of the MERGE statement derived from the key list and the DataFrame columns.
join_condition = ' AND '.join(f"{target_table}.{pk} = {staging_table}.{pk}" for pk in SNOWFLAKE_TABLE_PRIMARY_KEYS)
update_assignments = ', '.join(f"{col} = {staging_table}.{col}" for col in column_names)
insert_values = ', '.join(f"{staging_table}.{col}" for col in column_names)

# SQL executed after the staging table has been written:
#   1. create the destination table with the staging table's columns if it is missing
#      (WHERE 1=0 copies the structure without rows),
#   2. merge the staged changes into it,
#   3. drop the staging table.
# The NOT MATCHED branch is limited to 'insert' and 'update_postimage' rows so that
# 'delete' and 'update_preimage' change rows are never inserted into the destination
# as if they were live records.
# NOTE(review): several changes to the same key inside one time range are not
# de-duplicated here — confirm whether the source can produce them.
post_actions = (
    f"BEGIN TRANSACTION; CREATE TABLE IF NOT EXISTS {target_table} AS SELECT * FROM {staging_table} WHERE 1=0; "
    f"MERGE INTO {target_table} USING {staging_table} ON {join_condition}"
    f" WHEN MATCHED AND {staging_table}._change_type = 'update_postimage' THEN UPDATE SET {update_assignments}"
    f" WHEN MATCHED AND {staging_table}._change_type = 'delete' THEN DELETE"
    f" WHEN NOT MATCHED AND {staging_table}._change_type IN ('insert', 'update_postimage') THEN INSERT VALUES ({insert_values})"
    f"; DROP TABLE {staging_table}; COMMIT;"
)

print(f"post_actions: {post_actions}")

In [None]:
# Fetch the Snowflake credentials from Secrets Manager. The secret ID comes from the
# configuration cell (it was previously hard-coded here, so changing
# SNOWFLAKE_SECRET_ID had no effect).
secretsmanager = boto3.client('secretsmanager')
res = secretsmanager.get_secret_value(SecretId=SNOWFLAKE_SECRET_ID)

# The secret string is a JSON document with 'sfUser' and 'sfPassword' keys.
secret = json.loads(res['SecretString'])
sfUser = secret['sfUser']
sfPassword = secret['sfPassword']

In [None]:
# Write the change rows to the staging table, then let the connector run the
# generated post-actions SQL (create-if-missing, MERGE, drop staging table).
# sfSchema is set explicitly so the unqualified staging table name in "dbtable"
# resolves to the same schema that post_actions uses to qualify it; without it the
# session falls back to the connector's default schema.
# mode("error") makes the write fail if the staging table already exists.
(
    df.write
    .format("net.snowflake.spark.snowflake")
    .option("sfUrl", SNOWFLAKE_URL)
    .option("sfWarehouse", SNOWFLAKE_WAREHOUSE_NAME)
    .option("sfDatabase", SNOWFLAKE_DATABASE_NAME)
    .option("sfSchema", SNOWFLAKE_SCHEMA)
    .option("sfUser", sfUser)
    .option("sfPassword", sfPassword)
    .option("dbtable", tmp_table_name)
    .option("postactions", post_actions)
    .mode("error")
    .save()
)

#### Update the last query end time

In [None]:
# Persist the end of the processed range as a tag on the Glue job so the next run
# can start from it.
tag = {"lastQueryEndTime": f"{endTime}"}

try:
    glue.tag_resource(ResourceArn=f"arn:aws:glue:{REGION}:{AWS_ACCOUNT_ID}:job/{job_name}", TagsToAdd=tag)
except Exception as e:
    # The message now names the API that was actually called (tag_resource), and the
    # original error is chained explicitly.
    raise Exception(f"Failed to update lastQueryEndTime tag via tag_resource: {e}") from e

#### Update a record in the source Delta table (produces a change for the next incremental run)

In [None]:
# Change DATA_VALUE on the single row identified by its ID / DATE / ELEMENT values.
update_filter = "ID = 'AE000041196' AND DATE = '20221231' AND ELEMENT = 'PRCP'"
new_values = {"DATA_VALUE": "12345"}
deltaTable.update(condition=update_filter, set=new_values)

#### Delete a record from the source Delta table (produces a change for the next incremental run)

In [None]:
# Remove the single row identified by its ID / DATE / ELEMENT values.
delete_filter = "ID = 'AE000041196' AND DATE = '20221231' AND ELEMENT = 'TMAX'"
deltaTable.delete(condition=delete_filter)