In [None]:
# --- Connection settings (fill these in before running) ---
storage_account_name = "<your storage_account_name>"
container_name = "<your 2nd container name>"
# NOTE(review): do not commit a real token here; on Databricks a secret scope
# (dbutils.secrets.get) is the usual alternative — confirm which scope to use.
sas_token = "<your SAS token>"

# Host name of the storage account's DFS endpoint; every Hadoop config key
# below is suffixed with it, and the abfss:// URI is built from it.
account_host = f"{storage_account_name}.dfs.core.windows.net"

# Configure Spark to authenticate against this account with a fixed SAS token.
spark.conf.set(f"fs.azure.account.auth.type.{account_host}", "SAS")
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{account_host}",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
)
spark.conf.set(f"fs.azure.sas.fixed.token.{account_host}", sas_token)

# Fix: this has to be an f-string. Without the prefix the braces were never
# interpolated and the literal text "{container_name}@{storage_account_name}"
# was used as the path.
parquet_path = f"abfss://{container_name}@{account_host}/curated_sales_data.parquet/"
df_curated = spark.read.parquet(parquet_path)

# Quick look at the loaded data: first 5 rows and the schema.
df_curated.show(5)
df_curated.printSchema()


In [None]:
from pyspark.sql.functions import dayofweek, weekofyear, month, year, to_date, col, when

# Parse sales_date with the "yyyy-MM-dd" pattern so the date functions below
# operate on a date column.
sales_date_col = col("sales_date")
df_curated = df_curated.withColumn("sales_date", to_date(sales_date_col, "yyyy-MM-dd"))

# Calendar features to derive from sales_date, in the order the columns are added.
calendar_features = {
    "day_of_week": dayofweek,    # 1 = Sunday, 2 = Monday, ...
    "week_of_year": weekofyear,
    "month": month,
    "year": year,
}

df_fe = df_curated
for feature_name, extract_part in calendar_features.items():
    df_fe = df_fe.withColumn(feature_name, extract_part(sales_date_col))

df_fe.show(5)

In [None]:
# is_holiday is 1 when the month column equals 1 (January) and 0 otherwise.
# NOTE(review): this marks every day in January as a holiday, which looks like
# a placeholder rule — confirm against a real holiday calendar.
january_flag = when(col("month") == 1, 1).otherwise(0)
df_fe = df_fe.withColumn("is_holiday", january_flag)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# One ordered series per (store_id, product_id), sorted by ascending sales_date.
windowSpec = Window.partitionBy("store_id", "product_id").orderBy("sales_date")

# Frame = the current row plus the six rows before it within the partition.
# NOTE(review): the frame counts rows, not calendar days, so it spans exactly
# 7 days only if each store/product has one row per day; the aggregate is a
# mean although the column is named "qty" — confirm both are intended.
trailing_frame = windowSpec.rowsBetween(-6, 0)
rolling_avg_qty = F.avg(F.col("sales_qty")).over(trailing_frame)

df_fe = df_fe.withColumn("rolling_7d_qty", rolling_avg_qty)

df_fe.show(5)


In [None]:
# Container that holds the promotions CSV (fill this in before running).
# Fix: the placeholder used to sit inside the f-string braces, where text with
# spaces is not a valid Python expression, so the cell failed with a SyntaxError.
promotions_container_name = "<your 1st container name>"
promotions_path = (
    f"abfss://{promotions_container_name}@{storage_account_name}"
    ".dfs.core.windows.net/promotions.csv"
)

# Read the CSV using its first line as column names.
promotions_df = spark.read.option("header", "true").csv(promotions_path)

# Convert date columns to DateType
promotions_df = promotions_df \
    .withColumn("promo_start", to_date(col("promo_start"), "yyyy-MM-dd")) \
    .withColumn("promo_end", to_date(col("promo_end"), "yyyy-MM-dd"))

# Left-join promotions onto the feature rows by store_id and product_id only;
# the date range is NOT part of the join and is evaluated in the flag below.
# NOTE(review): with a key-only join, a sales row is repeated once per
# promotion defined for its store/product — confirm there is at most one
# promotion per pair, or move the date range into the join condition.
joined_df = df_fe.join(
    promotions_df,
    on=["store_id", "product_id"],
    how="left"
)

# is_promo_active is 1 when sales_date lies within [promo_start, promo_end]
# (inclusive) and 0 otherwise. Rows with no matching promotion have null promo
# dates, so the condition is not true and they fall through to 0.
joined_df = joined_df.withColumn(
    "is_promo_active",
    when(
        (col("sales_date") >= col("promo_start")) & (col("sales_date") <= col("promo_end")),
        1
    ).otherwise(0)
)

joined_df.show(5)


In [None]:
# Container the feature table is written to (fill this in before running).
# Fix: the placeholder used to sit inside the f-string braces, where text with
# spaces is not a valid Python expression, so the cell failed with a SyntaxError.
output_container_name = "<your first container name>"
output_path = f"abfss://{output_container_name}@{storage_account_name}.dfs.core.windows.net/cpg_features.parquet"

# Write the joined features as Parquet; "overwrite" replaces any data already
# present at output_path.
joined_df.write.mode("overwrite").parquet(output_path)


In [None]:
# Sanity check: load the Parquet data at output_path and preview 5 rows.
df_check = spark.read.format("parquet").load(output_path)
df_check.show(n=5)

In [None]:
# List the top-level contents of the "raw" container in the storage account.
raw_container_root = f"abfss://raw@{storage_account_name}.dfs.core.windows.net/"
raw_listing = dbutils.fs.ls(raw_container_root)
display(raw_listing)