In [0]:
# Read the Parquet dataset at the volume path below into a DataFrame.
df_filtered = (
    spark.read
    .format("parquet")
    .load("/Volumes/workspace/default/abc/retail_filtered.parquet/")
)



<h4>Convert InvoiceDate to proper TimestampType.</h4>

In [0]:
from pyspark.sql import functions as F

In [0]:
# Replace InvoiceDate with a parsed timestamp. The patterns are tried in the
# order listed and the first non-null parse wins, so a value that is valid
# under an earlier (month-first) pattern is never re-read as day-first.
# NOTE(review): this relies on to_timestamp returning null on a failed parse;
# confirm the cluster is not in ANSI mode, where it may raise instead.
df_transformed = df_filtered.withColumn(
    "InvoiceDate",
    F.coalesce(
        *[
            F.to_timestamp("InvoiceDate", pattern)
            for pattern in (
                "M/d/yyyy H:mm",
                "MM/dd/yyyy HH:mm",
                "dd/MM/yyyy HH:mm",
            )
        ]
    ),
)

In [0]:
# Sanity check: preview five parsed InvoiceDate values, then print the schema.
df_transformed.select(F.col("InvoiceDate")).show(n=5, truncate=False)
df_transformed.printSchema()


<h4>Add a new column TotalPrice = Quantity * UnitPrice.</h4>

In [0]:
# Per-row total: Quantity multiplied by UnitPrice.
df_transformed = df_transformed.withColumn(
    "TotalPrice",
    F.col("Quantity") * F.col("UnitPrice"),
)

# Preview the first ten rows with the new column.
display(df_transformed.limit(10))

<h4>Extract InvoiceYear, InvoiceMonth, InvoiceDay, and WeekDay.</h4>

In [0]:
# Derive calendar parts from InvoiceDate: year, month, day of month, and the
# abbreviated day-of-week name (date_format pattern "E").
df_transformed = (
    df_transformed
    .withColumn("InvoiceYear", F.year("InvoiceDate"))
    .withColumn("InvoiceMonth", F.month("InvoiceDate"))
    .withColumn("InvoiceDay", F.dayofmonth("InvoiceDate"))
    .withColumn("WeekDay", F.date_format("InvoiceDate", "E"))
)

# Preview the first ten rows with the derived columns.
display(df_transformed.limit(10))

<h4>Find the top 10 products by revenue.</h4>

In [0]:
# Total revenue per product description, keeping the ten highest.
top_10_products = (
    df_transformed
    .groupBy("Description")
    .agg(F.sum("TotalPrice").alias("TotalRevenue"))
    .orderBy(F.desc("TotalRevenue"))
    .limit(10)
)

# Spark is lazy: without an action the query above is never executed and the
# answer never appears in the notebook, so render it here.
display(top_10_products)


<h4>Find top 5 customers by total spend.</h4>

In [0]:
# Total spend per customer, ignoring rows with no CustomerID, keeping the
# five largest spenders.
top_5_customers = (
    df_transformed
    .where(F.col("CustomerID").isNotNull())
    .groupBy("CustomerID")
    .agg(F.sum("TotalPrice").alias("TotalSpend"))
    .sort(F.col("TotalSpend").desc())
    .limit(5)
)

display(top_5_customers)

In [0]:
# Revenue per calendar day and country. The timestamp is truncated to a date
# directly in the grouping key, then results are ordered by day and country.
daily_country_revenue = (
    df_transformed
    .groupBy(
        F.to_date("InvoiceDate").alias("Invoice_Date_only"),
        "Country",
    )
    .agg(F.sum("TotalPrice").alias("DailyRevenue"))
    .orderBy("Invoice_Date_only", "Country")
)
display(daily_country_revenue)


In [0]:
# Inspection only: list the current column names of df_transformed.
df_transformed.columns

<h4>Use a window function to get each customer's most recent purchase date.</h4>

In [0]:
# Add a date-only copy of InvoiceDate (time of day dropped) for the
# date-based window calculations.
df_transformed = df_transformed.withColumn(
    "InvoiceDateOnly",
    F.to_date(F.col("InvoiceDate")),
)

# Show the column list to confirm the new column is present.
df_transformed.columns

In [0]:
# Keep only rows that have a CustomerID.
# NOTE: this rebinds df_transformed, so every later cell that reads it (the
# ranking, the rolling window, the distinct-product count and the final
# write) sees the frame without null-customer rows.
df_transformed = df_transformed.where(F.col("CustomerID").isNotNull())

In [0]:
from pyspark.sql.window import Window

# One partition per customer, newest invoice date first.
windowspec = (
    Window
    .partitionBy("CustomerID")
    .orderBy(F.desc("InvoiceDateOnly"))
)

In [0]:
# Rank each row within its customer partition by invoice date (1 = latest
# date). dense_rank gives all rows on the same date the same rank.
df_ranked = df_transformed.withColumn(
    "rank",
    F.dense_rank().over(windowspec),
)


In [0]:
# Most recent purchase date per customer.
# rank == 1 matches every row that falls on a customer's latest date, so a
# customer with several rows on that date would appear several times.
# distinct() collapses these to a single row per customer.
most_recent_purchase = (
    df_ranked
    .filter(F.col("rank") == 1)
    .select("CustomerID", "InvoiceDateOnly", "rank")
    .distinct()
)

In [0]:
# Render the most-recent-purchase result.
display(most_recent_purchase)

In [0]:
# Inspection only: list the column names of df_ranked (includes "rank").
df_ranked.columns

<h4>Calculate rolling 7-day sales total (use window with date ordering).</h4>

In [0]:
# Rolling 7-day window per country: the current day plus the six preceding
# calendar days.
# The ordering key is the integer number of days since 1970-01-01, so the
# range frame is expressed in days. Ordering by epoch seconds of local
# midnight would make the frame depend on the session time zone: across a
# daylight-saving change, midnights are 23 or 25 hours apart and a
# 6 * 86400 second range can silently drop a day.
window_spec_2 = (
    Window
    .partitionBy("Country")
    .orderBy(
        F.datediff(
            F.col("InvoiceDateOnly"),
            F.lit("1970-01-01").cast("date"),
        )
    )
    .rangeBetween(-6, 0)
)

In [0]:
# Attach to every row the sum of TotalPrice over the window defined by
# window_spec_2.
rolling_7days_Sales = df_transformed.withColumn(
    "RollingDayRevenue",
    F.sum(F.col("TotalPrice")).over(window_spec_2),
)


<h4>Find customers who purchased more than 10 different products.</h4>

In [0]:
# Number of distinct StockCode values bought by each customer.
customer_product_count = (
    df_transformed
    .groupBy("CustomerID")
    .agg(F.countDistinct("StockCode").alias("distinct_product_Count"))
)

# "More than 10 different products" is a strict comparison: a customer with
# exactly 10 distinct products must not be included.
customer_g10 = customer_product_count.filter(
    F.col("distinct_product_Count") > 10
)
display(customer_g10)


In [0]:
# Persist the transformed frame as Parquet, replacing any existing output at
# this path.
silver_path = "/Volumes/workspace/default/abc/silver"

(
    df_transformed.write
    .format("parquet")
    .mode("overwrite")
    .save(silver_path)
)