In [0]:
# Load the two source tables used throughout the notebook: customer master
# data and the sales transactions.
customer = spark.read.table("databricks_simulated_retail_customer_data.v01.customers")
sales = spark.read.table("databricks_simulated_retail_customer_data.v01.sales")

# Customer Sales Analysis

In [0]:
from pyspark.sql.functions import countDistinct, col

# Compare the total row count with the number of distinct customer_id values;
# the gap is the number of surplus rows sharing a customer_id with another row.
total_customers = customer.count()  # one Spark action, reused below

print("Total Customer \n")
display(total_customers)
print("\n")
print("Total Distinct Customers \n")
customer.select(countDistinct(col("customer_id"))).show()
print("\n")
print("Number of Duplicates")
# alias() has to be applied to the Column expression. Calling it on the
# DataFrame (as before) only names the frame and leaves the output column
# with an auto-generated name instead of "Diff".
customer.select(
    (total_customers - countDistinct(col("customer_id")).cast("int")).alias("Diff")
).show()


## Findings on Missing Values

During the analysis, we identified missing values in several key columns of the customer sales dataset. These gaps may impact the accuracy of sales insights and customer segmentation. The missing data is primarily concentrated in customer demographic fields and some transaction records. Addressing these missing values is recommended to ensure robust analysis and reliable reporting.

In [0]:
from pyspark.sql.functions import col, sum

# Fraction of NULL values per column of the customer table.
# The row count is computed once up front: calling customer.count() inside the
# comprehension would launch a separate Spark job for every column.
customer_row_count = customer.count()

missing_ratio = customer.select([
    (sum(col(c).isNull().cast("int")) / customer_row_count).alias(c)
    for c in customer.columns
]).toPandas().iloc[0]

# Collect every column that has at least one NULL; these are dropped later.
cols_2_drop = []
for column_name, null_ratio in missing_ratio.items():
    if null_ratio > 0.00:
        print(f"{column_name}: {null_ratio}")
        cols_2_drop.append(column_name)

In [0]:
# customer_id values that occur on more than one row.
dupes = customer.groupBy("customer_id").count().filter(col("count") > 1)
duplicate_ids = dupes.select("customer_id")

# All rows belonging to a duplicated customer_id (kept for inspection).
customer_duplicates = customer.join(duplicate_ids, on="customer_id", how="inner")

# Remove every row whose customer_id is duplicated. A left-anti join matches
# by column name, whereas subtract() is a positional EXCEPT DISTINCT: the
# join output above puts customer_id first, so subtracting it from `customer`
# only lines up if customer_id already is the first column of `customer`.
# Rows with a NULL customer_id never match the join and are therefore kept.
customer_no_dupes = customer.join(duplicate_ids, on="customer_id", how="left_anti")

display(customer_no_dupes.count())
print("\n")
display(customer.count())

In [0]:
# Drop every column flagged in cols_2_drop (columns with at least one NULL)
# from the de-duplicated customer frame.
customer_clean = customer_no_dupes.drop(*cols_2_drop)

In [0]:
# Sanity check: NULL ratio per remaining column of customer_clean.
# Count once up front instead of once per column inside the comprehension,
# which would launch a separate Spark job for every column.
clean_row_count = customer_clean.count()

customer_clean.select([
    (sum(col(c).isNull().cast("int")) / clean_row_count).alias(c)
    for c in customer_clean.columns
]).toPandas()

In [0]:
# Columns removed from the cleaned customer frame before feature engineering.
# drop() ignores names that are no longer present, so this cell can be re-run.
columns_2_drop = [
    "customer_name",
    "state",
    "postcode",
    "street",
    "region",
    "ship_to_address",
    "valid_from",
]

customer_clean = customer_clean.drop(*columns_2_drop)

In [0]:
# Persist the cleaned customer frame as table default.customer_cleaned,
# overwriting any existing contents.
customer_clean.write.mode("overwrite").saveAsTable("default.customer_cleaned")

# Feature Engineering
  The only reliable location information we have is that all customers are in the USA, which has 50 states. The `region` column contains many NaNs, even though regions would be important for clustering during feature engineering. We therefore drop the `region` column and create 50 clusters using k-means on the features `lon` and `lat`. The `state` column contains the names of the states; however, state names do not carry any information about proximity.

In [0]:
# Preview a handful of rows. Limit on the Spark side first so only five rows
# are collected to the driver instead of the whole table.
customer_clean.limit(5).toPandas()

In [0]:
from pyspark.sql.functions import count, round

# Round both coordinates to two decimals, then count how many distinct
# (lat, lon) pairs remain.
for coordinate in ("lat", "lon"):
    customer_clean = customer_clean.withColumn(coordinate, round(col(coordinate), 2))

result = (
    customer_clean
    .groupBy("lat", "lon")
    .agg(count("*").alias("lon_lat_count"))
)
display(result.count())

In [0]:
from pyspark.ml.feature import VectorAssembler

# Assemble lat/lon into a single vector column, the input format Spark ML
# estimators expect.
vecAssembler = VectorAssembler(
    inputCols=["lat", "lon"],
    outputCol="lanlonVec"
)

# Drop any lanlonVec left over from a previous run of this cell: transform()
# raises if the output column already exists, which made the cell fail when
# executed a second time. drop() is a no-op when the column is absent.
customer_clean = vecAssembler.transform(customer_clean.drop("lanlonVec"))
display(customer_clean)

In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette score for each candidate number of clusters.
# The name is kept as-is because the plotting cell reads it.
silhoutte_score = []

evaluator = ClusteringEvaluator(
    predictionCol='prediction',
    featuresCol='lanlonVec',
    metricName='silhouette',
    distanceMeasure='squaredEuclidean',
)

