In [9]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [10]:
spark = SparkSession.builder.appName('sparkApp').config("spark.python.worker.timeout", "120") .getOrCreate()

In [11]:
df = spark.read.parquet("../../data/processed/cleanedData.parquet")
df.show()

+--------------+-----------+-------------------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+---------+------------+----------------+------------+---------------+--------------+------------+-------+
|Transaction_ID|Customer_ID|               City|  Country|Age|Gender|Income|Customer_Segment|      Date|Year|    Month|    Time|Total_Purchases|   Amount|Total_Amount|Product_Category|Product_Type|Shipping_Method|Payment_Method|Order_Status|Ratings|
+--------------+-----------+-------------------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+---------+------------+----------------+------------+---------------+--------------+------------+-------+
|       1000043|      91680|         Fort Worth|      USA| 19|  Male|   Low|             New|2023-11-23|2023| November| 8:23:26|             10|285.67474|   2856.7476|     Electronics|  Smartphone|       Same-Day|        PayPal|   Delivered|      4|


In [12]:
df = df.groupBy("Product_Category", "Date").agg(F.sum(F.col('Total_Purchases')).alias("Total_Purchases")).orderBy("Date", "Product_Category")
df.show(50)

+----------------+----------+---------------+
|Product_Category|      Date|Total_Purchases|
+----------------+----------+---------------+
|           Books|2001-01-24|            747|
|        Clothing|2001-01-24|            732|
|     Electronics|2001-01-24|           1062|
|         Grocery|2001-01-24|            882|
|      Home Decor|2001-01-24|            774|
|           Books|2001-02-24|            954|
|        Clothing|2001-02-24|            733|
|     Electronics|2001-02-24|            982|
|         Grocery|2001-02-24|            821|
|      Home Decor|2001-02-24|            651|
|           Books|2001-03-24|            840|
|        Clothing|2001-03-24|            838|
|     Electronics|2001-03-24|            868|
|         Grocery|2001-03-24|            954|
|      Home Decor|2001-03-24|            897|
|           Books|2001-04-24|            775|
|        Clothing|2001-04-24|            743|
|     Electronics|2001-04-24|            949|
|         Grocery|2001-04-24|     

In [13]:
df.createOrReplaceTempView("sales_data")

In [14]:
minMaxDates = spark.sql("""
    SELECT 
        Product_Category, 
        MIN(Date) AS min_date, 
        MAX(Date) AS max_date 
    FROM sales_data 
    GROUP BY Product_Category
""")
minMaxDates.createOrReplaceTempView("minMaxDates")


dateSeries = spark.sql("""
    SELECT 
        Product_Category, 
        date_add(min_date, idx) AS Date
    FROM (
        SELECT 
            Product_Category, 
            min_date, 
            max_date, 
            posexplode(
                split(space(datediff(max_date, min_date)), ' ')
            ) AS (idx, _)
        FROM minMaxDates
    )
""")
dateSeries.createOrReplaceTempView("dateSeries")


dfFilled = spark.sql("""
    SELECT 
        ds.Product_Category, 
        ds.Date, 
        COALESCE(sd.Total_Purchases, 0) AS Total_Purchases
    FROM dateSeries ds
    LEFT JOIN sales_data sd
    ON ds.Product_Category = sd.Product_Category AND ds.Date = sd.Date
""")
dfFilled.createOrReplaceTempView("filled_data")


dfInterpolated = spark.sql("""
    SELECT 
        Product_Category, 
        Date, 
        Total_Purchases,
        COALESCE(
            Total_Purchases,
            AVG(Total_Purchases) OVER (
                PARTITION BY Product_Category 
                ORDER BY Date 
                ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
            )
        ) AS Interpolated_Purchases
    FROM filled_data
""")

dfInterpolated.show(60)


+----------------+----------+---------------+----------------------+
|Product_Category|      Date|Total_Purchases|Interpolated_Purchases|
+----------------+----------+---------------+----------------------+
|         Grocery|2001-01-24|            882|                 882.0|
|         Grocery|2001-01-25|              0|                   0.0|
|         Grocery|2001-01-26|              0|                   0.0|
|         Grocery|2001-01-27|              0|                   0.0|
|         Grocery|2001-01-28|              0|                   0.0|
|         Grocery|2001-01-29|              0|                   0.0|
|         Grocery|2001-01-30|              0|                   0.0|
|         Grocery|2001-01-31|              0|                   0.0|
|         Grocery|2001-02-01|              0|                   0.0|
|         Grocery|2001-02-02|              0|                   0.0|
|         Grocery|2001-02-03|              0|                   0.0|
|         Grocery|2001-02-04|     

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Input

In [16]:
def salesPreProcess(data, seqLength = 30):
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaledData = scaler.fit_transform(data)
    
    X, y = [], []
    for i in range(seqLength, len(scaledData)):
        X.append(scaledData[i-seqLength:i, 0])
        y.append(scaledData[i, 0])
    
    X, y = np.array(X), np.array(y)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))
    return X, y, scaler

In [None]:
def model(inputShape):
    model = Sequential()
    model.add(Input(shape=inputShape)) 
    model.add(LSTM(50, return_sequences=True))
    model.add(LSTM(50, return_sequences=False))
    model.add(Dense(25))
    model.add(Dense(1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

In [18]:
def prediction(model, data, scaler, seqLength = 30):
    lastSequence = data[-seqLength:]
    lastSequence = np.reshape(lastSequence, (1, seqLength, 1))
    predictedScaled = model.predict(lastSequence)
    predictedValue = scaler.inverse_transform(predictedScaled)
    return predictedValue[0][0]

In [19]:
predictions = {}
pdf = dfInterpolated.toPandas()

for product_category in pdf['Product_Category'].unique():
    categoryDf = pdf[pdf['Product_Category'] == product_category]
    categoryDf = categoryDf.sort_values(by='Date')
    
    data = categoryDf['Total_Purchases'].values.reshape(-1, 1)
    X, y, scaler = salesPreProcess(data)

    model = model((X.shape[1], 1))
    model.fit(X, y, batch_size=1, epochs=10, verbose=2)
    
    predictedResults = prediction(model, data, scaler)
    predictions[product_category] = predictedResults
    
for category, pred in predictions.items():
    print(f"Predicted Total Purchases for {category} next month: {pred:.2f}")

  super().__init__(**kwargs)


Epoch 1/10
8407/8407 - 92s - 11ms/step - loss: 0.0156
Epoch 2/10


KeyboardInterrupt: 