In [0]:

%run ./01-config

In [0]:
class Upserter:
    """``foreachBatch`` callback that runs one fixed MERGE per micro-batch.

    The micro-batch is published as a temporary view named ``temp_view`` so
    that ``merge_query`` can refer to it by that name.

    Args:
        merge_query: SQL statement executed once per micro-batch.
        temp_view: view name the micro-batch is registered under.
    """

    def __init__(self, merge_query, temp_view):
        self.merge_query = merge_query
        self.temp_view = temp_view

    def upsert(self, df_micro_batch, batch_id):
        """Register ``df_micro_batch`` as ``self.temp_view``, then run the MERGE.

        ``batch_id`` is accepted only to match the ``foreachBatch`` signature.
        """
        view_name, statement = self.temp_view, self.merge_query
        df_micro_batch.createOrReplaceTempView(view_name)
        session = df_micro_batch.sparkSession
        session.sql(statement)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

class CDCUpserter:
    """``foreachBatch`` callback that applies the newest CDC record per key via MERGE.

    For each micro-batch, keep only rows whose ``row_status`` is ``"insert"`` or
    ``"update"``. Reduce them to one row per ``id_col`` (the one with the
    greatest ``sort_col``), register the result as ``temp_view`` and run
    ``merge_query``.

    Args:
        merge_query: MERGE statement that reads from ``temp_view``.
        temp_view: view name the de-duplicated micro-batch is registered under.
        id_col: key column identifying a target row.
        sort_col: column ordering versions of a key; the greatest value wins.
    """

    def __init__(self, merge_query, temp_view, id_col, sort_col):
        self.merge_query = merge_query
        self.temp_view = temp_view
        self.id_col = id_col
        self.sort_col = sort_col

    def upsert(self, df_micro_batch, batch_id):
        """Stage the latest record per key as ``temp_view`` and execute the MERGE.

        ``batch_id`` is unused; it is required by the ``foreachBatch`` signature.
        """
        window = Window.partitionBy(self.id_col).orderBy(F.col(self.sort_col).desc())

        latest_records = (df_micro_batch
            .filter(F.col("row_status").isin(["insert", "update"]))
            # row_number (not rank) keeps exactly one row per key even when two
            # records share the same sort_col value. With rank both would survive,
            # and the MERGE would either fail (several source rows matching one
            # target row) or insert duplicates.
            .withColumn("rank", F.row_number().over(window))
            .filter("rank == 1")
            .drop("rank"))
        latest_records.createOrReplaceTempView(self.temp_view)

        df_micro_batch.sparkSession.sql(self.merge_query)

In [0]:
class CDFUpserter:
    """``foreachBatch`` callback that MERGEs Delta Change Data Feed (CDF) rows.

    For each micro-batch:
    - keep only ``insert`` and ``update_postimage`` change rows;
    - reduce them to one row per ``id_cols`` combination (the one with the
      greatest ``sort_col``);
    - drop the CDF ``_change_type`` / ``_commit_version`` columns;
    - rename ``_commit_timestamp`` to ``processed_timestamp``;
    - register the result as ``temp_view`` and run ``merge_query``.

    Args:
        merge_query: MERGE statement that reads from ``temp_view``.
        temp_view: view name the de-duplicated micro-batch is registered under.
        sort_col: column ordering versions of a key; the greatest value wins.
        *id_cols: one or more key columns identifying a target row.

    Raises:
        ValueError: if no ``id_cols`` are given. Otherwise the window would span
            the whole micro-batch and keep a single row.
    """

    def __init__(self, merge_query, temp_view, sort_col, *id_cols):
        if not id_cols:
            raise ValueError("CDFUpserter needs at least one id column")
        self.merge_query = merge_query
        self.temp_view = temp_view
        self.id_cols = id_cols
        self.sort_col = sort_col

    def upsert(self, df_micro_batch, batch_id):
        """Stage the latest change per key as ``temp_view`` and execute the MERGE.

        ``batch_id`` is unused; it is required by the ``foreachBatch`` signature.
        """
        # The PySpark API is Window.partitionBy (camelCase); partition_by does not exist.
        window = Window.partitionBy(*self.id_cols).orderBy(F.col(self.sort_col).desc())

        latest_changes = (df_micro_batch
            .filter(F.col("_change_type").isin(["insert", "update_postimage"]))
            # row_number (not rank) keeps exactly one row per key even on sort_col
            # ties, so the MERGE never sees several source rows for one target row.
            .withColumn("rank", F.row_number().over(window))
            .filter("rank == 1")
            .drop("rank", "_change_type", "_commit_version")
            # CDF exposes the commit time as _commit_timestamp (leading underscore).
            # withColumnRenamed silently ignores a missing column, so a misspelled
            # name would leave processed_timestamp undefined for the MERGE.
            .withColumnRenamed("_commit_timestamp", "processed_timestamp"))
        latest_changes.createOrReplaceTempView(self.temp_view)

        df_micro_batch.sparkSession.sql(self.merge_query)

