## 1. Mounting of S3 bucket
#### url = "s3a://<Access_key_ID>:<encoded_Secret_access_key>@test-autoloder-bucket/growth_data/"

In [0]:
# AWS credentials used to build the mount URL.
# NOTE(review): hard-coded keys leak through notebook history and exports;
# consider reading them from a Databricks secret scope instead.
Access_key_ID = "****************************"
Secret_access_key = "*************************"

# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.parse` has been loaded.
import urllib.parse

# Percent-encode the secret with safe="" so that "/" is escaped as well and
# cannot be mistaken for a path separator inside the URL.
encoded_Secret_access_key = urllib.parse.quote(Secret_access_key, "")


path = 'test-autoloder-bucket/growth_data'
url = "s3a://{0}:{1}@{2}".format(Access_key_ID, encoded_Secret_access_key, path)

# dbutils.fs.mount raises if the mount point is already in use, so only mount
# when it is not present yet; this keeps the cell safe to re-run.
mount_point = '/mnt/growth_data'
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(url, mount_point)

## 2. Set variables for your path

In [0]:
# Root folder of the mounted bucket; every other path is derived from it.
landing_zone = "/mnt/growth_data"

# Folder the stream reads new files from.
incoming_data = landing_zone + "/incoming_data"

# Folder passed to the streaming writer as its checkpoint location.
checkpoint_path = landing_zone + "/growth_checkpoint"

# Storage locations of the two modelled Delta tables.
Customer_modelled_data = landing_zone + "/modelled_data/customer"
# The closing quote was missing here, which made the whole cell a SyntaxError.
orders_modelled_data = landing_zone + "/modelled_data/orders"

## 3. Define schema and read the files through Auto Loader
###### You can infer the schema and column types, and you can give schema hints:
###### .option("cloudFiles.inferSchema", "true") 
###### .option("cloudFiles.schemaLocation", checkpoint_dir) [make sure you have initial base files available to infer the schema from]
###### .option("cloudFiles.inferColumnTypes", "true")
###### .option("cloudFiles.schemaHints", "col_1 int, col_4 string")

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, DoubleType
from pyspark.sql.functions import input_file_name, current_timestamp

# Fields of one element of the "line_items" array.
line_item_fields = [
    StructField("order_item_id", IntegerType(), True),
    StructField("order_item_product_id", IntegerType(), True),
    StructField("order_item_quantity", IntegerType(), True),
    StructField("order_item_product_price", DoubleType(), True),
    StructField("order_item_subtotal", DoubleType(), True),
]

# Schema of one incoming order document: order and customer attributes plus
# the nested array of line items. Every field is declared nullable.
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("customer_fname", StringType(), True),
    StructField("customer_lname", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("pincode", IntegerType(), True),
    StructField("line_items", ArrayType(StructType(line_item_fields)), True),
])

# Stream the JSON files from the incoming folder with the "cloudFiles" source,
# applying the explicit schema and tagging every row with the file it came
# from and the time it was loaded.
read_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(incoming_data)
    .withColumn("file_name", input_file_name())
    .withColumn("time_of_load", current_timestamp())
)


## 4. Basic transformations to refine your nested file format

In [0]:
from pyspark.sql.functions import explode

# Produce one row per array element: explode "line_items" into the struct
# column "new_column", then drop the original array.
df = read_df.withColumn("new_column", explode("line_items"))
df = df.drop("line_items")
df.createOrReplaceTempView("data")

# Promote each field of the exploded struct to a top-level column so the
# result is a flat table.
flatten_query = """ select
        order_id,
        customer_id,
        customer_fname, 
        customer_lname, 
        city,
        state,
        pincode,
        new_column.order_item_id as order_item_id,
        new_column.order_item_product_id as order_item_product_id,
        new_column.order_item_quantity as order_item_quantity,
        new_column.order_item_product_price as order_item_product_price,
        new_column.order_item_subtotal as order_item_subtotal,
        file_name,
        time_of_load
        from data
"""
new_1 = spark.sql(flatten_query)

# Register the flat result under the name "final_table" for SQL queries.
new_1.createOrReplaceTempView("final_table")

## 5. Let's go for DQL 

