In [0]:
# Mount the `bronze` container of storage account `adlsgroupproject` at
# /mnt/bronze4 using a service principal (OAuth client credentials).
# SECURITY: never hard-code the client secret in a notebook — read it from a
# Databricks secret scope. TODO: create the scope/key named below and rotate
# the secret that was previously committed in this notebook.
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "ec354f17-f7cc-47ae-857d-a01dd42a5c74",
           "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="adls-secret-scope", key="sp-client-secret"),
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/3082adf5-ecf3-4980-9f87-ef8a9669de7e/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

bronze_mount_point = "/mnt/bronze4/"

# dbutils.fs.mount errors if the mount point is already in use, so skip the
# call when re-running the notebook. Trailing slashes are ignored in the
# comparison because this mount point is written with one.
if any(m.mountPoint.rstrip("/") == bronze_mount_point.rstrip("/") for m in dbutils.fs.mounts()):
    print(f"{bronze_mount_point} is already mounted; skipping.")
else:
    dbutils.fs.mount(
        source="abfss://bronze@adlsgroupproject.dfs.core.windows.net/",
        mount_point=bronze_mount_point,
        extra_configs=configs)

In [0]:
# List /mnt/bronze2 if a `bronze2` entry exists directly under /mnt.
# dbutils.fs.ls returns directory paths in URI form with a trailing slash
# (e.g. "dbfs:/mnt/bronze2/"), so comparing `path` with "/mnt/bronze2" never
# matched. Compare the entry name, minus its trailing slash, instead.
# NOTE(review): the rest of this notebook mounts and reads /mnt/bronze4 —
# confirm whether this check should target bronze4 instead.
bronze2_dir = "/mnt/bronze2"
if any(f.name.rstrip("/") == "bronze2" for f in dbutils.fs.ls("/mnt")):
    display(dbutils.fs.ls(bronze2_dir))
else:
    print(f"Directory {bronze2_dir} does not exist or is not mounted.")

In [0]:
# List all mount points currently registered in this workspace, as a quick
# visual check that the expected containers are attached.
display(dbutils.fs.mounts())

In [0]:
# STEP 2: LIST THE FILES
# Show the files in the bronze mount. They are Parquet files and are loaded
# in STEP 3; no reader needs to be built here.
display(dbutils.fs.ls('/mnt/bronze4'))

In [0]:
# Mount list again; as the cell's last expression it is rendered as the cell output.
dbutils.fs.mounts()

In [0]:
# STEP 3: CREATE THE DATAFRAMES
# Read the four bronze Parquet files from the /mnt/bronze4 mount.
# The stores frame is bound to `df_stores`, the same name the cleaning and join
# cells read, so the notebook has one variable for the raw stores data.
df_customers    = spark.read.parquet('/mnt/bronze4/customer.parquet')
df_products     = spark.read.parquet('/mnt/bronze4/products.parquet')
df_stores       = spark.read.parquet('/mnt/bronze4/stores.parquet')
df_transactions = spark.read.parquet('/mnt/bronze4/transactions.parquet')

# Preview the customers. NOTE(review): this data includes an `email` column
# (selected in the cleaning step), so review the output before sharing.
display(df_customers)


In [0]:
# Re-list the bronze mount to confirm the Parquet files read below are present.
display(dbutils.fs.ls("/mnt/bronze4/"))

In [0]:
# STEP 3: CREATE THE DATAFRAMES
# One DataFrame per bronze Parquet file, loaded through the generic reader
# with an explicit "parquet" format.
df_customers = spark.read.format("parquet").load('/mnt/bronze4/customer.parquet')
df_products = spark.read.format("parquet").load('/mnt/bronze4/products.parquet')
df_stores = spark.read.format("parquet").load('/mnt/bronze4/stores.parquet')
df_transactions = spark.read.format("parquet").load('/mnt/bronze4/transactions.parquet')


In [0]:
# STEP 4: CHECK THE DATAFRAME

# Inspect the raw (bronze) transactions before any casting is applied.
display(df_transactions)

In [0]:
# STEP 5: START CLEANING

# Create silver layer - data cleaning: cast columns to explicit types and keep
# only the columns used downstream.

from pyspark.sql.functions import col

# Transactions: integer ids and quantity, date-typed transaction_date.
df_transactions = df_transactions.select(
    col("transaction_id").cast("int"),
    col("customer_id").cast("int"),
    col("product_id").cast("int"),
    col("store_id").cast("int"),
    col("quantity").cast("int"),
    col("transaction_date").cast("date")
)

# Products: integer id and double price (price feeds total_amount in the join).
df_products = df_products.select(
    col("product_id").cast("int"),
    col("product_name"),
    col("category"),
    col("price").cast("double")
)

# Stores: assign back to `df_stores`, because that is the name the join cell
# reads. Assigning only to `df_shop` left the join using the uncast frame.
df_stores = df_stores.select(
    col("store_id").cast("int"),
    col("store_name"),
    col("location")
)
df_shop = df_stores  # alias kept for any code still referring to df_shop

# Customers: cast the join key to int to match df_transactions.customer_id,
# then keep one row per customer_id. dropDuplicates keeps an arbitrary row per key.
df_customers = df_customers.select(
    col("customer_id").cast("int"),
    "first_name", "last_name", "email", "city", "registration_date"
).dropDuplicates(["customer_id"])


In [0]:
# STEP 6: JOIN THE DATAFRAMES

# Attach customer, product and store attributes to every transaction via
# inner joins on the shared id columns, then derive the line total.
df_silver = (
    df_transactions
    .join(df_customers, "customer_id")
    .join(df_products, "product_id")
    .join(df_stores, "store_id")
    .withColumn("total_amount", col("quantity") * col("price"))
)