In [0]:
class Silver:
    """Structured Streaming jobs that populate the silver tables.

    Each ``upsert_*`` method builds a streaming query and returns the started
    ``StreamingQuery``:
    - ``once=True`` uses an ``availableNow`` trigger (process what is currently
      available, then stop);
    - ``once=False`` runs continuously with a ``processingTime`` trigger of
      ``processing_time``.

    Relies on notebook-level names: ``spark``, ``Config`` (from ``./01-config``),
    ``F`` and the ``Upserter`` / ``CDCUpserter`` / ``CDFUpserter`` helpers.
    """

    def __init__(self, env):
        """Load the project config and switch the session to ``<env>.<db_name>``.

        Args:
            env: catalog name the silver tables live in (e.g. ``'dev'``).
        """
        self.Conf = Config()
        self.checkpoint_dir = self.Conf.base_checkpoint_dir + "/checkpoints"
        self.landing_dir = self.Conf.base_data_dir + "/raw"
        self.catalog = env
        self.db_name = self.Conf.db_name
        self.maxFilesPerTrigger = self.Conf.maxFilesPerTrigger
        spark.sql(f"USE {self.catalog}.{self.db_name}")

    def _table(self, name):
        """Return the fully qualified table name ``<catalog>.<db_name>.<name>``."""
        return f"{self.catalog}.{self.db_name}.{name}"

    @staticmethod
    def _start_stream(stream_writer, once, processing_time, table_name=None):
        """Attach the trigger to ``stream_writer`` and start it.

        Args:
            stream_writer: a configured ``DataStreamWriter``.
            once: ``True`` uses ``availableNow``; ``False`` uses ``processingTime``.
            processing_time: interval string used when ``once`` is ``False``.
            table_name: if given, the stream is written to this table via
                ``toTable``. Otherwise ``start()`` is used, so the writer must
                already have a sink (e.g. ``foreachBatch``).

        Returns:
            The started ``StreamingQuery``.
        """
        if once:
            triggered = stream_writer.trigger(availableNow=True)
        else:
            triggered = stream_writer.trigger(processingTime=processing_time)
        if table_name is None:
            return triggered.start()
        return triggered.toTable(table_name)

    def upsert_customers(self, once=True, processing_time="10 seconds"):
        """Merge customer CDC records from bronze into ``silver_customers``.

        Steps:
        - parse bronze rows with ``topic = 'customers'`` from JSON;
        - broadcast-join them with the country lookup on ``country_code = code``;
        - hand them to ``CDCUpserter``, which keeps the newest ``row_time`` per
          ``customer_id``.

        Existing rows are only overwritten by a strictly newer ``row_time``.
        """
        target = self._table("silver_customers")
        query = f"""
            MERGE INTO {target} c
            USING ranked_updates r
            ON c.customer_id=r.customer_id
            WHEN MATCHED AND c.row_time < r.row_time
              THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *
        """

        data_upserter = CDCUpserter(query, "ranked_updates", "customer_id", "row_time")

        schema = """customer_id STRING, email STRING, first_name STRING, last_name STRING, gender STRING, street STRING, city STRING,
            country_code STRING, row_status STRING, row_time timestamp"""

        df_country_lookup = spark.read.json(self.landing_dir + "/country_lookup")

        df_stream = (spark.readStream
                          .table(self._table("bronze"))
                          .filter("topic = 'customers'")
                          .select(F.from_json(F.col("value").cast("string"), schema).alias("c"))
                          .select("c.*")
                          .join(F.broadcast(df_country_lookup), F.col("country_code") == F.col("code"))
                    )

        stream_writer = (df_stream.writeStream
                                  .foreachBatch(data_upserter.upsert)
                                  .option("checkpointLocation", self.checkpoint_dir + "/silver_customers")
                        )
        return self._start_stream(stream_writer, once, processing_time)

    def upsert_orders(self, once=True, processing_time="10 seconds"):
        """Insert new orders from bronze into ``silver_orders``.

        Orders are de-duplicated on ``(order_id, order_timestamp)`` in two places:
        within the stream (``dropDuplicates``) and against the target (an
        insert-only MERGE).
        """
        target = self._table("silver_orders")
        query = f"""
            MERGE INTO {target} o
            USING orders_temp t
            ON o.order_id=t.order_id AND o.order_timestamp=t.order_timestamp
            WHEN NOT MATCHED THEN INSERT *
        """

        data_upserter = Upserter(query, "orders_temp")

        schema = """order_id STRING, order_timestamp Timestamp, customer_id STRING, quantity BIGINT, total BIGINT,
            books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>"""

        df_stream = (spark.readStream
                          .table(self._table("bronze"))
                          .filter("topic = 'orders'")
                          .select(F.from_json(F.col("value").cast("string"), schema).alias("o"))
                          .select("o.*")
                          # NOTE(review): without a watermark, streaming dropDuplicates
                          # keeps every (order_id, order_timestamp) key in state.
                          # Consider .withWatermark("order_timestamp", ...) first.
                          .dropDuplicates(["order_id", "order_timestamp"])
                    )

        stream_writer = (df_stream.writeStream
                                  .foreachBatch(data_upserter.upsert)
                                  .option("checkpointLocation", self.checkpoint_dir + "/silver_orders")
                        )
        return self._start_stream(stream_writer, once, processing_time)

    def upsert_books(self, once=True, processing_time="10 seconds"):
        """Apply book updates from bronze to ``silver_books`` as a Type 2 SCD.

        When a book's price changes, its current row is closed
        (``current = false``, ``end_date = updated``) and a new current row is
        inserted. Each update is staged twice:
        - keyed by ``book_id``, so it matches and closes the existing current
          row, or inserts a brand-new book;
        - with a NULL ``merge_key``, which never matches, so the new version
          is inserted.
        """
        target = self._table("silver_books")
        query = f"""
            MERGE INTO {target} silver_books
            USING (
                SELECT updates.book_id AS merge_key, updates.*
                FROM updates
                UNION ALL
                SELECT NULL AS merge_key, updates.*
                FROM updates
                JOIN {target} current_rows ON updates.book_id = current_rows.book_id
                WHERE current_rows.current = true AND updates.price <> current_rows.price
                ) staged_updates
            ON silver_books.book_id = merge_key
            WHEN MATCHED AND silver_books.current = true AND silver_books.price <> staged_updates.price THEN
               UPDATE SET current = false, end_date = staged_updates.updated
            WHEN NOT MATCHED THEN
               INSERT (book_id, title, author, price, current, effective_date, end_date)
               VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
        """

        data_upserter = Upserter(query, "updates")

        schema = "book_id STRING, title STRING, author STRING, price DOUBLE, updated TIMESTAMP"

        # The source is the bronze feed. silver_books is the MERGE target and has
        # no JSON `value` column to parse.
        df_stream = (spark.readStream
                          .table(self._table("bronze"))
                          .filter("topic = 'books'")
                          .select(F.from_json(F.col("value").cast("string"), schema).alias("b"))
                          .select("b.*")
                    )

        stream_writer = (df_stream.writeStream
                                  .foreachBatch(data_upserter.upsert)
                                  .option("checkpointLocation", self.checkpoint_dir + "/silver_books")
                        )
        return self._start_stream(stream_writer, once, processing_time)

    def upsert_books_sales(self, once=True, processing_time="10 seconds", table_name="silver_books_sales"):
        """Append one row per (order, book), enriched with the current book details.

        ``silver_orders`` is streamed and its ``books`` array is exploded into
        ``book``. Each element is joined to the static ``current_books`` table on
        ``book_id``, and the results are appended to
        ``<catalog>.<db_name>.<table_name>``.

        Args:
            table_name: target table within the configured catalog/schema.
                NOTE(review): the default is inferred from the method name.
                Confirm it matches the table created by the setup notebook.
        """
        orders_stream = (spark.readStream
                              .table(self._table("silver_orders"))
                              .withColumn("book", F.explode("books"))
                        )
        current_books = spark.read.table(self._table("current_books"))

        stream_writer = (orders_stream.join(current_books, orders_stream.book.book_id == current_books.book_id)
                                      .writeStream
                                      .outputMode("append")
                                      .option("checkpointLocation", self.checkpoint_dir + "/silver_book_sales")
                        )
        # The writer has no foreachBatch sink, so it must be written to a table.
        return self._start_stream(stream_writer, once, processing_time, self._table(table_name))

    def upsert_customers_orders(self, once=True, processing_time="10 seconds"):
        """Join orders with customer changes (Delta CDF) into ``silver_customers_orders``.

        ``silver_customers`` is read as a change feed starting at version 2 and
        stream-stream joined to ``silver_orders`` on ``customer_id``.
        ``CDFUpserter`` keeps the latest change (by ``_commit_timestamp``) per
        ``(order_id, customer_id)``. A matched row is only overwritten by a newer
        ``processed_timestamp``.
        """
        target = self._table("silver_customers_orders")
        query = f"""
            MERGE INTO {target} co
            USING updates u
            ON co.order_id=u.order_id AND co.customer_id=u.customer_id
            WHEN MATCHED AND co.processed_timestamp < u.processed_timestamp
              THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *
        """

        # Order CDF rows by commit time; the MERGE keys are order_id + customer_id.
        data_upserter = CDFUpserter(query, "updates", "_commit_timestamp", "order_id", "customer_id")

        orders_stream = spark.readStream.table(self._table("silver_orders"))

        # NOTE(review): startingVersion 2 is a hard-coded table version; confirm
        # it is still the intended first version to replay.
        customers_stream = (spark.readStream
                                 .option("readChangeData", True)
                                 .option("startingVersion", 2)
                                 .table(self._table("silver_customers"))
                           )

        # NOTE(review): stream-stream join without watermarks keeps join state unbounded.
        stream_writer = (orders_stream.join(customers_stream, on="customer_id")
                                      .writeStream
                                      .foreachBatch(data_upserter.upsert)
                                      .option("checkpointLocation", self.checkpoint_dir + "/silver_customers_orders")
                        )
        return self._start_stream(stream_writer, once, processing_time)


