# Data Access to Azure Data Lake Storage Gen2

In [0]:
# Configure OAuth (service principal / client credentials) access to the
# ADLS Gen2 storage account used by this notebook.
#
# The client secret is read from a Databricks secret scope instead of being
# embedded in the notebook source. Any secret that was previously committed
# in plain text should be treated as leaked and rotated.
# NOTE(review): the scope and key names below are placeholders — point them at
# the secret scope that actually holds this service principal's secret.
storage_host = "customerrawstorage2.dfs.core.windows.net"
client_secret = dbutils.secrets.get(scope="customer-rfm", key="customerrawstorage2-client-secret")

spark.conf.set(f"fs.azure.account.auth.type.{storage_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_host}",
    "d61737f2-c5ff-49c5-8d39-ac28886f5a0d",
)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_host}", client_secret)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_host}",
    "https://login.microsoftonline.com/729afaa2-2f3c-4167-bff8-2b2452c198ee/oauth2/token",
)

# Load the customer data 

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Column layout of the raw customer transactions, every field typed as a
# nullable string.
# NOTE(review): this schema is not passed to the reader below (Parquet files
# carry their own schema). It is kept in case other cells reference it —
# confirm and remove if it is unused.
cus_schema =  StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", StringType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", StringType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True)
])

# Location of the raw customer transactions in the ADLS Gen2 container.
CUSTOMER_RAW_PATH = "abfss://rawstorage@customerrawstorage2.dfs.core.windows.net/customer/kani042/Customer_Segmentation_By_RFM_Analysis/main"

# Read the Parquet dataset. The former `header='true'` option was dropped:
# it is a CSV reader option and has no meaning for Parquet input.
cust_raw = spark.read.parquet(CUSTOMER_RAW_PATH)

cust_raw.display()

# Data Cleaning

In [0]:
# Inspect the column names and data types of the raw data.
# `describe()` only builds a summary-statistics DataFrame; `printSchema()` is
# the call that reports each column's type.
cust_raw.printSchema()


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Cast the columns to the types used by the rest of the analysis.
#
# `withColumn` returns a new DataFrame rather than modifying `cust_raw` in
# place, so the result must be assigned back — otherwise the casts are
# silently discarded.
# UnitPrice is cast to DoubleType: an integer cast would drop the fractional
# part of every price.
# NOTE(review): a value that cannot be parsed by a cast becomes null (for
# example a non-numeric InvoiceNo) — confirm the raw values are castable and
# re-check the missing-value counts after this step.
cust_raw = (
    cust_raw
    .withColumn("InvoiceNo", col("InvoiceNo").cast(IntegerType()))
    .withColumn("Quantity", col("Quantity").cast(IntegerType()))
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType()))
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType()))
    .withColumn("InvoiceDate", col("InvoiceDate").cast(DateType()))
)

In [0]:
# Count the null entries of every column; the result is a single row with one
# count per input column (same column names as the input).
null_count_exprs = []
for column_name in cust_raw.columns:
    null_marker = when(col(column_name).isNull(), column_name)
    null_count_exprs.append(count(null_marker).alias(column_name))

missing_values = cust_raw.select(null_count_exprs)
display(missing_values)

Insight: Only two columns, Description and CustomerID, have missing values.

In [0]:
# CustomerID is the key column for customer-level insights, so rows where it
# is null are removed; nulls in other columns are left untouched.
cust_raw = cust_raw.dropna(how="any", subset=["CustomerID"])

In [0]:
import pandas as pd

# Summary statistics for the numeric columns, collected into a pandas frame.
statistics = cust_raw.describe("Quantity", "UnitPrice").toPandas()

# Approximate medians (0.5 quantile, 0.01 relative error).
median_quantity = cust_raw.approxQuantile("Quantity", [0.5], 0.01)[0]
median_unit_price = cust_raw.approxQuantile("UnitPrice", [0.5], 0.01)[0]

