## This notebook processes ratings data as a stream using Spark Structured Streaming
Spark Version 3.1.2
Databricks Runtime LTS 9.1

In [0]:
from pyspark.sql.functions import to_date,split,input_file_name, from_unixtime
from pyspark.sql.types import DateType
from delta.tables import *
import logging

### Common Utilities functions

In [0]:
%run ../util/utils

In [0]:
# Source: raw rating files landed on DBFS.
rating_filepath = "dbfs:/FileStore/tables/asos_data/rating/"

# Sink: Delta output location for ratings, and the streaming checkpoint kept next to it.
rating_outpath = "dbfs:/FileStore/tables/asos_delta_std/rating/"
checkpoint_path = "dbfs:/FileStore/tables/asos_delta_std/rating_checkpoint"

### Input Raw folder structure

In [0]:
# List the entries directly under the raw rating folder and render them as a table.
raw_rating_entries = dbutils.fs.ls(rating_filepath)
display(raw_rating_entries)

path,name,size
dbfs:/FileStore/tables/asos_data/rating/20220306/,20220306/,0


### Getting Rating Schema from latest file path

In [0]:
# get_latest_file_path and get_schema are not defined in this notebook;
# presumably they come from the ../util/utils notebook pulled in via %run — TODO confirm.
# NOTE(review): the names suggest the newest path under rating_filepath is located and
# its schema is derived by reading it as CSV — verify against the helper implementations.
rating_schema_filepath = get_latest_file_path(rating_filepath)
rating_schema = get_schema(rating_schema_filepath, "csv")

### Upsert keyed on user id and movie id: matching rows are updated, all other rows are inserted.

In [0]:
def upsertToDelta(microBatchOutputDF, batchId):
  """Merge one streaming micro-batch into the rating Delta table.

  Intended to be passed to ``DataStreamWriter.foreachBatch``.

  Args:
    microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
    batchId: Identifier of the micro-batch, used only in the log message.

  The batch is reduced to one row per (user_id, movie_id) before the MERGE,
  because Delta rejects a MERGE in which several source rows match the same
  target row. When the batch has a ``created_timestamp`` column, the row with
  the greatest timestamp is kept so the most recent rating wins; otherwise an
  arbitrary row per key is kept (the previous behaviour).

  Relies on the module-level ``rating_target_df`` DeltaTable handle.
  """
  # Function-scope imports keep this cell self-contained.
  from pyspark.sql import Window
  from pyspark.sql.functions import col, row_number

  key_columns = ["user_id", "movie_id"]

  # count() launches a separate Spark job over the batch, so only pay for it
  # when the INFO message would actually be emitted by the root logger.
  if logging.getLogger().isEnabledFor(logging.INFO):
    logging.info(f"Rows Processed in this Dataframe for the BatchId:{batchId} Count:{microBatchOutputDF.count()}")

  if "created_timestamp" in microBatchOutputDF.columns:
    # Rank rows within each key, newest first, and keep only the newest.
    # Rows tied on created_timestamp are still resolved arbitrarily.
    newest_first = Window.partitionBy(*key_columns).orderBy(col("created_timestamp").desc())
    batch_deduped = (microBatchOutputDF
      .withColumn("_row_rank", row_number().over(newest_first))
      .filter(col("_row_rank") == 1)
      .drop("_row_rank"))
  else:
    batch_deduped = microBatchOutputDF.dropDuplicates(key_columns)

  (rating_target_df.alias("target")
   .merge(batch_deduped.alias("source"),"source.user_id = target.user_id and source.movie_id = target.movie_id")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute())

In [0]:
# Bootstrap an empty Delta table with the rating schema, but only when none exists yet.
# Unconditionally overwriting here would wipe previously merged ratings on every run,
# while the streaming checkpoint would still mark their source files as processed,
# so the lost rows would never be reloaded.
# To reprocess from scratch, delete both the table and the checkpoint folder.
if not DeltaTable.isDeltaTable(spark, rating_outpath):
  spark.createDataFrame([], rating_schema).write.format("delta").mode("overwrite").save(rating_outpath)

In [0]:
# DeltaTable handle (not a DataFrame, despite the name) on the rating output path;
# upsertToDelta uses it as the MERGE target.
rating_target_df = DeltaTable.forPath(
  spark,
  rating_outpath,
)

In [0]:
# Streaming CSV source over every sub-folder of the raw rating path.
# The schema is supplied explicitly, so no inferSchema option is set:
# it has no effect once a schema is given.
df_rating = (spark.readStream
  .format("csv")
  .schema(rating_schema)
  .option("header", True)
  .load(rating_filepath + "/*"))

### For each micro-batch, the upsertToDelta function is invoked to perform the MERGE INTO

In [0]:
# foreachBatch replaces the sink, so the MERGE inside upsertToDelta is the only writer.
# A format() set before it, or an output path passed to start(), is not used and is
# therefore omitted. The checkpoint location is what tracks already-processed files.
# Keep the StreamingQuery handle so the stream can be monitored or stopped later,
# e.g. rating_query.status / rating_query.stop().
rating_query = (df_rating.writeStream
  .outputMode("append")
  .foreachBatch(upsertToDelta)
  .option('checkpointLocation', checkpoint_path)
  .start())
rating_query

Out[13]: <pyspark.sql.streaming.StreamingQuery at 0x7f7efdcdb910>

### Output displayed from the ratings output path

In [0]:
# Batch-read the Delta output and render a 10-row preview.
rating_delta_df = spark.read.format("delta").load(rating_outpath)
rating_preview_df = rating_delta_df.limit(10)
display(rating_preview_df)

user_id,movie_id,rating,created_timestamp
648,648,4,891384754
259,259,2,881705026
295,295,3,887741461
49,49,3,878183512
331,331,3,879455166
1000,1000,3,879877173
1328,1328,4,890116132
735,735,4,879524718
306,306,1,878962006
901,901,1,883670672


### Below are the output folders with the checkpoint in place

In [0]:
# List the standardised Delta output root to show the data and checkpoint folders side by side.
delta_std_entries = dbutils.fs.ls("/FileStore/tables/asos_delta_std")
display(delta_std_entries)

path,name,size
dbfs:/FileStore/tables/asos_delta_std/movies/,movies/,0
dbfs:/FileStore/tables/asos_delta_std/movies_checkpoint/,movies_checkpoint/,0
dbfs:/FileStore/tables/asos_delta_std/rating/,rating/,0
dbfs:/FileStore/tables/asos_delta_std/rating_checkpoint/,rating_checkpoint/,0
dbfs:/FileStore/tables/asos_delta_std/tags/,tags/,0
dbfs:/FileStore/tables/asos_delta_std/tags_checkpoint/,tags_checkpoint/,0
