In [0]:
%sql
SELECT * FROM `hive_metastore`.`default`.`retail_data` limit 10000;

invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom
489435,22195,HEART MEASURING SPOONS LARGE,24,2009-12-01T07:46:00.000+0000,1.65,13085.0,United Kingdom


**Load Data from sql table into DataFrame**

In [0]:
# Execute the SQL query
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.sql.functions import *
schema = StructType([
    StructField("invoice_no", StringType(), True),
    StructField("stock_code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("invoice_date", TimestampType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("country", StringType(), True)
])

# Read the table with the defined schema
df = spark.sql("SELECT * FROM hive_metastore.default.retail_data")
df = spark.createDataFrame(df.rdd, schema)
df.printSchema()

# Show the result
display(df.limit(10))


root
 |-- invoice_no: string (nullable = true)
 |-- stock_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoice_date: timestamp (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- country: string (nullable = true)



invoice_no,stock_code,description,quantity,invoice_date,unit_price,customer_id,country
489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2009-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2009-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2009-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22350,CAT BOWL,12,2009-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2009-12-01T07:46:00.000+0000,3.75,13085,United Kingdom
489435,22195,HEART MEASURING SPOONS LARGE,24,2009-12-01T07:46:00.000+0000,1.65,13085,United Kingdom


**Total Invoice Amount Distribution**

In [0]:
# Calculate invoice amounts
invoice_amounts = df.groupBy("invoice_no").agg(sum(col("unit_price") * col("quantity")).alias("invoice_amount"))

# Filter positive order amounts
positive_order_amounts = invoice_amounts.filter(col("invoice_amount") > 0)
display(positive_order_amounts)
# Calculate statistics
statistics = positive_order_amounts.agg(
    min("invoice_amount").alias("minimum"),
    max("invoice_amount").alias("maximum"),
    expr("percentile_approx(invoice_amount, 0.5)").alias("median"),
    expr("percentile_approx(invoice_amount, 0.75)").alias("mode"),  # Approximate mode
    mean("invoice_amount").alias("mean")
)
display(statistics)


invoice_no,invoice_amount
489677,192.0
491045,303.2
491658,155.05999999999997
493542,118.75
493977,275.95
494244,6711.0
494277,1335.92
495185,2507.06
495783,48.96
496171,199.3


minimum,maximum,median,mode,mean
0.19,168469.6,304.31,504.6,523.3016799241424


**Monthly Placed and Canceled Orders**

In [0]:
# Calculate the OrderYearMonth column
retail_df = df.withColumn('OrderYearMonth', expr("date_format(invoice_date, 'yyyyMM')"))

# Group by OrderYearMonth and calculate TotalOrders and CanceledOrders
grouped_orders = retail_df.groupby('OrderYearMonth').agg(
    countDistinct('invoice_no').alias('TotalOrders'),
    sum(when(col('invoice_no').startswith('C'), 1).otherwise(0) / 2).alias('CanceledOrders')
)

# Calculate PlacedOrders
grouped_orders = grouped_orders.withColumn('PlacedOrders', col('TotalOrders') - 2 * col('CanceledOrders'))
orders = grouped_orders.orderBy('OrderYearMonth')

# Display the grouped_orders DataFrame
display(orders)

OrderYearMonth,TotalOrders,CanceledOrders,PlacedOrders
200912,2330,507.5,1315.0
201001,1633,343.0,947.0
201002,1969,288.0,1393.0
201003,2367,422.0,1523.0
201004,1892,313.0,1266.0
201005,2418,491.5,1435.0
201006,2216,396.5,1423.0
201007,2017,367.0,1283.0
201008,1877,286.5,1304.0
201009,2375,406.0,1563.0


**Monthly Sales**

In [0]:
# Calculate the order amount
retail_df=retail_df.withColumn('OrderAmount', col('unit_price')*col('quantity'))

# Group the DataFrame by OrderYearMonth and sum the order amounts
monthly_sales= retail_df.groupby('OrderYearMonth').agg(sum('OrderAmount').alias('TotalAmount'))
# Show the result
display(monthly_sales.orderBy('OrderYearMonth'))

OrderYearMonth,TotalAmount
200912,799763.7100000143
201001,624032.8919999955
201002,533091.4260000042
201003,765848.7609999765
201004,590580.3919999823
201005,615322.8300000005
201006,679786.6099999842
201007,575236.349999999
201008,656776.3299999854
201009,853650.4309999745


**Monthly Sales Growth**

In [0]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# Add a column for previous month's total sales
window_spec = Window.orderBy('OrderYearMonth')
monthly_sales = monthly_sales.withColumn('PrevTotalAmount', lag('TotalAmount').over(window_spec))

# Calculate the sales growth percentage
monthly_sales = monthly_sales.withColumn('SalesGrowth', (col('TotalAmount') - col('PrevTotalAmount')) / col('PrevTotalAmount') * 100)

# Replace null values in the first row of 'SalesGrowth' column with 0
monthly_sales = monthly_sales.withColumn('SalesGrowth', when(col('SalesGrowth').isNull(), 0).otherwise(col('SalesGrowth')))

# Display the result
display(monthly_sales)

OrderYearMonth,TotalAmount,PrevTotalAmount,SalesGrowth
200912,799763.7100000143,,0.0
201001,624032.8919999955,799763.7100000143,-21.972842203607325
201002,533091.4260000042,624032.8919999955,-14.573184709627766
201003,765848.7609999765,533091.4260000042,43.661804269943474
201004,590580.3919999823,765848.7609999765,-22.88550663333901
201005,615322.8300000005,590580.3919999823,4.189512272195279
201006,679786.6099999842,615322.8300000005,10.476416095268814
201007,575236.349999999,679786.6099999842,-15.379864572499825
201008,656776.3299999854,575236.349999999,14.175039529401523
201009,853650.4309999745,656776.3299999854,29.97582160124337


**Monthly Active Users**

In [0]:
# Calculate the monthly active users
monthly_active_users = retail_df.groupby('OrderYearMonth').agg(countDistinct('customer_id').alias('ActiveUsers'))

# Rename the columns
monthly_active_users = monthly_active_users.withColumnRenamed('OrderYearMonth', 'OrderYearMonth') \
 .withColumnRenamed('ActiveUsers', 'ActiveUsers')

# Display the result
display(monthly_active_users.orderBy('OrderYearMonth'))

OrderYearMonth,ActiveUsers
200912,1045
201001,786
201002,807
201003,1111
201004,998
201005,1062
201006,1095
201007,988
201008,964
201009,1202


**New and Existing Users**

In [0]:
# Calculate the first purchase year-month for each user
first_purchase = retail_df.groupBy('customer_id').agg(min('OrderYearMonth').alias('first_purchase'))

# Join with transactional data to identify new and existing users
user_status = retail_df.join(first_purchase, on='customer_id', how='left')

# Create a column 'UserStatus' indicating if it's the first purchase month for each user
user_status = user_status.withColumn('UserStatus', when(col('OrderYearMonth') == col('first_purchase'), 'NewUser').otherwise('ExistingUser'))

# Calculate the count of new users and existing users for each year and month
user_counts = user_status.groupBy('OrderYearMonth', 'UserStatus').agg(countDistinct('customer_id').alias('UserCount'))

display(user_counts)




OrderYearMonth,UserStatus,UserCount
201101,NewUser,71
201012,ExistingUser,871
201104,NewUser,105
201111,NewUser,192
201107,NewUser,102
201111,ExistingUser,1519
201002,ExistingUser,444
201009,ExistingUser,960
201104,ExistingUser,794
201001,NewUser,394


In [0]:
from pyspark.sql.functions import col, max, datediff
import pyspark.sql.functions as F


## Remove rows with null customer_id
cleaned_df = retail_df.filter(col("customer_id").isNotNull())

# Calculate the reference date
reference_date = cleaned_df.agg(max("invoice_date")).collect()[0][0]

# Calculate recency, frequency, and monetary values for each customer
rfm_data = cleaned_df.groupby('customer_id').agg(
    datediff(F.lit(reference_date), max('invoice_date')).alias('Recency'),
    countDistinct('invoice_no').alias('Frequency'),
    sum('OrderAmount').alias('Monetary')
)

display(rfm_data)


customer_id,Recency,Frequency,Monetary
16574,71,3,1301.7799999999995
15727,16,15,9445.51
17389,0,77,54587.030000000006
15447,330,6,484.93
15619,10,1,336.40000000000003
18051,634,8,2275.98
13623,30,15,2446.36
12940,46,4,876.2900000000002
14450,180,7,1128.44
16503,106,13,3597.8899999999994