# Add the medians as an extra "median" row. `DataFrame.append` was removed in
# pandas 2.0, so the row is attached with `pd.concat`; `ignore_index=True`
# keeps the row labels unique.
median_row = pd.DataFrame({
    "summary": ["median"],
    "Quantity": [median_quantity],
    "UnitPrice": [median_unit_price]
})
statistics = pd.concat([statistics, median_row], ignore_index=True)

display(statistics)

The maximum values are far higher than the medians, which indicates that there are outliers in the data.

In [0]:
# Keep only the rows whose Quantity and UnitPrice are both strictly positive
# (this removes negative values as well as zeros).
has_positive_quantity = cust_raw["Quantity"] > 0
has_positive_price = cust_raw["UnitPrice"] > 0
cust_raw = cust_raw.where(has_positive_quantity & has_positive_price)

In [0]:
import pandas as pd

# Recompute the summary statistics for the numeric columns on the current
# state of `cust_raw`, collected into a pandas frame.
statistics = cust_raw.describe("Quantity", "UnitPrice").toPandas()

# Approximate medians (0.5 quantile, 0.01 relative error).
median_quantity = cust_raw.approxQuantile("Quantity", [0.5], 0.01)[0]
median_unit_price = cust_raw.approxQuantile("UnitPrice", [0.5], 0.01)[0]

# Add the medians as an extra "median" row. `DataFrame.append` was removed in
# pandas 2.0, so the row is attached with `pd.concat`; `ignore_index=True`
# keeps the row labels unique.
median_row = pd.DataFrame({
    "summary": ["median"],
    "Quantity": [median_quantity],
    "UnitPrice": [median_unit_price]
})
statistics = pd.concat([statistics, median_row], ignore_index=True)

display(statistics)

In [0]:
# How many rows are recorded under each invoice number?
rows_per_invoice = cust_raw.groupBy("InvoiceNo").count()
rows_per_invoice.display()

# Look at the individual rows of one invoice (536596) to see whether rows
# sharing an invoice number are duplicates.
cust_raw.filter(col("InvoiceNo") == 536596).display()


Grouping by InvoiceNo shows that rows sharing an invoice number are different transactions, so they are not considered duplicates.

# Exploratory Data Analysis

### Distribution of Quantity and unit price

In [0]:
# Box plot of the Quantity column (collected to pandas for plotting)
quantity_pd = cust_raw.select("Quantity").toPandas()
quantity_pd.boxplot()


In [0]:
# Box plot of the UnitPrice column (collected to pandas for plotting)
unit_price_pd = cust_raw.select("UnitPrice").toPandas()
unit_price_pd.boxplot()

In [0]:
# Rows sorted by Quantity, largest first, to inspect the maximum values
rows_by_quantity = cust_raw.sort(col("Quantity").desc())
rows_by_quantity.display()

# Rows sorted by UnitPrice, largest first, to inspect the maximum values
rows_by_unit_price = cust_raw.sort(col("UnitPrice").desc())
rows_by_unit_price.display()

The large Quantity and UnitPrice values do not appear to be erroneous, even though they are outliers.

# Feature Engineering

In [0]:
# Sales amount of each row: Quantity multiplied by UnitPrice, passed through
# `round` with its default scale (no decimal places are requested).
row_total = round(cust_raw["Quantity"] * cust_raw["UnitPrice"])
cust_raw = cust_raw.withColumn("TotalAmount", row_total)
cust_raw.display()

In [0]:
# Derive InvoiceYear and InvoiceMonth from the InvoiceDate column
invoice_date = col("InvoiceDate")
cust_raw = cust_raw.withColumn("InvoiceYear", year(invoice_date))
cust_raw = cust_raw.withColumn("InvoiceMonth", month(invoice_date))
cust_raw.display()

# Data Visualization and insights

In [0]:
# Total sales amount per customer
sales_by_customer = cust_raw.groupBy("CustomerID").agg(
    sum("TotalAmount").alias("TotalAmount")
)