In [0]:
# Silver pipeline bound to the "dev" catalog.
SL = Silver(env="dev")

In [0]:
# With once=True the query stops after processing the available backlog.
# Block until it finishes so later cells never read a half-written table;
# awaitTermination also re-raises any stream failure here.
customers_query = SL.upsert_customers(once=True)
customers_query.awaitTermination()

In [0]:
# Peek at the raw bronze records (topic / value pairs) that the silver jobs parse.
df = spark.table('dev.bookstore_db.bronze')
display(df)

In [0]:
 # Debug: rebuild the customer-parsing step of Silver.upsert_customers as a
# batch read, using the same config-derived locations, so the parsed rows can
# be inspected interactively.
df_country_lookup = spark.read.json(SL.landing_dir + "/country_lookup")

schema = """customer_id STRING, email STRING, first_name STRING, last_name STRING, gender STRING, street STRING, city STRING,
            country_code STRING, row_status STRING, row_time timestamp"""

# df_stream is a batch DataFrame (spark.read), not a stream. The name is kept
# because the next cell displays it.
df_stream = (spark.read
                  .table(f"{SL.catalog}.{SL.db_name}.bronze")
                  .filter("topic = 'customers'")
                  .select(F.from_json(F.col("value").cast("string"), schema).alias("c"))
                  .select("c.*")
                  # Uncomment to apply the same country-lookup join as Silver.upsert_customers:
                  # .join(F.broadcast(df_country_lookup), F.col("country_code") == F.col("code"))
            )

In [0]:
# Show the parsed customer rows built in the previous cell.
display(df_stream.select("*"))