In [0]:
# VALIDATE THE SILVER DATAFRAMES
# Render the joined silver DataFrame (transactions plus customer, product and
# store columns and the derived total_amount) for a visual check.
display(df_silver)


In [0]:
# Mount the `silver` container of storage account `adlsgroupproject` at
# /mnt/silver using the same service principal as the bronze mount.
# SECURITY: the client secret comes from a Databricks secret scope rather than
# being hard-coded. TODO: create the scope/key named below and rotate the
# secret that was previously committed in this notebook.
silver_mount_point = "/mnt/silver"

# dbutils.fs.mount errors if the mount point is already in use, so skip the
# call when re-running the notebook.
if any(m.mountPoint.rstrip("/") == silver_mount_point for m in dbutils.fs.mounts()):
    print(f"{silver_mount_point} is already mounted; skipping.")
else:
    dbutils.fs.mount(
        source="abfss://silver@adlsgroupproject.dfs.core.windows.net/",
        mount_point=silver_mount_point,
        extra_configs={"fs.azure.account.auth.type": "OAuth",
                       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                       "fs.azure.account.oauth2.client.id": "ec354f17-f7cc-47ae-857d-a01dd42a5c74",
                       "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="adls-secret-scope", key="sp-client-secret"),
                       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/3082adf5-ecf3-4980-9f87-ef8a9669de7e/oauth2/token"}
    )


In [0]:
# STEP 7: COPY TO THE SILVER CONTAINER

# Persist the joined silver DataFrame in Delta format under the silver mount,
# replacing whatever was previously written at that location.
silver_path = "/mnt/silver/"

(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)

In [0]:
# CREATE A DELTA TABLE FOR THE SILVER DATASET

# Register the Delta files written to /mnt/silver/ as the table
# store_silver_cleaned. IF NOT EXISTS keeps the cell re-runnable; a plain
# CREATE TABLE fails once the table already exists.
spark.sql("""
CREATE TABLE IF NOT EXISTS store_silver_cleaned
USING DELTA
LOCATION '/mnt/silver/'
""")


In [0]:
%sql
-- STEP 8: Use SQL to validate the data in the silver table
SELECT *
FROM store_silver_cleaned

In [0]:
# STEP 9: CREATE A NEW DATAFRAME IN PREPARATION FOR THE GOLD LAYER

# Read the persisted silver Delta data back from storage, so the gold step
# starts from what was written rather than from the in-memory df_silver.
silver_df = spark.read.load("/mnt/silver/", format="delta")

In [0]:
# VALIDATE THE NEW DATAFRAME

# Render the silver data as re-read from /mnt/silver/.
display(silver_df)

In [0]:
# CREATE THE GOLD DATAFRAME BY AGGREGATING THE SILVER DATAFRAME ACCORDING TO THE REQUIREMENT

# Import the functions module under an alias. `from pyspark.sql.functions
# import sum` would shadow Python's built-in sum() in every later cell.
from pyspark.sql import functions as F

# One row per (date, product, store) with quantity/sales totals, the number
# of distinct transactions, and the mean total_amount of the grouped rows.
# NOTE(review): avg() averages per silver row. It equals sales per transaction
# only if transaction_id is unique per row — confirm.
gold_df = silver_df.groupBy(
    "transaction_date",
    "product_id", "product_name", "category",
    "store_id", "store_name", "location"
).agg(
    F.sum("quantity").alias("total_quantity_sold"),
    F.sum("total_amount").alias("total_sales_amount"),
    F.countDistinct("transaction_id").alias("number_of_transactions"),
    F.avg("total_amount").alias("average_transaction_value")
)

In [0]:
# VALIDATE THE GOLD DATAFRAME

# Render the aggregated gold DataFrame (one row per date/product/store group).
display(gold_df)


In [0]:
# Mount the `gold` container of storage account `adlsgroupproject` at
# /mnt/gold using the same service principal as the other mounts.
# SECURITY: the client secret comes from a Databricks secret scope rather than
# being hard-coded. TODO: create the scope/key named below and rotate the
# secret that was previously committed in this notebook.
gold_mount_point = "/mnt/gold"

# dbutils.fs.mount errors if the mount point is already in use, so skip the
# call when re-running the notebook.
if any(m.mountPoint.rstrip("/") == gold_mount_point for m in dbutils.fs.mounts()):
    print(f"{gold_mount_point} is already mounted; skipping.")
else:
    dbutils.fs.mount(
        source="abfss://gold@adlsgroupproject.dfs.core.windows.net/",
        mount_point=gold_mount_point,
        extra_configs={"fs.azure.account.auth.type": "OAuth",
                       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                       "fs.azure.account.oauth2.client.id": "ec354f17-f7cc-47ae-857d-a01dd42a5c74",
                       "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="adls-secret-scope", key="sp-client-secret"),
                       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/3082adf5-ecf3-4980-9f87-ef8a9669de7e/oauth2/token"}
    )


In [0]:
# Persist the gold aggregate in Delta format under the gold mount,
# overwriting any previous output at that location.
gold_path = "/mnt/gold/"
gold_df.write.save(gold_path, format="delta", mode="overwrite")

In [0]:
# CREATE A DATABRICKS TABLE USING SPARK SQL

# Register the Delta files written to /mnt/gold/ as the table
# store_gold_sales_summary. IF NOT EXISTS keeps the cell re-runnable; a plain
# CREATE TABLE fails once the table already exists.
spark.sql("""
CREATE TABLE IF NOT EXISTS store_gold_sales_summary
USING DELTA
LOCATION '/mnt/gold/' """)

In [0]:
%sql
-- VALIDATE THE DATA USING SQL: inspect the gold sales summary table
SELECT *
FROM store_gold_sales_summary