# Show the 10 customers with the highest totals
top_customers = sales_by_customer.orderBy(desc("TotalAmount")).limit(10)
top_customers.display()

In [0]:
# Total sales amount per country
sales_by_country = cust_raw.groupBy("Country").agg(
    sum("TotalAmount").alias("TotalAmount")
)

# Show the 10 countries with the highest totals
top_countries = sales_by_country.orderBy(desc("TotalAmount")).limit(10)
top_countries.display()



Databricks visualization. Run in Databricks to view.

## Country-wise Sales Overview

- 🇸🇦 **Saudi Arabia** has the **lowest sales** among all countries analyzed.
- 🇬🇧 **United Kingdom** stands out with the **highest total sales** amount.

### Insights:
- The UK may be a mature or high-demand market, indicating successful market strategies or stronger customer loyalty.
- The contrast with Saudi Arabia suggests potential for growth through localized marketing, promotions, or partnerships.

### Recommendations:
- **Double down on the UK**: Continue optimizing campaigns and scaling successful strategies.
- **Explore opportunities in Saudi Arabia**: Identify barriers to growth and consider tailored approaches to market entry or expansion.

In [0]:
# Total sales amount per country
sales_by_country = cust_raw.groupBy("Country").agg(
    sum("TotalAmount").alias("TotalAmount")
)

# Show the 10 countries with the lowest totals (ascending order)
lowest_countries = sales_by_country.orderBy(asc("TotalAmount")).limit(10)
lowest_countries.display()



Databricks visualization. Run in Databricks to view.

## Country-wise Sales Comparison

Among all the countries analyzed, **Saudi Arabia** recorded the **lowest sales** during the observed period. This may indicate:

- Limited market penetration
- Lower demand for the product/service
- Cultural or economic factors influencing buying behavior

### Recommendations:
- Conduct market research to understand local consumer preferences.
- Explore localized marketing strategies to boost engagement.
- Evaluate distribution channels and partnerships in the region.

In [0]:
from pyspark.sql.window import Window

# Total sales amount per invoice year, most recent year first
sales_by_year = cust_raw.groupBy("InvoiceYear").agg(
    sum("TotalAmount").alias("TotalAmount")
)
sales_by_year.orderBy(desc("InvoiceYear")).display()


Databricks visualization. Run in Databricks to view.

Sales increased by 13.54% in 2011 compared to 2010.

In [0]:
# Monthly sales trend for 2011:
# restrict to rows of that year, total the sales per month, latest month first
rows_2011 = cust_raw.where(col('InvoiceYear') == 2011)
monthly_sales_2011 = rows_2011.groupBy("InvoiceMonth").agg(
    sum("TotalAmount").alias("TotalAmount")
)
monthly_sales_2011.orderBy(desc("InvoiceMonth")).display()

Databricks visualization. Run in Databricks to view.

In [0]:
# Monthly sales trend for 2010:
# restrict to rows of that year, total the sales per month, latest month first
rows_2010 = cust_raw.where(col('InvoiceYear') == 2010)
monthly_sales_2010 = rows_2010.groupBy("InvoiceMonth").agg(
    sum("TotalAmount").alias("TotalAmount")
)
monthly_sales_2010.orderBy(desc("InvoiceMonth")).display()


Databricks visualization. Run in Databricks to view.

In [0]:
# All rows from 2010, ordered by invoice month (earliest month first)
rows_2010_by_month = cust_raw.where(col('InvoiceYear') == 2010).sort(col('InvoiceMonth').asc())
rows_2010_by_month.display()

The data for the years 2010 and 2011 reveals a significant increase in sales, with an overall growth of approximately 13.5%. A deeper look into the monthly breakdown indicates that sales consistently peak in December and January for both years.

### Seasonal Pattern Observed
December and January are high-performing months.

This trend suggests a seasonal sales cycle, possibly influenced by:

- Holiday shopping season (e.g., Christmas, New Year)
- Post-holiday promotions
- New Year resolutions and spending habits

# RFM Analysis

#### 1. Recency - How recently did the customer purchase?
#### 2. Frequency - How often do they purchase?
#### 3. Monetary Value - How much do they spend?

In [0]:
# Latest InvoiceDate present in the dataset
latest_invoice_date = cust_raw.agg(max("InvoiceDate"))
latest_invoice_date.display()

In [0]:
# Step 1: the latest InvoiceDate in the data is the reference point for recency
reference_date = cust_raw.agg(max("InvoiceDate")).first()[0]

# Step 2: attach the reference date to every row so it is available after
# the per-customer aggregation
rfm_df = cust_raw.withColumn("reference_date", lit(reference_date))

# Step 3: aggregate to one row per customer.
# NOTE(review): `count("InvoiceNo")` counts rows with a non-null InvoiceNo,
# not distinct invoice numbers — confirm this is the intended definition of
# purchase frequency.
rfm_df = rfm_df.groupBy("CustomerID").agg(
    max("InvoiceDate").alias("LastPurchaseDate"),
    count("InvoiceNo").alias("TotalPurchases"),
    sum("TotalAmount").alias("TotalAmount"),
    max('reference_date').alias('reference_date')
)
rfm_df.select(col('reference_date')).display()
# This line used to read `rfm_df.display` (no parentheses), which only
# referenced the method without calling it, so nothing was shown.
rfm_df.display()

# Step 4: derive the three RFM measures
#   Recency   - days between the reference date and the customer's last purchase
#   Frequency - the TotalPurchases count
#   Monetary  - the summed TotalAmount
rfm_df = (
    rfm_df
    .withColumn("Recency", datediff(col("reference_date"), col("LastPurchaseDate")))
    .withColumn("Frequency", col("TotalPurchases"))
    .withColumn("Monetary", col("TotalAmount"))
)
rfm_df.display()


In [0]:
# Summary statistics of the RFM DataFrame.
# The result of `describe()` is displayed directly: `.display()` is used on
# Spark DataFrames throughout this notebook, whereas the pandas DataFrame
# returned by `toPandas()` has no `.display()` method and raised an
# AttributeError.
rfm_df.describe().display()

In [0]:
# Score every customer on Recency, Frequency and Monetary value
import pandas as pd

rfm_pd = rfm_df.toPandas()

# (score column, source column, quartile labels from the lowest to the highest bin).
# pd.qcut splits each measure into 4 quantile-based bins. Recency uses the
# reversed labels so that the lowest-recency bin gets a 4; Frequency and
# Monetary are labelled 1 to 4 in ascending order.
score_specs = (
    ('R_Score', 'Recency', [4, 3, 2, 1]),
    ('F_Score', 'Frequency', [1, 2, 3, 4]),
    ('M_Score', 'Monetary', [1, 2, 3, 4]),
)
for score_column, measure_column, bin_labels in score_specs:
    rfm_pd[score_column] = pd.qcut(rfm_pd[measure_column], q=4, labels=bin_labels)


In [0]:
# Move the scored customers back into Spark
rfm_spark = spark.createDataFrame(rfm_pd)

# Sum each score per CustomerID
score_totals = [
    sum(score_name).alias(score_name)
    for score_name in ("R_Score", "F_Score", "M_Score")
]
rfm_spark = rfm_spark.groupBy("CustomerID").agg(*score_totals)

# RFM_Score: the R, F and M scores concatenated into one value
# RFM_ag:    the three scores added together
concatenated_score = concat(col("R_Score"), col("F_Score"), col("M_Score"))
aggregate_score = col("R_Score") + col("F_Score") + col("M_Score")
rfm_spark = rfm_spark.withColumn("RFM_Score", concatenated_score)
rfm_spark = rfm_spark.withColumn("RFM_ag", aggregate_score)
rfm_spark.display()