### Configuration: ADLS to Databricks using SAS Token  

- Created a container in **ADLS** for project data storage.  
- Configured **SAS Token** authentication to connect Databricks with ADLS.  
- Set the SAS token in the Spark session configuration so Databricks can access the container securely over `abfss://`.  
- Verified the connection by listing files and checking read/write operations.  


In [0]:
spark.conf.set(
  "fs.azure.account.auth.type.stassignment.dfs.core.windows.net",
  "SAS"
)

spark.conf.set(
  "fs.azure.sas.token.provider.type.stassignment.dfs.core.windows.net",
  "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider"
)

# Read the SAS token from a Databricks secret scope instead of hardcoding it in the notebook.
# The scope and key names below are placeholders; substitute the ones defined in your workspace.
spark.conf.set(
  "fs.azure.sas.fixed.token.stassignment.dfs.core.windows.net",
  dbutils.secrets.get(scope="<secret-scope>", key="<sas-token-key>")
)
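
Before relying on the connection, it can help to check that the SAS token is still inside its validity window and carries the permissions the read/write test needs. The following is a minimal sketch in plain Python, not part of the original notebook: `describe_sas_token` is a hypothetical helper, the sample token uses a dummy signature, and the timestamps are assumed to use the `YYYY-MM-DDTHH:MM:SSZ` form shown in the `st` and `se` fields.

```python
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import parse_qs


def describe_sas_token(token: str, now: Optional[datetime] = None) -> dict:
    """Return the permissions and validity window encoded in a SAS token string."""
    # A SAS token is a URL query string; take the first value of each field.
    fields = {key: values[0] for key, values in parse_qs(token.lstrip("?")).items()}
    now = now or datetime.now(timezone.utc)

    def parse_utc(timestamp: str) -> datetime:
        # Assumes the full second-precision UTC format, e.g. 2025-09-16T22:03:31Z.
        return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)

    start = parse_utc(fields["st"]) if "st" in fields else None
    expiry = parse_utc(fields["se"])
    return {
        "permissions": fields.get("sp", ""),
        "start": start,
        "expiry": expiry,
        "is_active": (start is None or start <= now) and now < expiry,
    }


# Dummy token for illustration only; the signature is not real.
sample_token = "sv=2024-11-04&sp=rl&st=2025-08-27T13:48:31Z&se=2025-09-16T22:03:31Z&spr=https&sig=REDACTED"
info = describe_sas_token(sample_token, now=datetime(2025, 9, 1, tzinfo=timezone.utc))
print(info["permissions"], info["is_active"])
```

A token that has expired or has not yet started typically surfaces as an authorization error when listing files, so checking `is_active` first narrows down the cause.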


In [0]:
%run /Workspace/adf_assignment/src/bronze_to_silver/utils

### Gold Layer  

- Joined **Customer, Product, Store, Sales** tables from Silver.  
- Created fact & dimension style tables for analytics.  
- Applied aggregations (sales per customer, store, product, etc.).  
- Standardized date format (`yyyy-MM-dd`).  
- Written to: `gold/sales_view/{tablename}/{delta parquet}`  
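
The output layout above can be sketched as a small path builder. This is an illustration under assumptions rather than the notebook's own code: `abfss_path` is a hypothetical helper, `gold` is assumed to be the container name, `sales_view` a folder inside it, and `store_product_sales` is a made-up table name.

```python
def abfss_path(container: str, account: str, *parts: str) -> str:
    """Build an abfss:// URI, joining path parts with exactly one slash."""
    root = f"abfss://{container}@{account}.dfs.core.windows.net"
    # Strip stray slashes so callers cannot produce "//" in the final path.
    cleaned = [part.strip("/") for part in parts if part.strip("/")]
    return "/".join([root, *cleaned])


# Hypothetical table name, used only to show the resulting layout.
print(abfss_path("gold", "stassignment", "sales_view", "store_product_sales"))
```

Building paths in one place keeps the container, account, and folder convention consistent across the Silver reads and Gold writes.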


In [0]:
from pyspark.sql.functions import col

# No trailing slash: the f-strings below add the separator themselves.
silver_base_path = "abfss://silver@stassignment.dfs.core.windows.net"
gold_output_path = "abfss://gold@stassignment.dfs.core.windows.net/StoreProductSalesAnalysis"

sales_df = read_delta_with_snake_case(spark, f"{silver_base_path}/customer_sales")
product_df = read_delta_with_snake_case(spark, f"{silver_base_path}/product")
store_df = read_delta_with_snake_case(spark, f"{silver_base_path}/store")

sales_df = sales_df.withColumnRenamed("product__id", "product_id")

store_product_df = get_store_product_data(product_df, store_df)

result_df = enrich_sales_with_store_product(sales_df, store_product_df)

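# Detect column names that appear more than once after the join.
columns = result_df.columns
duplicate_cols = sorted({name for name in columns if columns.count(name) > 1})
if duplicate_cols:
    print(f"Duplicate Columns Detected: {duplicate_cols}")
    # drop() by name removes every column with that name, so rename positionally
    # and keep only the first occurrence of each name.
    temp_names = [f"{name}__{i}" for i, name in enumerate(columns)]
    first_positions = [i for i, name in enumerate(columns) if name not in columns[:i]]
    result_df = result_df.toDF(*temp_names).select(
        *[col(temp_names[i]).alias(columns[i]) for i in first_positions]
    )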

selected_cols = [
    "order_date", "category", "city", "customer_id", "order_id", "product_id", "profit", "region", "sales", "segment",
    "ship_date", "ship_mode", "latitude", "longitude",
    "store_name", "location", "manager_name", "product_name", "price", "stock_quantity", "image_url"
]

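# Keep only the requested columns that exist in the joined result.
# The loop variable is named `c` so it is not confused with the imported `col` function.
selected_cols = [c for c in selected_cols if c in result_df.columns]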

result_df = result_df.select(*selected_cols)
result_df.display()

result_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(gold_output_path)

spark.sql("DROP TABLE IF EXISTS StoreProductSalesAnalysis")
spark.sql(f"""
    CREATE TABLE StoreProductSalesAnalysis
    USING DELTA
    LOCATION '{gold_output_path}'
""")
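
The column filter in the cell above silently skips any requested column that is absent from the joined result. A small helper can make that visible before the table is written. This is a sketch, not part of the original notebook: `split_columns` is a hypothetical name, and the column lists in the example are shortened for illustration.

```python
from typing import List, Sequence, Tuple


def split_columns(requested: Sequence[str], available: Sequence[str]) -> Tuple[List[str], List[str]]:
    """Split requested column names into those present and those missing, preserving order."""
    available_set = set(available)
    present = [name for name in requested if name in available_set]
    missing = [name for name in requested if name not in available_set]
    return present, missing


# Shortened example lists; in the notebook these would be selected_cols and result_df.columns.
present, missing = split_columns(
    ["order_date", "city", "image_url"],
    ["order_date", "city", "sales"],
)
if missing:
    print(f"Columns not found in the joined result: {missing}")
```

Logging the missing names makes a schema change in the Silver tables show up at run time instead of as a quietly narrower Gold table.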
