In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file shipped under the Kaggle input directory,
# so the dataset path used when loading data below can be confirmed.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(root_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create the Spark session used by every cell below, or reuse the one already
# running in this kernel (getOrCreate).
Spark = (
    SparkSession.builder
    .appName('One')
    .getOrCreate()
)

In [None]:
# Bare last expression: the notebook displays the session object's repr as this cell's output.
Spark

In [None]:
# Load the retail transactions CSV. The first line is treated as the header, and
# inferSchema asks Spark to scan the data to pick column types instead of reading
# everything as strings.
dfSpark = Spark.read.csv(
    '/kaggle/input/retail-sales-dataset/retail_sales_dataset.csv',
    header=True,
    inferSchema=True,
)
dfSpark.printSchema()

In [None]:
# Peek at the first two rows to sanity-check the parsed columns.
dfSpark.show(n=2)

In [None]:
# NOTE: this is the number of ROWS (presumably one per transaction), not the number of
# distinct customers. The next cell counts unique Customer IDs.
customer_id_col = dfSpark.select('Customer ID')
TotalCustomer = customer_id_col.count()
display(TotalCustomer)

In [None]:
# Distinct Customer IDs. Compare this with the raw row count above to see whether
# any customer appears in more than one transaction.
UniqueCustomers = dfSpark.select('Customer ID').dropDuplicates()
UniqueCustomers.count()

What distinct customer groups can we identify based on demographics and purchase behavior to better target marketing and sales efforts?

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

In [None]:
# Encode categorical variables.
# Gender (presumably two values) becomes a single numeric index.
# Product Category is nominal. Feeding its StringIndexer index straight into K-Means would
# impose an arbitrary order and distance between categories (index 0 "closer" to 1 than
# to 2). One-hot encode it instead, keeping every level (dropLast=False) so all pairs of
# categories are equally far apart.
from pyspark.ml.feature import OneHotEncoder

gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
category_indexer = StringIndexer(inputCol="Product Category", outputCol="CategoryIndex")
category_encoder = OneHotEncoder(inputCol="CategoryIndex", outputCol="CategoryVec", dropLast=False)

df_indexed = gender_indexer.fit(dfSpark).transform(dfSpark)
df_indexed = category_indexer.fit(df_indexed).transform(df_indexed)
df_indexed = category_encoder.fit(df_indexed).transform(df_indexed)

# Assemble the numeric columns plus the one-hot category vector into one "features" vector.
assembler = VectorAssembler(
    inputCols=["Age", "Quantity", "Total Amount", "GenderIndex", "CategoryVec"], outputCol="features")

df_assembled = assembler.transform(df_indexed)

In [None]:
# Rescale each feature to unit variance so K-Means distances are not dominated by
# large-valued columns such as "Total Amount".
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

# Partition rows into 4 clusters. The fixed seed keeps the assignment reproducible.
kmeans = KMeans(featuresCol="scaledFeatures", k=4, seed=42)
model = kmeans.fit(df_scaled)
predictions = model.transform(df_scaled)

# Cluster id assigned to each customer row.
predictions.select("Customer ID", "prediction").show()

Can we predict daily sales for each product category based on the previous day’s sales data to improve inventory and demand planning?

In [None]:
from pyspark.sql.functions import to_date, sum as _sum

# Parse "Date" with the pattern yyyy-MM-dd so it can be ordered and windowed.
df = dfSpark.withColumn("Date", to_date("Date", "yyyy-MM-dd"))

# One row per (day, category) holding that day's total revenue for the category.
df_sales = (
    df.groupBy("Date", "Product Category")
    .agg(_sum("Total Amount").alias("DailySales"))
)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Create the previous-day sales feature.
# df_sales only contains (date, category) pairs that actually had sales. The previous ROW in a
# category is therefore not necessarily the previous CALENDAR day.
# - Use the lagged sales only when the lagged date is exactly one day earlier.
# - Otherwise the category sold nothing yesterday, so the feature is 0.
# - The first row of each category has no history; it stays null and is dropped below.
from pyspark.sql.functions import datediff, lit, when