In [0]:
# 1. Top 10 customers by total amount spent: the sum of their line-item
#    subtotals, rounded to 4 decimals, highest first.
top_spenders_query = """SELECT customer_id, customer_fname, customer_lname, round(SUM(order_item_subtotal), 4) AS TOTAL_AMOUNT_SPENT
FROM final_table
GROUP BY customer_id, customer_fname, customer_lname
ORDER BY TOTAL_AMOUNT_SPENT DESC 
LIMIT 10"""

total_amount_by_customer = spark.sql(top_spenders_query)
display(total_amount_by_customer)

In [0]:
# 2. Number of order items per customer, largest count first.
#    The "total_orders" column counts order_item_id values, i.e. line items.
items_per_customer_query = """
                                      SELECT customer_id, customer_fname, customer_lname,
                                      count(order_item_id) AS total_orders
                                      FROM final_table
                                      GROUP BY customer_id, customer_fname, customer_lname
                                      ORDER BY total_orders DESC
                                      """

total_orders_per_customer = spark.sql(items_per_customer_query)
display(total_orders_per_customer)

## 6. Write your streaming data into a DELTA TABLE
###### Use outputMode wisely based on the business use case (append - for new records only; update - for updated old records + new incoming records; complete - for rewriting the whole result, old and new records |==> append doesn't work for aggregating dataframes)

In [0]:
# Start a streaming query that appends the flattened rows to the Delta table
# "streaming_delta_table", tracking its progress under checkpoint_path.
Streaming_writes = (
    new_1.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .toTable("streaming_delta_table")
)

# toTable() starts the query asynchronously and returns immediately. Block
# until the data currently available has been committed, otherwise the read
# below races the stream and can hit a missing or still-empty table.
Streaming_writes.processAllAvailable()

display(spark.sql("select * from streaming_delta_table"))

In [0]:
%sql
-- Count the rows currently stored in the streaming_delta_table Delta table.
select count(*) from streaming_delta_table

## 7. Data modelling 
##### Once your streaming job is done, a finalised model can be built using relational modelling or dimensional modelling 

In [0]:
# Build the customer table from the streamed data.
# streaming_delta_table holds one row per order line item, so the customer
# columns repeat for every item a customer bought; DISTINCT collapses them to
# one row per unique combination of customer attributes.
Customes_details = spark.sql("""
    SELECT DISTINCT
           customer_id,
           customer_fname,
           customer_lname,
           city,
           state,
           pincode
    FROM streaming_delta_table
""")

# Overwrite the Delta table "customes_details", stored at
# Customer_modelled_data. The CSV-only "header" option was dropped because it
# has no meaning for the Delta format.
(
    Customes_details.write.mode("overwrite")
    .format("delta")
    .option("path", Customer_modelled_data)
    .saveAsTable("customes_details")
)


In [0]:
# Select the order and line-item columns from the streamed Delta table.
order_detail_query = """
    SELECT
        order_id,
        customer_id,
        order_item_id,
        order_item_product_id,
        order_item_quantity,
        order_item_product_price,
        order_item_subtotal
    FROM streaming_delta_table
"""
order_detail = spark.sql(order_detail_query)

# Append the rows to the Delta table "order_detail", stored at
# orders_modelled_data, with schema merging enabled.
# NOTE(review): the query reads the whole of streaming_delta_table, so
# mode("append") adds every row again each time this cell runs - confirm that
# re-runs are not expected to overwrite instead.
order_detail_writer = (
    order_detail.write.mode("append")
    .format("delta")
    .option("mergeSchema", "true")
    .option("header", "true")
    .option("path", orders_modelled_data)
)
order_detail_writer.saveAsTable("order_detail")

In [0]:
# Load the customer Delta table back from its storage path and show it.
customer_reader = spark.read.format("delta").option("header", "true")
customer_details_df = customer_reader.load(Customer_modelled_data)

display(customer_details_df)

In [0]:
# Load the order-detail Delta table back from its storage path.
# The CSV-only "header" option was dropped: it has no meaning for Delta.
order_detail_df = spark.read.format("delta").load(orders_modelled_data)

# Display the DataFrame that was just read. The original displayed
# `order_detail`, the query result from an earlier cell, instead.
display(order_detail_df)