In [12]:
import pyspark
import pyspark.sql.functions as F
from delta import *
from delta.tables import *

# Build a SparkSession with the Delta Lake SQL extension and catalog enabled.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("DF2_Practice")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip comes from the `delta` package imported above
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Later cells turn epoch seconds into timestamps/dates with .cast("timestamp").cast("date")
# and name the results utc_timestamp / utc_date. Those casts use the session time zone,
# so pin it to UTC: otherwise the dates (and the yearly batches built from them)
# depend on the time zone of the machine running the notebook.
spark.conf.set("spark.sql.session.timeZone", "UTC")

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # OK for exploration, not great for performance
spark.conf.set("spark.sql.repl.eagerEval.truncate", 500)


### Delta Lake continued, Intro to Structured Streaming

* Delta Lake
    * Data Lake to Data Warehouse: merge into
    * Updating table definition
    * Partition column
* Structured Streaming intro

In [13]:
# We will use a movies dataset and try to simulate dimensional modelling behaviour

# All three files are read the same way ("::" delimiter, inferred types, no header option),
# so the table name, file path and column names are the only things that vary.
# Keeping them in one mapping avoids three copies of the same read/write chain.
source_files = {
    "source_movies": ("movies/movies.dat", ["MovieID", "Title", "Genres"]),
    "source_ratings": ("movies/ratings.dat", ["UserID", "MovieID", "Rating", "Timestamp"]),
    "source_users": ("movies/users.dat", ["UserID", "Gender", "Age", "Occupation", "Zip-code"]),
}

for table_name, (file_path, column_names) in source_files.items():
    # we don't need to create a separate variable for our dataframes
    # if we have done testing/debugging, we can be more concise
    (spark.read
     .option("delimiter", "::")
     .option("inferSchema", "true")
     .csv(file_path)
     .toDF(*column_names)  # for naming columns
     .write
     .format("delta")
     .mode("overwrite")  # overwrite so the cell can be re-run from a fresh kernel
     .saveAsTable(table_name)
    )

In [14]:
# let's separate data into smaller batches for simulating incremental inserts

# The Timestamp column holds epoch seconds; casting to timestamp and then to date
# gives the calendar date of the rating (in the session time zone).
rating_date = F.col("timestamp").cast("timestamp").cast("date")

# One Delta table per calendar year: source_ratings_2000 ... source_ratings_2003
for year in range(2000, 2004):
    (spark.table("source_ratings")
     .filter(rating_date.between(f"{year}-01-01", f"{year}-12-31"))
     .write
     .format("delta")
     .mode("overwrite")
     .saveAsTable(f"source_ratings_{year}")
    )

In [15]:
# Now we can create dimension and fact tables. We will only use year 2000 data for now

# DimUser: every source user that has at least one rating in the 2000 batch,
# extended with a validity interval. ValidTo = 9999-12-31 is the open-ended value
# that every row receives in this initial load.
spark.sql("""
CREATE OR REPLACE TABLE DimUser USING DELTA AS
SELECT *
, CAST('2001-01-01' as date) ValidFrom
, CAST('9999-12-31' as date) ValidTo
FROM source_users
WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
""")

# DimMovie: same pattern, restricted to movies rated in the 2000 batch.
spark.sql("""
CREATE OR REPLACE TABLE DimMovie USING DELTA  AS
SELECT *
, CAST('2001-01-01' as date) ValidFrom
, CAST('9999-12-31' as date) ValidTo
FROM source_movies
WHERE movieID IN (SELECT movieID FROM source_ratings_2000)
""")

# FactRating: a plain copy of the 2000 ratings batch (no validity columns).
spark.sql("""
CREATE OR REPLACE TABLE FactRating USING DELTA  AS
SELECT *
FROM source_ratings_2000
""")

In [16]:
# let's make some modifications to the data to see how handling slowly changing dimensions would work
# (these statements change the source tables only; DimUser / DimMovie are brought in line
# by the MERGE statements that follow)

# change an attribute for a whole group of users
spark.sql("""UPDATE source_users
SET Age = 21
WHERE Age = 1
""")

# change the zip code of users with one specific value
# (the column name contains a dash, so it must be quoted with backticks)
spark.sql("""UPDATE source_users
SET `Zip-code` = 12345
WHERE `Zip-Code` = 10023""")

