In [0]:
%sql
--TODO: Create new table "events_movie_watches" according to the requirements:
--Fomat - Delta
--Partitioned by - year, month and day
--Location - '/mnt/data/gold/events_movie_watches/'
CREATE TABLE IF NOT EXISTS events_movie_watches (
  movie_id string,
  event_id int,
  start_event_timestamp timestamp,
  finish_event_timestamp timestamp,
  is_finished int,
  title_type string,
  primary_title string,
  runtime_minutes int,
  primary_genre string,
  secondary_genre string,
  birth_year int,
  device_type string,
  subscription_type string,
  device_os string,
  year int,
  month int,
  day int
) USING DELTA
PARTITIONED BY (year, month, day)
LOCATION '/mnt/data/gold/events_movie_watches/'

In [0]:
# Define variables used in code below
file_path = "/mnt/data/silver/events"
table_name = "events"
checkpointPath = "/mnt/data/checkpoints/silver/events"

#drop table
spark.sql("DROP TABLE IF EXISTS events")

# Configure Auto Loader to ingest PARQUET data to a Delta table
(
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", checkpointPath)
  .load(file_path)  
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .trigger(availableNow=True)
  .toTable(table_name)
  .awaitTermination()
)

In [0]:
import re
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql import Window

events_movie_watches = DeltaTable.forPath(spark, '/mnt/data/gold/events_movie_watches/')

events = spark.read.parquet('/mnt/data/silver/events/')
events.createOrReplaceTempView("events")

uc1_details = spark.sql("""
    select 
        usd.user_subscription_device_id as user_subscription_device_id,
        s.subscription_type as subscription_type,
        d.device_os as device_os,
        d.device_type as device_type,
        year(u.user_date_of_birth) as birth_year
    from YouFlix.youflix_user_subscription_device_delta usd
    left join YouFlix.youflix_subscription_delta s on usd.subscription_id = s.subscription_id
    left join YouFlix.youflix_device_delta d on usd.device_id = d.device_id
    left join YouFlix.youflix_user_delta u on usd.user_id = u.user_id
""")
uc1_details.createOrReplaceTempView("uc1_details")

uc2_details = spark.read.parquet('/mnt/data/silver/imdb/title_genres/')
uc2_details.createOrReplaceTempView("uc2_details")
uc2_details = spark.sql("""
    select
        title_id,
        collect_list(genre) as genres
    from uc2_details      
    group by title_id                  
""")
uc2_details.createOrReplaceTempView("uc2_details")

uc3_details = spark.read.parquet('/mnt/data/silver/imdb/titles')
uc3_details.createOrReplaceTempView("uc3_details")

events_enriched = spark.sql("""
    select 
        e.event_timestamp as event_timestamp,
        e.event_uid as event_uid, 
        e.event_type as event_type,
        e.event_id as event_id,
        e.user_subscription_device_id as user_subscription_device_id,
        e.movie_id as movie_id,
        e.EventProcessedUtcTime as EventProcessedUtcTime,
        e.PartitionId as PartitionId,
        e.EventEnqueuedUtcTime as EventEnqueuedUtcTime,
        e.year as year,
        e.month as month,
        e.day as day,
        u1.subscription_type as subscription_type,
        u1.device_os as device_os,
        u1.birth_year as birth_year,
        u1.device_type as device_type,
        u2.genres[0] as primary_genre,
        u2.genres[1] as secondary_genre,
        u3.primary_title as primary_title,
        u3.title_type as title_type,
        u3.runtime_minutes as runtime_minutes
    from events e
    left join uc1_details u1 on e.user_subscription_device_id = u1.user_subscription_device_id      
    left join uc2_details u2 on e.movie_id = u2.title_id
    left join uc3_details u3 on string(e.movie_id) = u3.title_id      
""")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark.sql("SET spark.sql.ansi.enabled = false")


finish_events = events_enriched.filter(events_enriched.event_type == "FINISH")
start_events = events_enriched.filter(events_enriched.event_type == "START")

finish_events.createOrReplaceTempView("finish_events")
start_events.createOrReplaceTempView("start_events")

valid_finish_events = spark.sql("""
    SELECT 
        s.*
    FROM finish_events f
    INNER JOIN start_events s ON f.event_id = s.event_id
""")

union_df = start_events.union(valid_finish_events).dropDuplicates(['event_id'])
union_df.createOrReplaceTempView("updates")

print(union_df.schema)

spark.sql("""
    MERGE INTO events_movie_watches
    USING updates
    ON events_movie_watches.event_id = updates.event_id
    WHEN MATCHED THEN
        UPDATE SET
            events_movie_watches.finish_event_timestamp = updates.EventProcessedUtcTime,
            events_movie_watches.is_finished = 1
    WHEN NOT MATCHED THEN
        INSERT (
            movie_id, event_id, start_event_timestamp, finish_event_timestamp, is_finished, 
            title_type, primary_title, runtime_minutes, primary_genre, secondary_genre, 
            birth_year, device_type, subscription_type, device_os, year, month, day
        ) VALUES (
            updates.movie_id, try_cast(updates.event_id as int), updates.EventProcessedUtcTime, NULL, 0, 
            updates.title_type, updates.primary_title, updates.runtime_minutes, updates.primary_genre, updates.secondary_genre, 
            updates.birth_year, updates.device_type, updates.subscription_type, updates.device_os, updates.year, updates.month, updates.day
        )
""")

events_movie_watches.toDF().write.format("delta").mode("overwrite").save("/mnt/data/gold/events_movie_watches/")

StructType([StructField('event_timestamp', StringType(), True), StructField('event_uid', StringType(), True), StructField('event_type', StringType(), True), StructField('event_id', StringType(), True), StructField('user_subscription_device_id', StringType(), True), StructField('movie_id', StringType(), True), StructField('EventProcessedUtcTime', TimestampType(), True), StructField('PartitionId', LongType(), True), StructField('EventEnqueuedUtcTime', TimestampType(), True), StructField('year', IntegerType(), True), StructField('month', IntegerType(), True), StructField('day', IntegerType(), True), StructField('subscription_type', StringType(), True), StructField('device_os', StringType(), True), StructField('birth_year', IntegerType(), True), StructField('device_type', StringType(), True), StructField('primary_genre', StringType(), True), StructField('secondary_genre', StringType(), True), StructField('primary_title', StringType(), True), StructField('title_type', StringType(), True), S

In [0]:
%sql
SELECT
  count(*)
FROM
  cloud_files_state("/mnt/data/checkpoints/silver/events");

count(1)
1550336


In [0]:
%sql
select
  count(*)
from
  events_movie_watches;

count(1)
886001
