
## To know what are the directories present in the retail container

In [0]:
%fs ls abfss://retail@netflixprojectdlkart.dfs.core.windows.net


## To Know what are the files present in particular directory.

In [0]:
%fs ls abfss://retail@netflixprojectdlkart.dfs.core.windows.net/bronze

# Reading the customer file

In [0]:
df_customer=spark.read.format("parquet")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/bronze/customer")
df.display()            

# Reading the product file

In [0]:
    df_product=spark.read.format("parquet")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/bronze/product")
df.display() 


# Reading the store file

In [0]:
 df_store=spark.read.format("parquet")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/bronze/store")
df.display() 

# Reading the transaction file

In [0]:
 df_transaction=spark.read.format("parquet")\
        .option("header", True)\
        .option("inferSchema", True)\
        .load("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/bronze/transaction")
df.display() 

===================================================================================

### # Silver Layer

In [0]:
# DBTITLE 1,create silver layer - data cleaning
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [0]:
# Convert types and clean data
df_transaction = df_transaction.select(
    col("transaction_id").cast("int"),
    col("customer_id").cast("int"),
    col("product_id").cast("int"),
    col("store_id").cast("int"),
    col("quantity").cast("int"),
    col("transaction_date").cast("date")
)

In [0]:
df_product = df_product.select(
    col("product_id").cast("int"),
    col("product_name"),
    col("category"),
    col("price").cast("double")
)

In [0]:
df_store = df_store.select(
    col("store_id").cast("int"),
    col("store_name"),
    col("location")
)

In [0]:
df_customer = df_customer.select(
    "customer_id", "first_name", "last_name", "email", "city", "registration_date"
).dropDuplicates(["customer_id"])

### Performing join transformation

In [0]:
df_silver = df_transaction \
    .join(df_customer, "customer_id") \
    .join(df_product, "product_id") \
    .join(df_store, "store_id") \
    .withColumn("total_amount", col("quantity") * col("price"))

## Writing the data in delta format and stores in silver directory.

In [0]:
df_silver.write.mode("overwrite").format("delta").save("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/silver/cleaned_transaction")

In [0]:
display(df_silver)


### creating managed table 

In [0]:
spark.sql("""
CREATE TABLE my_catalog.silver.silver_trans_data
USING DELTA
LOCATION 'abfss://retail@netflixprojectdlkart.dfs.core.windows.net/silver/cleaned_transaction'
""")

In [0]:
%sql

select * from my_catalog.silver.silver_trans_data

In [0]:
silver_df = spark.read.format("delta").load("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/silver/cleaned_transaction")


In [0]:
display(silver_df)

=========================================================================================

### Gold Layer

In [0]:
gold_df = silver_df.groupBy(
    "transaction_date",
    "product_id", "product_name", "category",
    "store_id", "store_name", "location"
).agg(
    sum("quantity").alias("total_quantity_sold"),
    sum("total_amount").alias("total_sales_amount"),
    countDistinct("transaction_id").alias("number_of_transactions"),
    avg("total_amount").alias("average_transaction_value")
)

In [0]:
display(gold_df)



### Writing back into the gold layer

In [0]:
gold_df.write.mode("overwrite").format("delta").save("abfss://retail@netflixprojectdlkart.dfs.core.windows.net/gold/curated_data")

In [0]:
spark.sql("""
CREATE TABLE my_catalog.gold.curated_data
USING DELTA
LOCATION 'abfss://retail@netflixprojectdlkart.dfs.core.windows.net/gold/curated_data'
""")

In [0]:
%sql

select * from my_catalog.gold.curated_data