# change the genre of a single movie; as the last expression of the cell,
# its result (num_affected_rows) is what the cell displays
spark.sql("""UPDATE source_movies
SET Genres = 'Comedy'
WHERE MovieID = 18""")

num_affected_rows
1


In [17]:
# the MERGE statement is a powerful way of upserting data:
# in the first statement, if the userid has a current row but something about the user has changed,
# then we invalidate (close) that row; if the userid has no current row, then we insert the user
#
# Only the current version of a user (ValidTo = 9999-12-31) takes part in the match.
# Without that predicate, already-closed history rows would be matched (and their ValidTo
# rewritten) every time a later batch is merged or the cell is re-run.
# Attributes are compared with the null-safe operator <=>, because `a != b` evaluates
# to NULL (not TRUE) when either side is NULL and such a change would go unnoticed.

spark.sql("""
MERGE INTO DimUser tgt
USING (
  SELECT * FROM source_users
  WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
  OR UserID IN (SELECT UserID FROM source_ratings_2001)
) src
ON tgt.UserID = src.UserID
AND tgt.ValidTo = CAST('9999-12-31' as date)
WHEN MATCHED AND (
     NOT (tgt.Gender <=> src.Gender)
  OR NOT (tgt.Age <=> src.Age)
  OR NOT (tgt.Occupation <=> src.Occupation)
  OR NOT (tgt.`Zip-code` <=> src.`Zip-code`)
)
  THEN UPDATE SET tgt.ValidTo = CAST('2002-01-01' as date)
WHEN NOT MATCHED THEN INSERT (UserID, Gender, Age, Occupation, `Zip-code`, ValidFrom, ValidTo) VALUES (
          src.UserID
          , src.Gender
          , src.Age
          , src.Occupation
          , src.`Zip-code`
          , CAST('2002-01-01' as date)
          , CAST('9999-12-31' as date)
          )
""")

# in this statement, we add new data for all the rows that we invalidated with the previous statement:
# after the first MERGE, a changed user has no current row any more, so the source row is
# NOT MATCHED and gets inserted as the new current version.
# Matching on "has a current row" (instead of "some row has identical attributes") also handles
# a user whose attributes change back to an earlier state: the old closed row no longer
# suppresses the insert.
spark.sql("""
MERGE INTO DimUser tgt
USING (
  SELECT * FROM source_users
  WHERE UserID IN (SELECT UserID FROM source_ratings_2000)
  OR UserID IN (SELECT UserID FROM source_ratings_2001)
) src
ON tgt.UserID = src.UserID
AND tgt.ValidTo = CAST('9999-12-31' as date)
WHEN NOT MATCHED THEN INSERT (UserID, Gender, Age, Occupation, `Zip-code`, ValidFrom, ValidTo) VALUES (
          src.UserID
          , src.Gender
          , src.Age
          , src.Occupation
          , src.`Zip-code`
          , CAST('2002-01-01' as date)
          , CAST('9999-12-31' as date)
          )
""")

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
230,0,0,230


In [18]:
# using the Delta API
# the queries follow the same two-step pattern as the SQL version:
#   1) close the current row of every changed movie and insert movies that have no current row
#   2) insert a new current row for every movie whose row was closed in step 1
#
# Only the current version of a movie (ValidTo = 9999-12-31) takes part in the match, so
# closed history rows are never updated again on later loads or re-runs. Attributes are
# compared with the null-safe operator <=> so that a change to or from NULL is detected.

dimMovieTable = DeltaTable.forName(spark, "dimMovie")
sourceMoviesDf = spark.sql("""
SELECT * FROM source_movies
WHERE MovieID IN (SELECT MovieID FROM source_ratings_2000)
OR MovieID IN (SELECT MovieID FROM source_ratings_2001)
""")

# the same match condition and insert values are used by both merges
current_row_match = "tgt.MovieID = src.MovieID AND tgt.ValidTo = cast('9999-12-31' as date)"
new_version_values = {
    "MovieID": "src.MovieID",
    "Title": "src.Title",
    "Genres": "src.Genres",
    "ValidFrom": "cast('2002-01-01' as date)",
    "ValidTo": "cast('9999-12-31' as date)",
}

# step 1: close changed rows, insert movies without a current row
dimMovieTable.alias("tgt").merge(
    source=sourceMoviesDf.alias("src"),
    condition=current_row_match,
).whenMatchedUpdate(
    condition="NOT (tgt.Title <=> src.Title) OR NOT (tgt.Genres <=> src.Genres)",
    set={"ValidTo": "cast('2002-01-01' as date)"},
).whenNotMatchedInsert(
    values=new_version_values
).execute()