for k in range(48, 51):
    # Fixed seed so centroid initialisation, and therefore the scores, are
    # reproducible across kernel restarts.
    kmeans = KMeans(featuresCol="lanlonVec", k=k, seed=42)
    # Fit on customer_clean, the frame that carries lanlonVec. The previous
    # `final_data` is not defined anywhere in this notebook and raised a
    # NameError on a fresh kernel.
    model = kmeans.fit(customer_clean)
    predictions = model.transform(customer_clean)
    silhouette = evaluator.evaluate(predictions)
    silhoutte_score.append(silhouette)
    print("Silhouette with k = {}: {}".format(k, silhouette))

In [0]:
import matplotlib.pylab as plt

# Silhouette score against the number of clusters evaluated in the sweep.
fig, ax = plt.subplots()
ax.plot(range(48, 51), silhoutte_score)
ax.set_xlabel("k")
ax.set_ylabel("Silhouette Score")
plt.show()

In [0]:
# Final clustering with k = 50. The seed is fixed so the cluster ids assigned
# to customers are reproducible across kernel restarts.
kmeans = KMeans(featuresCol="lanlonVec", k=50, seed=42)
model = kmeans.fit(customer_clean)
predictions = model.transform(customer_clean)

# Keep the customer key, the assigned cluster (`prediction`) and the two
# customer attributes carried forward into the merge with the sales features.
customer_final = predictions.select("customer_id", "prediction", "units_purchased", "loyalty_segment")
customer_final.show(5)

In [0]:
# Persist customer_clean as table default.customer_cleaned_new, overwriting
# any existing contents.
# NOTE(review): at this point customer_clean carries the lanlonVec vector
# column but not the cluster `prediction` column (that lives on
# `predictions` / `customer_final`) — confirm this is the intended frame.
customer_clean.write.mode("overwrite").saveAsTable("default.customer_cleaned_new")

# Sales Dataset


In [0]:
# Print the column names and data types of the sales table.
sales.printSchema()

In [0]:
# Fraction of NULL values per sales column. The row count is computed once:
# calling sales.count() inside the comprehension would launch a separate
# Spark job for every column.
sales_row_count = sales.count()

sales_na = sales.select([
    (sum(col(c).isNull().cast("int")) / sales_row_count).alias(c)
    for c in sales.columns
]).toPandas().iloc[0]

# Number of distinct values per sales column. The variable name is kept, but
# note these are distinct counts, not duplicate counts.
sales_dupes = sales.select([
    (countDistinct(col(c))).alias(c)
    for c in sales.columns
]).toPandas().iloc[0]

print("Missing Values in Sales Dataframe")
print(sales_na)

# Label corrected: the values printed are distinct-value counts per column.
print("Distinct Values per Column in Sales Dataframe")
print(sales_dupes)

In [0]:
# Total sales rows vs. number of distinct customers that appear in sales.
sales_total_rows = sales.count()
# collect() returns a list of Rows; [0][0] is the first column of the first row.
sales_distinct_customers = sales.select(countDistinct(col("customer_id"))).collect()[0][0]

print("Sales Counts \n")
display(sales_total_rows)
print("\n")

# Previously display(df.show()) was used here; show() returns None, so the
# display call rendered nothing useful.
print("Sales Distinct Count\n")
display(sales_distinct_customers)
print("\n")

# Previously this section printed the total row count again. Show the actual
# gap between sales rows and distinct customers, mirroring the customer-table
# check earlier in the notebook.
print("Difference Customer\n")
display(sales_total_rows - sales_distinct_customers)




In [0]:
from pyspark.sql.functions import (
    sum, avg, count, countDistinct,
    min, max, stddev, col, datediff, lit
)

# Most recent order date in the whole sales table, used as the reference point
# for recency. collect() returns a list of Rows: [0] is the first row and the
# second [0] is its first column.
max_date = sales.select(max("order_date")).collect()[0][0]

# Per-customer order aggregates.
order_aggregates = [
    count("*").alias("num_orders"),
    sum("total_price").alias("total_spend"),
    avg("total_price").alias("avg_order_value"),
    stddev("total_price").alias("std_order_value"),
    countDistinct("product").alias("num_distinct_products"),
    countDistinct("product_category").alias("num_distinct_categories"),
    min("order_date").alias("first_order_date"),
    max("order_date").alias("last_order_date"),
]

customer_features = (
    sales
    .groupBy("customer_id")
    .agg(*order_aggregates)
    # Days between a customer's first and last order.
    .withColumn(
        "customer_tenure_days",
        datediff(col("last_order_date"), col("first_order_date")),
    )
    # Days between a customer's last order and the latest order overall.
    .withColumn(
        "recency_days",
        datediff(lit(max_date), col("last_order_date")),
    )
)


In [0]:
# List the feature column names. DataFrame.columns reads them from the schema
# without running the aggregation or collecting any rows to the driver.
customer_features.columns

# Merge the two dataframes to keep information on active customers only

- An inner join would be the best idea because it ensures that only customers present in both `customer_features` and `customer_final` are included in the merged dataset. This avoids introducing records with missing or incomplete information, resulting in a clean dataset for analysis and modeling.

In [0]:
# Inner join on customer_id: keep only customers that have both a cluster
# assignment (customer_final) and sales-derived features (customer_features).
clustering_df = customer_final.join(customer_features, on="customer_id", how="inner")

# Persist the merged frame as table clustering_df, overwriting existing contents.
clustering_writer = clustering_df.write.mode("overwrite")
clustering_writer.saveAsTable("clustering_df")
