In [1]:
#author: Adrian J
#TODO: move the foreachBatch functions to a util module
import pytz
from datetime import datetime
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, DateType, IntegerType, StringType
from pyspark.sql import Row

In [2]:
# Build (or reuse) a local Spark session with the Delta Lake pip package configured.
builder = SparkSession.builder.master("local").appName("p0_stream_triggers")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
# Raw zone: location of the Delta table the stream reads from.
source_schema, source_table_name = 'assignment_data', 'r_session_events'
source_dl_raw_path = f'/home/jovyan/work/data_lake/raw/{source_schema}/{source_table_name}/'

# Use-case (uc) zone: location of the Delta table the stream upserts into.
sink_dl_schema, sink_table_name = 'uc_assignment', 'uc_delta_session_events'
sink_dl_uc_path = f'/home/jovyan/work/data_lake/use_case/{sink_dl_schema}/{sink_table_name}'

# Data lake load timestamps are expressed in Finnish local time (assumption made by the author).
FinlandTimeZone = pytz.timezone("Europe/Helsinki")

# Columns selected from the source table.
# NOTE(review): the streaming read in this notebook also uses an `event` column
# that is not listed here - confirm whether this list is still used / complete.
source_select_columns = [
    "country",
    "player_id",
    "session_id",
    "ts",
]

In [4]:
#READ and WRITE START and END events to data lake as UPSERT
#To-do move the method to outside this notebook
def upsert_to_delta(microbatch_input, batchId):
    """Merge one micro-batch of session rows into the session-events Delta table.

    Written to be passed to ``writeStream.foreachBatch``. Rows are matched on
    ``session_id``: a matching target row gets its ``end_ts``, ``DL_UPDATE_TS``,
    ``session_status`` and ``EVENT_DATE`` overwritten from the batch; a batch row
    without a match is inserted with all nine columns.

    Args:
        microbatch_input: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by ``foreachBatch`` (not used here).

    Side effects:
        Executes a Delta MERGE on the target path (using the notebook-level
        ``spark`` session) and prints the latest history entry of the table.
    """
    target_schema = 'uc_assignment'
    target_table = 'uc_delta_session_events'
    target_path = f'/home/jovyan/work/data_lake/use_case/{target_schema}/{target_table}'
    delta_target = DeltaTable.forPath(spark, target_path)

    # Columns refreshed on an existing session row (target alias "t", batch alias "i").
    updated_on_match = {
        name: F.col(f"i.{name}")
        for name in ("end_ts", "DL_UPDATE_TS", "session_status", "EVENT_DATE")
    }
    # Full row written when the session is not in the target yet.
    inserted_on_miss = {
        name: f"i.{name}"
        for name in (
            "country", "player_id", "session_id",
            "start_ts", "end_ts", "session_status",
            "DL_INSERT_TS", "DL_UPDATE_TS", "EVENT_DATE",
        )
    }

    merge_builder = delta_target.alias("t").merge(
        microbatch_input.alias("i"),
        "t.session_id = i.session_id",
    )
    merge_builder = merge_builder.whenMatchedUpdate(set=updated_on_match)
    merge_builder = merge_builder.whenNotMatchedInsert(values=inserted_on_miss)
    merge_builder.execute()

    # Most recent commit of the target table, i.e. the merge that just ran.
    last_commit = delta_target.history(1)
    print(f'INFO: Upsert for table r_session_events completed')
    print(f"INFO: metadata = {last_commit.collect()[0]}")

# Column order of the rows handed to the upsert.
order_col = ["country", "player_id", "session_id", "start_ts", "end_ts", "session_status", "DL_INSERT_TS", "DL_UPDATE_TS", "EVENT_DATE"]

# Load timestamp as Helsinki wall-clock time, truncated to seconds.
# It is built from current_timestamp() so that Spark evaluates it for every
# micro-batch. The previous F.lit(datetime.now(...)) was evaluated only once,
# when this cell ran, so every row written by the long-running stream carried
# the same frozen DL_INSERT_TS / DL_UPDATE_TS.
# from_utc_timestamp shifts the instant to Helsinki wall-clock time; the
# to_utc_timestamp round trip re-reads that wall-clock value in the session
# time zone, which is what casting the formatted string did before.
session_time_zone = spark.conf.get("spark.sql.session.timeZone")
dl_load_ts = F.date_trunc(
    "second",
    F.to_utc_timestamp(
        F.from_utc_timestamp(F.current_timestamp(), FinlandTimeZone.zone),
        session_time_zone,
    ),
)

event_ts = F.col("ts").cast(TimestampType())
is_start_event = F.col("event") == "start"
is_end_event = F.col("event") == "end"

# Read the raw events and shape each one into a (partial) session row:
# a start event fills start_ts, an end event fills end_ts.
session_event_rows = (
    spark
    .readStream
    .format("delta")
    .load(source_dl_raw_path)
    .select(
        F.col("country"),
        F.col("player_id"),
        F.col("session_id").cast(StringType()).alias("session_id"),
        # when() without otherwise() yields NULL for non-matching rows
        F.when(is_start_event, event_ts).alias("start_ts"),
        F.when(is_end_event, event_ts).alias("end_ts"),
        # 0 for a start event, 1 for anything else
        F.when(is_start_event, F.lit(0).cast(IntegerType()))
         .otherwise(F.lit(1).cast(IntegerType()))
         .alias("session_status"),
        dl_load_ts.alias("DL_INSERT_TS"),
        dl_load_ts.alias("DL_UPDATE_TS"),
        F.col("ts").cast(DateType()).alias("EVENT_DATE"),
    )
)

# Collapse the events of one session into a single row per (player_id, session_id).
# min/max skip NULLs, so start_ts / end_ts come from whichever event carried them.
session_rows = (
    session_event_rows
    .groupBy("player_id", "session_id")
    .agg(
        F.max("country").alias("country"),
        F.min("start_ts").alias("start_ts"),
        F.max("end_ts").alias("end_ts"),
        F.max("session_status").alias("session_status"),
        F.min("DL_INSERT_TS").alias("DL_INSERT_TS"),
        F.max("DL_UPDATE_TS").alias("DL_UPDATE_TS"),
        # TODO (original author): EVENT_DATE is considered redundant - remove from table
        F.max("EVENT_DATE").alias("EVENT_DATE"),
    )
    .select(*order_col)
)

# Start the streaming query; each micro-batch is upserted via upsert_to_delta.
# strm_start_events is the running StreamingQuery handle.
strm_start_events = (
    session_rows
    .writeStream
    .format("delta")
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", f"{sink_dl_uc_path}/_checkpoint")
    .start(sink_dl_uc_path)
)
print("INFO: listener to write into delta initiated")

INFO: metadata = Row(version=1, timestamp=datetime.datetime(2022, 5, 4, 14, 45, 5, 897000), userId=None, userName=None, operation='MERGE', operationParameters={'matchedPredicates': '[{"actionType":"update"}]', 'predicate': '(t.session_id = i.session_id)', 'notMatchedPredicates': '[{"actionType":"insert"}]'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel='Serializable', isBlindAppend=False, operationMetrics={'numOutputRows': '17227', 'numTargetRowsInserted': '17227', 'numTargetRowsUpdated': '0', 'numTargetFilesAdded': '1', 'numTargetFilesRemoved': '0', 'numTargetRowsDeleted': '0', 'scanTimeMs': '2227', 'numSourceRows': '0', 'executionTimeMs': '132635', 'numTargetRowsCopied': '0', 'rewriteTimeMs': '130246'}, userMetadata=None, engineInfo='Apache-Spark/3.2.1 Delta-Lake/1.2.0')