# step 2: movies closed in step 1 no longer have a current row, so they are NOT MATCHED here
# and receive their new current version
dimMovieTable.alias("tgt").merge(
    source=sourceMoviesDf.alias("src"),
    condition=current_row_match,
).whenNotMatchedInsert(
    values=new_version_values
).execute()

In [19]:
# schema evolution - this cell is for reference: it shows the FactRating schema as it currently is,
# before any new column is added

spark.table("FactRating").printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)



In [20]:
# schema evolution
# our source data now includes a new column - date
# we can use mergeSchema to add the new column to the existing table schema while appending

# No assignment here: saveAsTable() returns None, so binding the result to a name
# would only leave a misleading `None` variable in the notebook.
(spark.table("source_ratings_2001")
 .select("*",
         F.col("timestamp")
         .cast("timestamp")
         .cast("date")
         .alias("date")
        )
 .write
 .format("delta")
 .mode("append")
 .option("mergeSchema", "true")
 .saveAsTable("FactRating")
)

spark.table("FactRating").printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- date: date (nullable = true)



In [21]:
# preview FactRating after the mergeSchema append
display(spark.table("FactRating"))

UserID,MovieID,Rating,Timestamp,date
2035,2202,4,979245334,2001-01-11
2035,2291,2,980114982,2001-01-21
2035,2903,2,979245309,2001-01-11
2036,912,5,993957674,2001-07-01
2036,1449,5,993957620,2001-07-01
2036,3083,5,993957620,2001-07-01
2036,1197,5,993957743,2001-07-01
2036,3129,3,993957620,2001-07-01
2036,1247,5,993957700,2001-07-01
2041,2987,4,1004131358,2001-10-26


In [22]:
# schema overwriting - mergeSchema is probably not what you want
# (here, the column names have changed; however, with mergeSchema, the old columns remain)

# No assignment here: saveAsTable() returns None, so binding the result to a name
# would only leave a misleading `None` variable in the notebook.
(spark.table("source_ratings")
 .select(F.col("userID").alias("user"),
         F.col("timestamp")
         .cast("timestamp")
         .alias("utc_timestamp"),
         F.col("timestamp")
         .cast("timestamp")
         .cast("date")
         .alias("utc_date")
        )
 .write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true")
 .saveAsTable("FactRating")
)

spark.table("FactRating").printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- user: integer (nullable = true)
 |-- utc_timestamp: timestamp (nullable = true)
 |-- utc_date: date (nullable = true)



In [23]:
# because we used mode("overwrite"), all of the data was overwritten,
# but the old schema was kept as a leftover. Thus, all of the old columns are now NULL
display(spark.table("FactRating"))

UserID,MovieID,Rating,Timestamp,date,user,utc_timestamp,utc_date
,,,,,3180,2000-09-12 20:03:11,2000-09-12
,,,,,3180,2000-09-12 20:04:44,2000-09-12
,,,,,3180,2000-09-12 16:15:00,2000-09-12
,,,,,3180,2000-09-12 23:14:49,2000-09-12
,,,,,3180,2000-09-12 23:58:36,2000-09-12
,,,,,3180,2000-09-12 23:31:38,2000-09-12
,,,,,3180,2000-09-12 16:21:50,2000-09-12
,,,,,3180,2000-09-13 00:19:09,2000-09-13
,,,,,3180,2000-09-12 16:13:05,2000-09-12
,,,,,3180,2000-09-12 23:34:44,2000-09-12


In [24]:
# instead, if you need to completely overwrite the schema, use overwriteSchema
# note: append mode is not allowed

# epoch seconds -> timestamp; reused for both derived columns
rating_instant = F.col("timestamp").cast("timestamp")

ratings_renamed = spark.table("source_ratings").select(
    F.col("userID").alias("user"),
    rating_instant.alias("utc_timestamp"),
    rating_instant.cast("date").alias("utc_date"),
)

