**Advanced Aggregation and Enrichment**

In [None]:
# Print a preview of full_orders_df before building the aggregates below.
full_orders_df.show()

In [None]:
# Total revenue & average order value (AOV) per customer.
# One output row per customer_id: number of order_id values, summed price,
# and mean price rounded to 2 decimals, sorted by total spend (highest first).
# NOTE(review): count/avg run over the rows of full_orders_df; if that frame
# holds one row per order item rather than per order, 'total_orders' and
# 'AOV' are item-level figures — confirm the grain.
customer_spending_df = (
    full_orders_df
    .groupBy('customer_id')
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_spent'),
        round(avg('price'), 2).alias('AOV'),
    )
    .orderBy(desc('total_spent'))
)
customer_spending_df.show()

In [None]:
# Seller performance metrics (revenue, average review, order count).
# One output row per seller_id, sorted by total revenue (highest first).
# 'price_variability' is the standard deviation of price, rounded to 2 decimals.
seller_performance_df = (
    full_orders_df
    .groupBy('seller_id')
    .agg(
        count('order_id').alias('total_orders'),
        sum('price').alias('total_revenue'),
        round(avg('review_score'), 2).alias('avg_review_score'),
        round(stddev('price'), 2).alias('price_variability'),
    )
    .orderBy(desc('total_revenue'))
)

In [None]:
# Print a preview of the per-seller metrics.
seller_performance_df.show()

In [None]:
# Product popularity metrics.
# One output row per product_id, sorted by number of sales (highest first).
# 'unique_sellers' is the set of distinct seller_id values for the product
# (an array column, not a count).
product_metrics_df = (
    full_orders_df
    .groupBy('product_id')
    .agg(
        count('order_id').alias('total_sales'),
        sum('price').alias('total_revenue'),
        round(avg('price'), 2).alias('avg_price'),
        round(stddev('price'), 2).alias('price_volatility'),
        collect_set('seller_id').alias('unique_sellers'),
    )
    .orderBy(desc('total_sales'))
)

In [None]:
# Customer retention analysis (first & last order).
#
# The earliest and latest purchase timestamps are taken with min()/max().
# first()/last() would return whichever row happens to come first/last inside
# each group, which is not deterministic after a shuffle, so they cannot be
# relied on for "first order" / "last order" dates.
#
# The functions module is imported under a namespace so these aggregates can
# never be confused with Python's built-in min()/max().
from pyspark.sql import functions as F

customer_retention_df = (
    full_orders_df
    .groupBy('customer_id')
    .agg(
        F.min('order_purchase_timestamp').alias('first_order_date'),
        F.max('order_purchase_timestamp').alias('last_order_date'),
        count('order_id').alias('total_orders'),
        round(avg('price'), 2).alias('aov'),
    )
    # Customers with the most orders first.
    .orderBy(desc('total_orders'))
)


In [None]:
# Print a preview of the per-customer retention summary.
customer_retention_df.show()

In [None]:
# Order status flags: each flag is 1 when order_status equals the given
# status and 0 otherwise (including when order_status is null).
order_status = col('order_status')
full_orders_df = (
    full_orders_df
    .withColumn(
        'is_delivered',
        when(order_status == 'delivered', lit(1)).otherwise(lit(0)),
    )
    .withColumn(
        'is_canceled',
        when(order_status == 'canceled', lit(1)).otherwise(lit(0)),
    )
)


In [None]:
# Spot-check the status flags on canceled orders (prints up to 100 rows).
(
    full_orders_df
    .filter(col('order_status') == 'canceled')
    .select('order_status', 'is_delivered', 'is_canceled')
    .show(100)
)

In [None]:
# Order revenue calculation: price plus freight_value for each row.
# The sum is null whenever either input column is null.
revenue = col('price') + col('freight_value')
full_orders_df = full_orders_df.withColumn('order_revenue', revenue)
full_orders_df.select('price', 'freight_value', 'order_revenue').show()


In [None]:
# Customer segmentation, keyed on the customer's AOV column:
#   AOV >= 1200        -> "High-Value"
#   700 <= AOV < 1200  -> 'Medium_Value'
#   anything else      -> 'Low-Value'
# NOTE(review): 'Medium_Value' uses an underscore while the other two labels
# use a hyphen; the labels are kept exactly as they were.
aov = col('AOV')
segment = (
    when(aov >= 1200, "High-Value")
    .when((aov >= 700) & (aov < 1200), 'Medium_Value')
    .otherwise('Low-Value')
)
customer_spending_df = customer_spending_df.withColumn('customer_segment', segment)
customer_spending_df.show()

In [None]:
# Weekday vs weekend orders.
# Spark's dayofweek() numbers days from 1 (Sunday) to 7 (Saturday), so 1 and 7
# mark the weekend.
# NOTE(review): the labels are 'Weekend' (capitalised) and 'weekday'
# (lower-case); they are kept exactly as they were.
is_weekend = dayofweek('order_purchase_timestamp').isin(1, 7)
full_orders_df = full_orders_df.withColumn(
    'order_day_type',
    when(is_weekend, lit('Weekend')).otherwise(lit('weekday')),
)
full_orders_df.select('order_purchase_timestamp', 'order_day_type').show()

In [None]:
!hadoop fs-mkdir /olist/processed/
full_orders_df.write.mode('overwrite').parquet('/olist/processed')


In [None]:
!hadoop fs-ls-h /olist/processed/