windowSpec = Window.partitionBy("Product Category").orderBy("Date")
prev_date = lag("Date", 1).over(windowSpec)
prev_sales = lag("DailySales", 1).over(windowSpec)
df_sales = df_sales.withColumn(
    "PrevDaySales",
    when(datediff("Date", prev_date) == 1, prev_sales)
    .when(prev_date.isNotNull(), lit(0)),
)

# Drop rows with any null (chiefly the first day of each category, whose previous day is unknown).
df_lagged = df_sales.dropna()

# Assemble features
assembler = VectorAssembler(inputCols=["PrevDaySales"], outputCol="features")
df_features = assembler.transform(df_lagged)

In [None]:
# Train/Test split: chronological, not random.
# The rows form a time series. A random split lets the model train on days that come AFTER
# the test days (look-ahead leakage), which overstates accuracy. Instead, train on the
# earliest ~TRAIN_FRACTION of distinct dates and evaluate on the later ones.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

TRAIN_FRACTION = 0.8
split_dates = [row["Date"] for row in df_features.select("Date").distinct().orderBy("Date").collect()]
if len(split_dates) < 2:
    raise ValueError("Need at least two distinct dates for a chronological train/test split")
# Clamp so the test set always keeps at least the last date.
cutoff_date = split_dates[min(int(len(split_dates) * TRAIN_FRACTION), len(split_dates) - 1)]
train = df_features.filter(col("Date") < cutoff_date)
test = df_features.filter(col("Date") >= cutoff_date)

# Regression Model
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="DailySales")
model = lr.fit(train)
predictions = model.transform(test)

predictions.select("Date", "Product Category", "DailySales", "prediction").show()

# Quantify how well yesterday's sales predict today's on the held-out (later) dates.
rmse = RegressionEvaluator(
    labelCol="DailySales", predictionCol="prediction", metricName="rmse"
).evaluate(predictions)
print(f"Test RMSE (train < {cutoff_date} <= test): {rmse:,.2f}")

How does the price per unit affect the quantity sold, and can we predict sales volume based on product pricing?

In [None]:
from pyspark.ml.regression import LinearRegression

# Assemble features
assembler = VectorAssembler(inputCols=["Price per Unit"], outputCol="features")
df_reg = assembler.transform(df)

# Regression Model: Quantity ~ PricePerUnit
lr = LinearRegression(featuresCol="features", labelCol="Quantity")
model = lr.fit(df_reg)
predictions = model.transform(df_reg)

predictions.select("Product Category", "Price per Unit", "Quantity", "prediction").show()

# The slope answers "how does price affect quantity": the expected change in Quantity for a
# one-unit increase in Price per Unit. R^2 shows how much of the variation in Quantity price
# alone explains. It is IN-SAMPLE, because the model is fit and scored on the same rows.
print(f"Slope (Quantity per unit of price): {model.coefficients[0]:.6f}")
print(f"Intercept: {model.intercept:.4f}")
print(f"In-sample R^2: {model.summary.r2:.4f}")

In [None]:
from pyspark.sql.functions import collect_set

# One basket per transaction: the set of distinct product categories bought in it.
basket_df = (
    dfSpark.groupBy("Transaction ID")
    .agg(collect_set("Product Category").alias("items"))
)

basket_df.show()

In [None]:
from pyspark.ml.fpm import FPGrowth

# Run FPGrowth: keep itemsets present in at least 1% of baskets, and rules that hold
# with at least 30% confidence.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.3)
model = fpGrowth.fit(basket_df)

# Show frequent itemsets first, then the derived association rules.
for result_attr in ("freqItemsets", "associationRules"):
    getattr(model, result_attr).show(truncate=False)