fact_rating_writer = (
    ratings_renamed.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
# column partition (not enabled here): fact_rating_writer.partitionBy("utc_date")
# - most commonly on date, needs to be not too unique. Ideally something you use in filters a lot.
#   Only makes sense if it would be ~ 1 GB per partition!
fact_rating_writer.saveAsTable("FactRating")

spark.table("FactRating").printSchema()

root
 |-- user: integer (nullable = true)
 |-- utc_timestamp: timestamp (nullable = true)
 |-- utc_date: date (nullable = true)



In [25]:
# preview FactRating after the overwriteSchema write
display(spark.table("FactRating"))

user,utc_timestamp,utc_date
1114,2000-11-22 18:06:39,2000-11-22
1114,2000-11-22 18:05:22,2000-11-22
1114,2000-11-22 17:53:52,2000-11-22
1114,2000-11-22 18:06:39,2000-11-22
1114,2000-11-22 17:55:12,2000-11-22
1114,2000-11-22 18:05:48,2000-11-22
1114,2000-11-22 17:49:22,2000-11-22
1114,2000-11-22 18:02:06,2000-11-22
1114,2000-11-22 18:05:48,2000-11-22
1114,2000-11-22 17:51:46,2000-11-22


Read more about Delta Lake optimizations:  
https://docs.delta.io/latest/optimizations-oss.html

### Structured streaming

Common input/output:
  * Kafka (and other distributed commit logs)
  * Files (Parquet, ORC, Avro, JSON, ...)
  * (Delta) tables

In [26]:
# let's start by looking at reading and writing streams to files
# we need a schema when reading streams

# DDL schema string, one top-level field per source line
# (adjacent string literals are concatenated into a single string)
events_schema = (
    "device STRING, "
    "ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, "
    "event_name STRING, "
    "event_previous_timestamp BIGINT, "
    "event_timestamp BIGINT, "
    "geo STRUCT<city: STRING, state: STRING>, "
    "items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, "
    "traffic_source STRING, "
    "user_first_touch_timestamp BIGINT, "
    "user_id STRING"
)

# the "events_parquet" directory must exist relative to the working directory,
# otherwise this cell fails with PATH_NOT_FOUND
events_reader = spark.readStream.schema(events_schema)
events_reader = events_reader.option("maxFilesPerTrigger", 1)  # used for example purposes, reads in 1 file per trigger
streaming_events_df = events_reader.parquet("events_parquet")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: events_parquet.

In [None]:
# you can check if a dataframe has streaming sources
# this means some functions are unavailable (eg count)
# (isStreaming is the only expression in this cell, so its value is the cell output)

streaming_events_df.isStreaming


# displaying data
# in Databricks, we can use display(df) for debugging
# running locally, you can debug output to console (note, does not work when using Jupyter)
#streaming_events_df.writeStream.format("console").start()

In [None]:
# let's create a new dataframe and write it into a file

# row filter and derived flag, named up front so the pipeline below reads as a sentence
came_from_email = F.col("traffic_source") == "email"
on_mobile_device = F.col("device").isin(["iOS", "Android"])

email_df = (
    streaming_events_df
    .where(came_from_email)
    .select("user_id", "event_timestamp", on_mobile_device.alias("mobile"))
)

checkpoint_path = "streaming/email_traffic/checkpoint"
output_path = "streaming/email_traffic/output"

email_writer = (
    email_df.writeStream
    .format("parquet")
    .queryName("email_traffic_query")  # optional name
    # append = only new rows, complete = all rows written on every update,
    # update = only updated rows (used in aggregations, otherwise same as append)
    .outputMode("append")
    # used for fault-tolerance. Note: every query needs to have a unique check point location
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime="10 second")  # how often data is fetched from source
)

# start() launches the query; output_path is the location where the files will be written
devices_query = email_writer.start(output_path)

In [None]:
# monitor the query

#devices_query.id # unique per query, persisted when restarted from checkpoint 
#devices_query.name
#devices_query.status # isDataAvailable = new data available, isTriggerActive = trigger actively firing
#devices_query.awaitTermination(5) # used in non-Databricks usecases for keeping the thread (streaming query) alive. 
#devices_query.stop() # use to shut down the streaming query. Especially in Databricks, otherwise cluster will keep awake indefinitely

In [None]:
# We can also write it into a delta table

checkpoint_path = "streaming/email_delta/checkpoint"
# note that we set the output path to wherever the DWH catalog path is (in our case, ./spark-warehouse)
output_path = "spark-warehouse/email_streaming"

delta_writer = (
    email_df.writeStream
    .queryName("email_delta_query")
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .trigger(processingTime="30 second")
)

# kept as the last expression of the cell, so the started query object is the cell output
delta_writer.start(output_path)

