In [0]:
# Databricks notebook source
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# COMMAND ----------

def path_exists(path):
    """Report whether ``path`` can be listed with ``dbutils.fs.ls``.

    Returns True when the listing succeeds and False when it fails with an
    error whose text contains ``java.io.FileNotFoundException``. Any other
    error is re-raised unchanged.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        # Only a missing path counts as "does not exist"; everything else
        # (permissions, connectivity, ...) must surface to the caller.
        if 'java.io.FileNotFoundException' not in str(err):
            raise
        return False
    return True

# COMMAND ----------

class CourseDataset:
    """Set-up and processing helper for the bookstore course dataset.

    The methods rely on the notebook-provided ``spark`` and ``dbutils``
    globals and on the module-level ``path_exists`` helper.
    """

    def __init__(self, uri, location, checkpoint_path, data_catalog, db_name):
        """Store the dataset configuration.

        Args:
            uri: Source directory the dataset is copied from.
            location: Target directory the dataset is copied to and read from.
            checkpoint_path: Base directory for the streaming checkpoints.
            data_catalog: Catalog selected by ``create_database``.
            db_name: Schema created, selected and dropped by this helper.
        """
        self.uri = uri
        self.location = location
        self.checkpoint = checkpoint_path
        self.catalog_name = data_catalog
        self.db_name = db_name

    def download_dataset(self):
        """Copy each entry listed under ``self.uri`` that is missing at ``self.location``."""
        source = self.uri
        target = self.location
        files = dbutils.fs.ls(source)

        for f in files:
            source_path = f"{source}/{f.name}"
            target_path = f"{target}/{f.name}"
            # Entries that already exist at the target are left untouched.
            if not path_exists(target_path):
                print(f"Copying {f.name} ...")
                dbutils.fs.cp(source_path, target_path, True)

    def create_database(self):
        """Select the configured catalog, create the schema if needed, and select it."""
        spark.sql(f"USE CATALOG {self.catalog_name}")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {self.db_name}")
        spark.sql(f"USE SCHEMA {self.db_name}")

    def clean_up(self):
        """Remove the checkpoints, drop the schema (CASCADE), and remove the dataset files."""
        print("Removing Checkpoints ...")
        dbutils.fs.rm(self.checkpoint, True)
        print("Dropping Database ...")
        spark.sql(f"DROP SCHEMA IF EXISTS {self.db_name} CASCADE")
        print("Removing Dataset ...")
        dbutils.fs.rm(self.location, True)
        print("Done")

    def __get_index(self, dir):
        """Return the index of the next file to load into ``dir``.

        The index is one more than the integer stem of the greatest (by string
        comparison) ``*.json`` file name found in ``dir``. When the directory
        cannot be listed, contains no ``.json`` file, or the stem is not an
        integer, the result is 1.
        """
        try:
            files = dbutils.fs.ls(dir)
            file = max(f.name for f in files if f.name.endswith('.json'))
            index = int(file.rsplit('.', maxsplit=1)[0])
        except Exception:
            # Best effort: any listing/parsing failure means "nothing loaded yet".
            # ``Exception`` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            index = 0
        return index + 1

    def __load_json_file(self, current_index, streaming_dir, raw_dir):
        """Copy ``<current_index>.json`` (zero-padded to 2 digits) to ``raw_dir`` if it exists in ``streaming_dir``."""
        latest_file = f"{str(current_index).zfill(2)}.json"
        source = f"{streaming_dir}/{latest_file}"
        target = f"{raw_dir}/{latest_file}"
        # The last path component of the streaming directory is used only in the log line.
        prefix = streaming_dir.split("/")[-1]
        if path_exists(source):
            print(f"Loading {prefix}-{latest_file} file to the bookstore dataset")
            dbutils.fs.cp(source, target)

    def __load_data(self, max, streaming_dir, raw_dir, all=False):
        """Load the next file, or every remaining file when ``all`` is set.

        Args:
            max: Highest file index that may be loaded.
            streaming_dir: Directory holding the files still to be delivered.
            raw_dir: Directory the files are copied into.
            all: When true, load every file from the next index up to ``max``.
        """
        index = self.__get_index(raw_dir)
        if index > max:
            print("No more data to load\n")
        elif all:
            while index <= max:
                self.__load_json_file(index, streaming_dir, raw_dir)
                index += 1
        else:
            self.__load_json_file(index, streaming_dir, raw_dir)

    def load_new_data(self, num_files = 1):
        """Copy up to ``num_files`` further files (index limit 10) from ``kafka-streaming`` to ``kafka-raw``."""
        streaming_dir = f"{self.location}/kafka-streaming"
        raw_dir = f"{self.location}/kafka-raw"
        for _ in range(num_files):
            self.__load_data(10, streaming_dir, raw_dir)

    def load_books_updates(self):
        """Copy the next file (index limit 5) from ``books-updates-streaming`` to ``kafka-raw/books-updates``."""
        streaming_dir = f"{self.location}/books-updates-streaming"
        raw_dir = f"{self.location}/kafka-raw/books-updates"
        self.__load_data(5, streaming_dir, raw_dir)

    def process_bronze(self):
        """Ingest the JSON files under ``kafka-raw`` into the ``bronze`` table and wait for completion.

        The ``timestamp`` column is divided by 1000 and cast to a timestamp
        (presumably it arrives as epoch milliseconds — confirm against the
        producer), and a ``year_month`` column is derived from it. The table is
        partitioned by ``topic`` and ``year_month``.
        """
        schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

        query = (spark.readStream
                            .format("cloudFiles")
                            .option("cloudFiles.format", "json")
                            .schema(schema)
                            .load(f"{self.location}/kafka-raw")
                            .withColumn("timestamp", (F.col("timestamp")/1000).cast("timestamp"))
                            .withColumn("year_month", F.date_format("timestamp", "yyyy-MM"))
                      .writeStream
                          .option("checkpointLocation", f"{self.checkpoint}/bronze")
                          .option("mergeSchema", True)
                          .partitionBy("topic", "year_month")
                          .trigger(availableNow=True)
                          .table("bronze"))

        query.awaitTermination()

    def __upsert_data(self, microBatchDF, batch):
        """foreachBatch handler: insert the micro-batch rows that are not yet in ``orders_silver``.

        Rows are matched on ``order_id`` and ``order_timestamp``; matched rows
        are left unchanged.
        """
        microBatchDF.createOrReplaceTempView("orders_microbatch")

        sql_query = """
          MERGE INTO orders_silver a
          USING orders_microbatch b
          ON a.order_id=b.order_id AND a.order_timestamp=b.order_timestamp
          WHEN NOT MATCHED THEN INSERT *
        """

        # Use the micro-batch's own session so the temp view registered above is visible.
        microBatchDF.sparkSession.sql(sql_query)

    def __batch_upsert(self, microBatchDF, batchId):
        """foreachBatch handler: upsert the latest insert/update per customer into ``customers_silver``.

        Only rows whose ``row_status`` is ``insert`` or ``update`` are kept, and
        for each ``customer_id`` only the row(s) ranked first by descending
        ``row_time``. An existing customer is updated only when the incoming
        ``row_time`` is newer.
        """
        window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())

        (microBatchDF.filter(F.col("row_status").isin(["insert", "update"]))
                     .withColumn("rank", F.rank().over(window))
                     .filter("rank == 1")
                     .drop("rank")
                     .createOrReplaceTempView("ranked_updates"))

        query = """
            MERGE INTO customers_silver c
            USING ranked_updates r
            ON c.customer_id=r.customer_id
                WHEN MATCHED AND c.row_time < r.row_time
                  THEN UPDATE SET *
                WHEN NOT MATCHED
                  THEN INSERT *
        """

        microBatchDF.sparkSession.sql(query)

    def __type2_upsert(self, microBatchDF, batch):
        """foreachBatch handler: apply book updates to ``books_silver`` while keeping history.

        Each update appears in the merge source with ``merge_key = book_id``.
        Updates that change the price of a current row appear a second time
        with ``merge_key = NULL``. The keyed copy closes the current row
        (``current = false``, ``end_date`` set); the NULL-keyed copy can never
        match and is therefore inserted as the new current version.
        """
        microBatchDF.createOrReplaceTempView("updates")

        sql_query = """
            MERGE INTO books_silver
            USING (
                SELECT updates.book_id as merge_key, updates.*
                FROM updates

                UNION ALL

                SELECT NULL as merge_key, updates.*
                FROM updates
                JOIN books_silver ON updates.book_id = books_silver.book_id
                WHERE books_silver.current = true AND updates.price <> books_silver.price
              ) staged_updates
            ON books_silver.book_id = merge_key
            WHEN MATCHED AND books_silver.current = true AND books_silver.price <> staged_updates.price THEN
              UPDATE SET current = false, end_date = staged_updates.updated
            WHEN NOT MATCHED THEN
              INSERT (book_id, title, author, price, current, effective_date, end_date)
              VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
        """

        microBatchDF.sparkSession.sql(sql_query)

    def process_orders_silver(self):
        """Stream the ``orders`` topic from ``bronze`` into ``orders_silver`` and wait for completion.

        The JSON payload in ``value`` is parsed, de-duplicated on
        (``order_id``, ``order_timestamp``) under a 30-second watermark, and
        merged batch by batch via ``__upsert_data``.
        """
        json_schema = "order_id STRING, order_timestamp Timestamp, customer_id STRING, quantity BIGINT, total BIGINT, books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>"

        deduped_df = (spark.readStream
                   .table("bronze")
                   .filter("topic = 'orders'")
                   .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
                   .select("v.*")
                   .withWatermark("order_timestamp", "30 seconds")
                   .dropDuplicates(["order_id", "order_timestamp"]))

        query = (deduped_df.writeStream
                   .foreachBatch(self.__upsert_data)
                   .outputMode("update")
                   .option("checkpointLocation", f"{self.checkpoint}/orders_silver")
                   .trigger(availableNow=True)
                   .start())

        query.awaitTermination()

    def process_customers_silver(self):
        """Stream the ``customers`` topic from ``bronze`` into ``customers_silver`` and wait for completion.

        Parsed rows are inner-joined with the country lookup read from
        ``<location>/country_lookup`` and merged via ``__batch_upsert``.
        """
        schema = "customer_id STRING, email STRING, first_name STRING, last_name STRING, gender STRING, street STRING, city STRING, country_code STRING, row_status STRING, row_time timestamp"

        # Read the lookup from this instance's dataset location rather than a
        # module-level global, so the class works for any configured location.
        df_country_lookup = spark.read.json(f"{self.location}/country_lookup")

        query = (spark.readStream
                          .table("bronze")
                          .filter("topic = 'customers'")
                          .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                          .select("v.*")
                          .join(F.broadcast(df_country_lookup), F.col("country_code") == F.col("code") , "inner")
                       .writeStream
                          .foreachBatch(self.__batch_upsert)
                          .outputMode("update")
                          .option("checkpointLocation", f"{self.checkpoint}/customers_silver")
                          .trigger(availableNow=True)
                          .start()
                )

        query.awaitTermination()

    def process_books_silver(self):
        """Stream the ``books`` topic from ``bronze`` into ``books_silver`` via ``__type2_upsert`` and wait for completion."""
        schema = "book_id STRING, title STRING, author STRING, price DOUBLE, updated TIMESTAMP"

        query = (spark.readStream
                        .table("bronze")
                        .filter("topic = 'books'")
                        .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                        .select("v.*")
                     .writeStream
                        .foreachBatch(self.__type2_upsert)
                        .option("checkpointLocation", f"{self.checkpoint}/books_silver")
                        .trigger(availableNow=True)
                        .start()
                )

        query.awaitTermination()

    def process_current_books(self):
        """(Re)create ``current_books`` from the rows of ``books_silver`` whose ``current`` flag is true."""
        spark.sql("""
            CREATE OR REPLACE TABLE current_books
            AS SELECT book_id, title, author, price
               FROM books_silver
               WHERE current IS TRUE
        """)

# COMMAND ----------

# Source of the course dataset; copied from here by download_dataset().
# NOTE(review): the URI ends with "/" and download_dataset() joins paths with
# another "/", so the source paths it builds contain "//" — confirm the
# storage layer tolerates that.
data_source_uri = "s3://dalhussein-courses/DE-Pro/datasets/bookstore/v1/"
# Target location the dataset is copied to and later read from.
dataset_bookstore = 'dbfs:/mnt/demo-datasets/DE-Pro/bookstore'
# Publish the dataset location as a Spark conf value under "dataset.bookstore".
spark.conf.set(f"dataset.bookstore", dataset_bookstore)
# Base directory under which the streaming checkpoints are written.
checkpoint_path = "dbfs:/mnt/demo_pro/checkpoints"
data_catalog = 'hive_metastore'
db_name = "bookstore_eng_pro"

# Runs at notebook execution time: copies any missing dataset files and then
# creates/selects the schema.
bookstore = CourseDataset(data_source_uri, dataset_bookstore, checkpoint_path, data_catalog, db_name)
bookstore.download_dataset()
bookstore.create_database()

# COMMAND ----------

#bookstore.clean_up()

# COMMAND ----------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
%fs ls dbfs:/mnt/demo-datasets/DE-Pro/bookstore/kafka-raw/

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, unix_timestamp, from_unixtime, current_date, date_format, lit, col, date_format, to_date
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, LongType, DoubleType
from pyspark.sql.window import Window

In [0]:
# Load the first raw Kafka export file for inspection.
df_raw = spark.read.json("dbfs:/mnt/demo-datasets/DE-Pro/bookstore/kafka-raw/01.json")
display(df_raw)
# The `value` column carries the actual message payload, sent in JSON format.

In [0]:
# Show the schema Spark inferred for the raw file.
df_raw.printSchema()

In [0]:
# Add the current timestamp and the current date as two extra columns.
new_df = df_raw.withColumn('current_timestamp', current_timestamp()) \
        .withColumn('current_date', current_date())

In [0]:
display(new_df)

In [0]:
# Re-format `current_date` with the "yyyy.MM.dd" pattern.
# NOTE: `new_df` is reassigned in place here and in later cells, so these
# cells are order-dependent.
new_df = new_df.withColumn("current_date", date_format(col("current_date"), "yyyy.MM.dd"))
new_df.show()

In [0]:
# Add the timestamp converted with unix_timestamp().
# NOTE(review): despite the `_float` suffix, unix_timestamp() is documented to
# return whole seconds — confirm the intended column name.
new_df = new_df.withColumn("current_timestamp_float", unix_timestamp(col("current_timestamp")))
new_df.show()

In [0]:
# Same expression as the previous cell, shown without reassigning `new_df`.
new_df.withColumn("current_timestamp_float", unix_timestamp(col("current_timestamp"))).show()

In [0]:
# Convert the unix-time column back to a "yyyy-MM-dd" value with from_unixtime().
display(new_df.withColumn("new_date_col",from_unixtime(col("current_timestamp_float"), "yyyy-MM-dd")))

In [0]:
new_df.printSchema()

In [0]:
# Parse the "yyyy.MM.dd" value of `current_date` back with to_date().
new_df = new_df.withColumn("current_date", to_date(col("current_date"), 'yyyy.MM.dd'))
new_df.printSchema()

In [0]:
# Divide the raw `timestamp` by 1000 and cast it to a timestamp
# (presumably the raw value is epoch milliseconds — TODO confirm).
display(new_df.withColumn("timestamp_new", (col("timestamp")/1000).cast("timestamp")))

In [0]:
# from pyspark.sql.functions import datediff, months_between, add_months, date_add, year, month
from pyspark.sql.functions import *
# df.withColumn('datediff', datediff(col1, col2))
# df.withColumn('monthsbetween', months_between(col1, col2))
# df.withColumn('addMonths', add_months(col1, 3))
# df.withColumn('subMonths', add_months(col2, -5))
# df.withColumn('dateAdd', date_add(col1, 10))
# # df.withColumn('subDays', date_add(col, -9))
# df.withColumn("year", year(col1))
# df.withColumn("month", month(col1))
# df.withColumn("dayofmonth", dayofmonth(col2))

In [0]:
%sh 
ls /dbfs/mnt/demo-datasets/DE-Pro-new/bookstore/kafka-raw/

In [0]:
%sql
show tables;
-- drop table bronze_table;

In [0]:
def process_bronze():
    """Ingest the raw Kafka JSON files into ``bronze_table`` and wait for completion.

    Reads the files with the ``cloudFiles`` source using a fixed schema,
    divides ``timestamp`` by 1000 and casts it to a timestamp (presumably the
    raw value is epoch milliseconds — TODO confirm), derives ``year_month``
    from it, and writes the result partitioned by ``topic`` and ``year_month``.
    """
    raw_schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

    raw_stream = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .schema(raw_schema)
        .load("dbfs:/mnt/demo-datasets/DE-Pro-new/bookstore/kafka-raw")
    )

    event_time = (col("timestamp") / 1000).cast("timestamp")
    enriched_stream = (
        raw_stream
        .withColumn("timestamp", event_time)
        .withColumn("year_month", date_format("timestamp", "yyyy-MM"))
    )

    bronze_writer = (
        enriched_stream.writeStream
        .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bronze")
        .option("mergeSchema", "True")
        .partitionBy("topic", "year_month")
        .trigger(availableNow=True)
    )

    # Start the write and block until the available-now run has finished.
    bronze_writer.table("bronze_table").awaitTermination()

In [0]:
%sh
# rm -rf /dbfs/mnt/demo_pro/checkpoints/bronze
ls /dbfs/mnt/demo_pro/checkpoints/

In [0]:
process_bronze()

In [0]:
%sql
-- drop table bronze_table;

In [0]:
%sql
show tables;

In [0]:
%sql
select * from bronze_table;

In [0]:
%sql
select distinct(topic) from bronze_table;

In [0]:
bronze_df = spark.table("bronze_table")
display(bronze_df)

In [0]:
%sql
select cast(key as string), cast(value as string)
from bronze_table
where topic = "orders"
limit 20;

In [0]:
%sql
-- Parse the JSON payload of the "orders" topic with an explicit schema and
-- expand the parsed struct into columns.
-- NOTE(review): this reads bronze_new_partition, which no cell in this
-- notebook creates (process_bronze() writes bronze_table) — confirm the
-- intended source table.
select v.*
from (
  select from_json(cast(value as string), "order_id string, order_timestamp Timestamp, customer_id string, quantity bigint, total bigint, 
        books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>") as v
        from bronze_new_partition
        where topic = "orders")

In [0]:
%sql
-- Same parsing as with an explicit schema, but the schema is derived from a
-- sample order document with schema_of_json().
-- NOTE(review): reads bronze_new_partition, which is not created in this
-- notebook — confirm the intended source table.
select v.*
from (
  select from_json(cast(value as string), schema_of_json('{"order_id":"000000003491","order_timestamp":"2021-11-12 09:10:00","customer_id":"C00002","quantity":3,"total":99,"books":[{"book_id":"B02","quantity":1,"subtotal":28},{"book_id":"B06","quantity":1,"subtotal":22},{"book_id":"B01","quantity":1,"subtotal":49}]}')) v
        from bronze_new_partition
        where topic = "orders")

In [0]:
%sql
-- Temp view exposing the parsed "orders" payload as columns.
-- NOTE(review): reads bronze_new_partition, which is not created in this
-- notebook — confirm the intended source table.
create or replace temp view bronze_view as 
select vals.* from (
select from_json(cast(value as STRING), "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, 
  order_timestamp timestamp") vals
  from bronze_new_partition
  where topic = "orders" )

In [0]:
%sql
select * from bronze_view;

In [0]:
%sql
drop view bronze_view;

In [0]:
# Re-register bronze_view as a temp view over a streaming read of the table,
# so SQL cells that query it operate on a stream.
# NOTE(review): reads bronze_new_partition, which is not created in this
# notebook — confirm the intended source table.
(spark.readStream
      .table("bronze_new_partition")
      .createOrReplaceTempView("bronze_view"))

In [0]:
%sql
select * from bronze_view;

In [0]:
%sql
-- Temp view with the parsed "orders" payload of bronze_view expanded into
-- columns; it is the source of the orders_silver streaming write.
create or replace temporary view silver_tmp_view as 
select vals.* from (
  select from_json(cast(value as STRING), "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, order_timestamp timestamp")as vals
  from bronze_view
  where topic = 'orders'
); 

-- select from_json(cast(value as STRING), "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, 
--   order_timestamp timestamp") vals

In [0]:
%sql
select * from silver_tmp_view;

In [0]:
# Append the parsed orders from the streaming temp view into orders_silver and
# wait for the available-now run to finish.
# The checkpoint location uses the "dbfs:/" scheme like every other checkpoint
# in this notebook; the previous "/dbfs:/mnt/..." value mixed the /dbfs FUSE
# prefix with the dbfs: scheme and did not name the intended directory.
query = spark.table("silver_tmp_view") \
        .writeStream \
        .option("checkpointLocation", "dbfs:/mnt/demo_pro_new/checkpoints/silver") \
        .trigger(availableNow=True) \
        .outputMode("append") \
        .table("orders_silver")

query.awaitTermination()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import *

In [0]:

# Schema used to parse the JSON payload of the "orders" topic.
json_schema = "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, order_timestamp timestamp"

# from_json() parses the JSON string into a struct; select("v.*") then expands
# that struct into one column per field.

# Stream the parsed orders from bronze_table into orders_ultimatum and wait
# for the available-now run to finish.
query = (spark.readStream
                .table("bronze_table")
                .filter("topic = 'orders'")
                .select(from_json(col("value").cast("string"), json_schema).alias("v"))
                .select("v.*")
            .writeStream
                .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoint/silver/")
                .trigger(availableNow=True)
                .table("orders_ultimatum"))
query.awaitTermination()


# query.awaitTermination()

In [0]:
%sql
select * from orders_ultimatum;

In [0]:
%sql
-- One output row per book in each order: explode() turns the `books` array
-- into rows, and the outer select flattens the struct fields. Order-level
-- quantity/total are renamed to keep them apart from the per-book fields.
select b.customer_id, b.order_id, b.order_quantity, b.order_total, b.exploded_books.book_id as book_id, b.exploded_books.quantity as book_quantity, b.exploded_books.subtotal as books_subtotal
from ( 
select customer_id, order_id, explode(books) as exploded_books, quantity as order_quantity, total as order_total, order_timestamp from orders_ultimatum) b
order by b.order_timestamp;

In [0]:
%sql
-- Add a CHECK constraint requiring order_timestamp to be on or after 2020-01-01.
alter table orders_ultimatum add constraint timestamp_within_range check(order_timestamp >= '2020-01-01')

In [0]:
%sql
-- Show the table details, to inspect the table after adding the constraint.
describe extended orders_ultimatum

In [0]:
%sql
-- Add a CHECK constraint requiring a positive order quantity.
alter table orders_ultimatum add constraint check_quantity check(quantity > 0)

In [0]:
# Schema used to parse the JSON payload of the "orders" topic.
json_schema = "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, order_timestamp timestamp"

# from_json() parses the JSON string into a struct; select("v.*") then expands
# that struct into one column per field.

# Stream the parsed orders into orders_silver, keeping only rows with a
# positive quantity, and wait for the available-now run to finish.
# NOTE(review): the filter refers to "v.quantity" after select("v.*"), while
# the later batch/dedup cells filter on plain "quantity" — confirm and align.
query = (spark.readStream
                .table("bronze_table")
                .filter("topic = 'orders'")
                .select(from_json(col("value").cast("string"), json_schema).alias("v"))
                .select("v.*")
                .where("v.quantity > 0")
            .writeStream
                .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoint/silver-updated")
                .trigger(availableNow=True)
                .table("orders_silver"))
query.awaitTermination()

In [0]:
%sh
# rm -rf /dbfs/mnt/demo_pro/checkpoint/remove-duplicates

In [0]:
%sql
-- drop table orders_silver;
select * from orders_silver;

In [0]:
%sql
alter table orders_ultimatum drop constraint timestamp_within_range;

In [0]:
%sql
describe extended orders_ultimatum

In [0]:
spark.read \
    .table("bronze_table") \
    .filter("topic = 'orders'") \
    .count()

In [0]:
# Schema used to parse the JSON payload of the "orders" topic.
json_schema = "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, order_timestamp timestamp"

# Batch (non-streaming) count of distinct valid orders: parsed orders with a
# positive quantity, de-duplicated on (order_id, order_timestamp). It is later
# compared with the row count produced by the streaming de-duplication.
batch_total = (spark.read
                    .table("bronze_table")
                    .filter("topic = 'orders'")
                    .select(from_json(col("value").cast("string"), json_schema).alias("v"))
                    .select("v.*")
                    .where("quantity > 0")
                    .dropDuplicates(["order_id","order_timestamp"])
                    .count()
)

In [0]:
batch_total

In [0]:
%sql
truncate table orders_silver;

In [0]:
# Schema used to parse the JSON payload of the "orders" topic.
json_schema = "customer_id string, order_id string, books ARRAY<STRUCT<book_id string, quantity bigint, subtotal bigint>>, quantity bigint, total bigint, order_timestamp timestamp"

# Streaming DataFrame of parsed orders with a positive quantity, de-duplicated
# on (order_id, order_timestamp) under a 30-second watermark on
# order_timestamp. Nothing runs until a writeStream is started on it.
deduped_df = spark.readStream \
                .table("bronze_table") \
                .filter("topic = 'orders'") \
                .select(from_json(col("value").cast("string"), json_schema).alias("v")) \
                .select("v.*")  \
                .where("quantity > 0") \
                .withWatermark("order_timestamp", "30 seconds") \
                .dropDuplicates(["order_id", "order_timestamp"])

In [0]:
# display(deduped_df)
# Displaying the streaming DataFrame would start the streaming ingestion.

In [0]:
def upsert_data(microBatchDF, batch):
    """foreachBatch handler: insert the micro-batch rows that are missing from ``orders_silver``.

    Args:
        microBatchDF: The micro-batch DataFrame handed over by foreachBatch.
        batch: The micro-batch id (unused).

    Rows are matched on ``order_id`` and ``order_timestamp``; rows that already
    exist in the target are left unchanged.
    """
    merge_statement = """
        MERGE INTO orders_silver a
        using orders_microbatch b
        on a.order_id = b.order_id and a.order_timestamp = b.order_timestamp
        when not matched then insert *
    """

    # Expose the micro-batch to SQL under the name the MERGE statement uses.
    microBatchDF.createOrReplaceTempView("orders_microbatch")

    # Run the MERGE on the micro-batch's own session, where the temp view lives.
    batch_session = microBatchDF.sparkSession
    batch_session.sql(merge_statement)

In [0]:
# Merge the de-duplicated order stream into orders_silver batch by batch and
# wait for the available-now run to finish.
# NOTE(review): this checkpoint is the parent directory of the other
# checkpoints used in this notebook ("…/checkpoint/silver/" and
# "…/checkpoint/silver-updated"), and an earlier cleanup cell refers to
# "…/checkpoint/remove-duplicates" — confirm whether a dedicated
# sub-directory was intended here.
query = (deduped_df.writeStream 
                    .foreachBatch(upsert_data)
                    .option("checkpointLocation","dbfs:/mnt/demo_pro/checkpoint/")
                    .trigger(availableNow = True)
                    .start())
query.awaitTermination()

In [0]:
%sql
select count(*) from orders_silver;

In [0]:
# Compare the number of rows the streaming merge wrote to orders_silver with
# the de-duplicated batch count held in `batch_total` (set in an earlier cell).
streaming_total = spark.read.table("orders_silver").count()
# batch_total = spark.read.table("bronze_table").count()
print(f"batch total: {batch_total}")
print(f"Streaming total: {streaming_total}")

In [0]:
%sql
-- Target table for the book history merge: `current` marks the active version
-- of a book, effective_date/end_date bound the period a version was valid.
create table if not exists books_silver
(book_id string, title string, author string, price double, current boolean, effective_date timestamp, end_date timestamp)

In [0]:
def type1_upsert(microBatchDF, batch):
  """foreachBatch handler: apply book updates to ``books_silver`` while keeping history.

  Despite its name, the merge keeps history: a price change closes the
  current row (``current = false``, ``end_date`` set) and inserts a new
  current row.

  Args:
      microBatchDF: The micro-batch DataFrame handed over by foreachBatch.
      batch: The micro-batch id (unused).

  Each update appears in the merge source with ``merge_key = book_id``.
  Updates that change the price of a current row appear a second time with
  ``merge_key = NULL``; that copy can never match and is therefore inserted
  as the new current version.
  """
  microBatchDF.createOrReplaceTempView("updates")

  sql_query = """
  merge into books_silver
  using (
    select updates.book_id as merge_key, updates.*
    from updates

    union all

    select NULL as merge_key, updates.*
    from updates
    join books_silver on updates.book_id = books_silver.book_id
    where books_silver.current = true and updates.price <> books_silver.price
  ) staged_updates
  on books_silver.book_id = merge_key
  when matched and books_silver.current = true and books_silver.price <> staged_updates.price then
    update set current = false, end_date = staged_updates.updated
  when not matched then
    insert (book_id, title, author, price, current, effective_date, end_date)
    values(staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
  """

  # Run the MERGE on the micro-batch's own session, where the temp view lives.
  microBatchDF.sparkSession.sql(sql_query)

In [0]:
# Schema of the JSON payload on the "books" topic. The merge in type1_upsert
# reads book_id, title, author, price and updated, so the orders schema used
# here before would have parsed every field as NULL.
books_schema = "book_id STRING, title STRING, author STRING, price DOUBLE, updated TIMESTAMP"

# Stream the parsed book updates into books_silver batch by batch.
querydf = (spark.readStream
                .table("bronze_table")
                .filter("topic = 'books'")
                .select(from_json(col("value").cast("string"), books_schema).alias("v"))
                .select("v.*")
              .writeStream
                .foreachBatch(type1_upsert)
                .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_silver")
                .trigger(availableNow=True)
                .start())

# Wait on the query started above (not on a `query` left over from an earlier cell).
querydf.awaitTermination()