In [None]:
from pyspark.sql.functions import size

# Rebuild baskets from the date-parsed frame, then check basket sizes. Association rules
# only exist if some transactions contain more than one category.
basket_df = df.groupBy("Transaction ID").agg(
    collect_set("Product Category").alias("items")
)

# Check how many transactions have more than 1 item
basket_sizes = basket_df.withColumn("num_items", size("items"))
basket_sizes.groupBy("num_items").count().show()

In [None]:
from pyspark.sql.functions import rand

# Randomly assign a few customers multiple categories (just for experimentation).
# Re-add a random ~30% sample of rows under their ORIGINAL Transaction ID, but with a randomly
# drawn Product Category. collect_set below can then produce baskets with more than one category
# (unless the draw happens to repeat the row's own category).
# Offsetting the ID by rand() instead would give each copy its own unique fractional ID, i.e. a
# new single-item basket, and would widen the ID column to double. No multi-item baskets would appear.
from pyspark.sql.functions import array, element_at, lit

FAKE_SEED = 42
# Sorted so the seeded draw below maps to the same categories on every run.
categories = sorted(
    row["Product Category"]
    for row in df.select("Product Category").distinct().collect()
    if row["Product Category"] is not None
)
if not categories:
    raise ValueError("No product categories found to build synthetic baskets from")
category_choices = array(*[lit(c) for c in categories])
# element_at is 1-based; rand() is in [0, 1), so the index ranges over 1..len(categories).
random_category = element_at(
    category_choices, (rand(seed=FAKE_SEED) * len(categories)).cast("int") + 1
)

df_fake = (
    df.sample(withReplacement=True, fraction=0.3, seed=FAKE_SEED)
    .withColumn("Product Category", random_category)
)

# Combine with original
df_combined = df.unionByName(df_fake)

# Recreate basket_df
basket_df = df_combined.groupBy("Transaction ID") \
    .agg(collect_set("Product Category").alias("items"))

In [None]:
from pyspark.ml.fpm import FPGrowth

# Re-run association-rule mining on the augmented baskets, using the same support and
# confidence thresholds as the first FPGrowth run.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.3)
model = fpGrowth.fit(basket_df)

frequent_itemsets = model.freqItemsets
frequent_itemsets.show(truncate=False)
association_rules = model.associationRules
association_rules.show(truncate=False)

Can we accurately predict the product category a customer will buy based on their demographics and transaction details?

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Target: Product Category turned into a numeric class index in "label".
# Features: Age and indexed Gender, plus the purchase's Quantity and Total Amount.
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex")
category_indexer = StringIndexer(inputCol="Product Category", outputCol="label")

gender_model = gender_indexer.fit(df)
df_encoded = gender_model.transform(df)
label_model = category_indexer.fit(df_encoded)
df_encoded = label_model.transform(df_encoded)

# Assemble features
assembler = VectorAssembler(
    inputCols=["Age", "GenderIndex", "Quantity", "Total Amount"],
    outputCol="features",
)
df_features = assembler.transform(df_encoded)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train/test split
(train, test) = df_features.randomSplit([0.8, 0.2], seed=42)

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50)
model = rf.fit(train)
predictions = model.transform(test)

predictions.select("Age", "Gender", "Quantity", "Total Amount", "label", "prediction").show()

# Quantify "can we accurately predict": hold-out accuracy and weighted F1. Compare them with the
# majority-class rate (share of the most frequent category in the test set). The model is only
# useful if it beats that baseline.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
weighted_f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

label_counts = predictions.groupBy("label").count().orderBy("count", ascending=False).collect()
n_test = sum(row["count"] for row in label_counts)
majority_rate = label_counts[0]["count"] / n_test if n_test else float("nan")

print(f"Test accuracy: {accuracy:.4f}")
print(f"Test weighted F1: {weighted_f1:.4f}")
print(f"Majority-class baseline accuracy: {majority_rate:.4f}")