In [None]:
# by default (outside of Databricks), a new table is not created in the Delta metastore.
# use the following query to "register" the table:
#
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails once the table
# has been registered by an earlier run.
# NOTE(review): the relative LOCATION is expected to resolve to the same directory the
# stream writes to (spark-warehouse/email_streaming) - confirm for your Spark version.

spark.sql("CREATE TABLE IF NOT EXISTS email_streaming USING DELTA LOCATION 'email_streaming'")

In [None]:
# note that this is now an EXTERNAL (unmanaged) table
# (DESCRIBE EXTENDED returns the table's columns together with its detailed metadata)

spark.sql("DESCRIBE EXTENDED email_streaming")

# the main difference is in how the data is handled:
# MANAGED --> DROP TABLE --> drops the "table" as well as the data in it
# EXTERNAL --> DROP TABLE --> drops the "table" (data remains in the path)

In [None]:
# view snapshots of table
# (spark.table is a batch read, so each run of this cell shows the rows present at that moment)

display(spark.table("email_streaming"))

In [None]:
# we can also check the count of this table increasing
# (re-run this cell while the streaming query is active and compare the numbers)

spark.table("email_streaming").count()

In [None]:
# since it is a delta table, we can look at its history

display(spark.sql("DESCRIBE HISTORY email_streaming"))

In [None]:
# you can also create views on top of the data

# batch-read the Delta files written by the stream and expose them under a view name
email_streaming_snapshot = spark.read.format("delta").load("spark-warehouse/email_streaming")
email_streaming_snapshot.createOrReplaceTempView("email_streaming_vw")

display(spark.table("email_streaming_vw"))

In [None]:
# use spark.streams.active to loop over all active streams
# remember to stop streams if not working on them anymore

for active_query in spark.streams.active:
    active_query.stop()

## Further reading

* Delta Lake MERGE INTO:
  * https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html
* Structured Streaming: 
  * https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/index.html
  * https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamReader.html
  * http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  * https://docs.databricks.com/spark/latest/structured-streaming/production.html

#### Tasks

##### Task 1
Load the `employees.csv` dataset into a Delta table `DimEmployee`, mapping the columns as follows:
* employee_id
* employee_name
* department
* region
* employee_key
* active_record (drop this field)
* active_record_start (drop this field)
* active_record_end (drop this field)
* job_id (constant value `1` in initial load)
* valid_from (map this to the same value as active_record_start)
* valid_to (map this to the same value as active_record_end, if NULL then map it to 9999-12-31)

You have to upsert the data from `employees2024.csv`.
* Assume that `employee_id` is the unique value per employee.  
* The fields `employee_name`, `department` and `region` should be checked for changes.
* Update any data that becomes invalid by setting `valid_to` equal to yesterday's date (NB! do not hardcode yesterday's date)
  * Remember that only currently _valid_ data should be updated
  * Note: you may need an additional merge statement for which you can use this: https://docs.delta.io/latest/delta-update.html#modify-all-unmatched-rows-using-merge
* Insert any new data that should be inserted, with `valid_from` set to current date (NB! do not hardcode current date)
* For new data, you can hardcode the `valid_to` field to `9999-12-31`
* The updates and inserts should have `job_id = 2`
* Display the results from the table by selecting all fields `WHERE job_id = 2`

In [None]:
# your answer

##### Task 2
Create a schema and a streaming dataframe for the JSON files in the `gaming_data` dataset.
  
  
Use the following as basis for creating your schema:  
 |-- eventName: string (nullable = true)  
 |-- eventParams: struct (nullable = true)  
 |    |-- amount: double (nullable = true)  
 |    |-- app_name: string (nullable = true)  
 |    |-- app_version: string (nullable = true)  
 |    |-- client_event_time: string (nullable = true)  
 |    |-- device_id: string (nullable = true)  
 |    |-- game_keyword: string (nullable = true)  
 |    |-- platform: string (nullable = true)  
 |    |-- scoreAdjustment: long (nullable = true)  
  
Read in 2 files per trigger.
  
Create a new modified dataframe:
* keep only rows where eventName is "scoreAdjustment"
* select the *game_keyword*, *platform* and *scoreAdjustment* columns from the eventParams struct.  
* when writing the stream (see below), set the trigger to run every 5 seconds.

Write the datastream to a delta table called score_adjustments.  
Check to make sure that the table has some data - this should also be visible in the cell results.  
Show that the number of rows in the table is increasing.  
Then stop the datastream.

In [None]:
# your answer