In [1]:
import pandas as pd
import numpy as np
import findspark
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler, LabelEncoder #, StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import xgboost as xgb
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_squared_error,mean_absolute_error, r2_score
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, explode, sequence, to_date, lag
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler , StandardScaler
from sklearn.svm import OneClassSVM


In [2]:
findspark.init()

In [3]:
import os
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-11'
os.environ['HADOOP_HOME'] = r'C:\Program Files\hadoop-2.7.1'
os.environ['SPARK_HOME'] = r'C:\Program Files\Anaconda\envs\newenv\Lib\site-packages\pyspark'
os.environ['PYSPARK_PYTHON'] = r'C:\Program Files\Anaconda\envs\newenv\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = r'C:\Program Files\Anaconda\envs\newenv\python.exe'


In [4]:
#!echo %JAVA_HOME%
#!echo %HADOOP_HOME%
#!echo %PATH%


In [5]:
# Stop the existing Spark session
if 'spark' in locals():
    spark.stop()
spark = SparkSession.builder.appName("PricePredictionModel").config("spark.driver.memory", "12g").config("spark.executor.memory", "12g").config("spark.driver.maxResultSize", "12g").getOrCreate()

In [6]:

# Load the Australian Grocery Dataset
grocery_df = spark.read.csv("AustralianDataset/Australia_Grocery_2022Sep.csv", header=True, inferSchema=True)
grocery_df = grocery_df.withColumnRenamed("Package_price", "Package_price_grocery")
grocery_df = grocery_df.withColumnRenamed("RunDate", "RunDate_grocery")

# Load the Synthetic Dataset
synthetic_df = spark.read.csv("AustralianDataset/synthethic_time_point.csv", header=True, inferSchema=True)
synthetic_df = synthetic_df.withColumnRenamed("Package_price", "Package_price_synthetic")
synthetic_df = synthetic_df.withColumnRenamed("RunDate", "RunDate_synthetic")


In [7]:
# Join the datasets on SKU
combined_df = grocery_df.join(synthetic_df, on=["Sku", "city"], how="left")


In [8]:
# Select the RunDate and Package_price based on the availability in synthetic_df
combined_df = combined_df.withColumn("RunDate_final",
    func.when(col("RunDate_synthetic").isNotNull(), col("RunDate_synthetic")).otherwise(col("RunDate_grocery"))
    ).withColumn("Package_price_final",
    func.when(col("Package_price_synthetic").isNotNull(), col("Package_price_synthetic")).otherwise(col("Package_price_grocery"))
    )

# Drop intermediate columns if necessary
combined_df = combined_df.drop("RunDate_grocery", "RunDate_synthetic", "Package_price_grocery", "Package_price_synthetic")

# Show a few rows of the final DataFrame to verify
combined_df.show(5)

+--------+----------+------+-----------+--------------------+------------+-------------+------------------+--------------+------------+------------+----------+--------+------------+--------------------+----------------+----------+---------------+-----+--------+--------+-------------+-------------------+
|     Sku|      city| index|Postal_code|            Category|Sub_category|Product_Group|      Product_Name|Price_per_unit|package_size|is_estimated|is_special|in_stock|Retail_price|         Product_Url|           Brand|unit_price|unit_price_unit|state|     tid|     _c0|RunDate_final|Package_price_final|
+--------+----------+------+-----------+--------------------+------------+-------------+------------------+--------------+------------+------------+----------+--------+------------+--------------------+----------------+----------+---------------+-----+--------+--------+-------------+-------------------+
|1010087P|BIGGS FLAT|192519|       5153|Dairy, eggs & fridge|      Cheese| Block chee

In [9]:
# Check data for SKU 1274566P before feature engineering
combined_df.filter(col("Sku") == "1274566P").select("Sku", "RunDate_final", "Package_price_final").show()


+--------+-------------+-------------------+
|     Sku|RunDate_final|Package_price_final|
+--------+-------------+-------------------+
|1274566P|   2022-11-09|                3.5|
|1274566P|   2022-11-10| 3.4992970916750075|
|1274566P|   2022-11-11| 3.4919676439251823|
|1274566P|   2022-11-12|  3.483318804164933|
|1274566P|   2022-11-13|  3.476728436445554|
|1274566P|   2022-11-14| 3.4713751650882436|
|1274566P|   2022-11-15|  3.468289742285664|
|1274566P|   2022-11-16|  3.469628802815525|
|1274566P|   2022-11-17| 3.4685520919892463|
|1274566P|   2022-11-18| 3.4618283238363503|
|1274566P|   2022-11-19|  3.459560920240769|
|1274566P|   2022-11-20| 3.4577208827467465|
|1274566P|   2022-11-21| 3.4587357747089897|
|1274566P|   2022-11-22| 3.4547550379785044|
|1274566P|   2022-11-23| 3.4830236298370316|
|1274566P|   2022-11-24|  3.465031927594927|
|1274566P|   2022-11-25| 3.4631063215063502|
|1274566P|   2022-11-26| 3.4582813749440575|
|1274566P|   2022-11-27| 3.4847090121384356|
|1274566P|

In [10]:
necessary_columns = ['Sku', 'Category', 'Sub_category', 'Product_Group', 'Product_Name', 'Brand', 'state', 'city','RunDate_final', 'Package_price_final']

combined_df = combined_df.select(necessary_columns)


In [11]:
# Get the count of rows in combined_df
row_count = combined_df.count()

# Print the count
print(f"Total number of rows in combined_df: {row_count}") #189592320

Total number of rows in combined_df: 189592320


In [12]:
# Count the total number of unique SKUs in combined_df
#total_sku_count = combined_df.select("Sku").distinct().count()

#print(f"Total number of unique SKUs: {total_sku_count}")  -- 8092


In [13]:
sku_counts = combined_df.groupBy("Sku").count()

# Step 2: Select the top 50 SKUs based on the count
top_120_skus = sku_counts.orderBy(func.desc("count")).limit(120).select("Sku")

# Step 3: Collect top 50 SKUs as a list
top_120_sku_list = [row['Sku'] for row in top_120_skus.collect()]

# Step 4: Filter the combined_df to keep only the rows with SKUs in top_120_sku_list
combined_df = combined_df.filter(combined_df.Sku.isin(top_120_sku_list))


In [14]:
# Get the count of rows in combined_df
row_count = combined_df.count()

# Print the count
print(f"Total number of rows in combined_df: {row_count}")

Total number of rows in combined_df: 7726244


In [15]:
combined_df = combined_df.withColumn("RunDate_final_str", func.date_format("RunDate_final", "yyyy-MM-dd"))

# Apply Label Encoding to categorical columns
indexers = [
    StringIndexer(inputCol=column, outputCol=column+'_index').fit(combined_df)
    for column in ['Category', 'Sub_category', 'Product_Group', 'Brand', 'state', 'city']
]

pipeline = Pipeline(stages=indexers)
combined_df = pipeline.fit(combined_df).transform(combined_df)

In [16]:
combined_df.printSchema()

root
 |-- Sku: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub_category: string (nullable = true)
 |-- Product_Group: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- RunDate_final: string (nullable = true)
 |-- Package_price_final: double (nullable = true)
 |-- RunDate_final_str: string (nullable = true)
 |-- Category_index: double (nullable = false)
 |-- Sub_category_index: double (nullable = false)
 |-- Product_Group_index: double (nullable = false)
 |-- Brand_index: double (nullable = false)
 |-- state_index: double (nullable = false)
 |-- city_index: double (nullable = false)



In [17]:
median_prices = combined_df.groupBy("Sku", "city").agg(func.expr('percentile_approx(Package_price_final, 0.5)').alias('median_price'))
combined_df = combined_df.join(median_prices, on=["Sku", "city"], how="left")
combined_df = combined_df.withColumn("Package_price_final",func.when(func.col("Package_price_final").isNull(), func.col("median_price")).otherwise(func.col("Package_price_final"))).drop("median_price")
missing_count_price = combined_df.filter(func.col("Package_price_final").isNull()).count()
print(f"Package_price_synthetic: {missing_count_price} missing values after filling with median")


Package_price_synthetic: 0 missing values after filling with median


In [18]:
# Check data for SKU 1274566P before feature engineering
combined_df.filter(col("Sku") == "1274566P").select("Sku", "RunDate_final", "Package_price_final").show()


+--------+-------------+-------------------+
|     Sku|RunDate_final|Package_price_final|
+--------+-------------+-------------------+
|1274566P|   2022-11-09|                3.5|
|1274566P|   2022-11-10| 3.4992970916750075|
|1274566P|   2022-11-11| 3.4919676439251823|
|1274566P|   2022-11-12|  3.483318804164933|
|1274566P|   2022-11-13|  3.476728436445554|
|1274566P|   2022-11-14| 3.4713751650882436|
|1274566P|   2022-11-15|  3.468289742285664|
|1274566P|   2022-11-16|  3.469628802815525|
|1274566P|   2022-11-17| 3.4685520919892463|
|1274566P|   2022-11-18| 3.4618283238363503|
|1274566P|   2022-11-19|  3.459560920240769|
|1274566P|   2022-11-20| 3.4577208827467465|
|1274566P|   2022-11-21| 3.4587357747089897|
|1274566P|   2022-11-22| 3.4547550379785044|
|1274566P|   2022-11-23| 3.4830236298370316|
|1274566P|   2022-11-24|  3.465031927594927|
|1274566P|   2022-11-25| 3.4631063215063502|
|1274566P|   2022-11-26| 3.4582813749440575|
|1274566P|   2022-11-27| 3.4847090121384356|
|1274566P|

In [None]:
#median_prices = combined_df.groupBy("Sku", "city").agg(func.expr('percentile_approx(Package_price_synthetic, 0.5)').alias('median_price'))
#combined_df = combined_df.join(median_prices, on=["Sku", "city"], how="left")
#combined_df = combined_df.withColumn("Package_price_synthetic",func.when(func.col("Package_price_synthetic").isNull(), func.col("median_price")).otherwise(func.col("Package_price_synthetic"))).drop("median_price")
#missing_count_price = combined_df.filter(F.col("Package_price_synthetic").isNull()).count()
#print(f"Package_price_synthetic: {missing_count_price} missing values after filling with median")

#median_prices = combined_df.groupBy("Sku", "city").agg(func.expr('percentile_approx(Package_price_synthetic, 0.5)').alias('median_price'))
#combined_df = combined_df.join(median_prices, on=["Sku", "city"], how="left")
#combined_df = combined_df.withColumn("Package_price_synthetic",func.when(func.col("Package_price_synthetic").isNull(), func.col("median_price")).otherwise(F.col("Package_price_synthetic"))).drop("median_price")
        
for column in combined_df.columns:
    # Check if the column is of type FloatType or DoubleType
    if column == 'Package_price_final':
        pass       
    elif dict(combined_df.dtypes)[column] in ['float', 'double','integer' ]:
        # Check for both NaN and Null values
        missing_count = combined_df.filter(func.col(column).isNull() | func.isnan(column)).count()
        combined_df = combined_df.withColumn(column, func.when(func.col(column).isNull() | func.isnan(column), 0.0).otherwise(func.col(column)))

    
    elif dict(combined_df.dtypes)[column] in ['boolean']:
        missing_count = combined_df.filter(func.col(column).isNull()).count()
        combined_df = combined_df.withColumn(column, func.when(func.col(column).isNull(), False).otherwise(func.col(column)))
        
    else:
        # Only check for Null values
        missing_count = combined_df.filter(func.col(column).isNull()).count()
        combined_df = combined_df.withColumn(column, func.when(func.col(column).isNull(), '').otherwise(func.col(column)))
    
    print(f"{column}: {missing_count} missing values")


Sku: 0 missing values
city: 0 missing values
Category: 0 missing values
Sub_category: 0 missing values
Product_Group: 0 missing values
Product_Name: 0 missing values
Brand: 0 missing values


In [None]:
combined_df.printSchema()

In [None]:
# Creating Lag Features (1 day, 7 days, 30 days, 60 days, 90 days)
windowSpec = Window.partitionBy("Sku", "city").orderBy("RunDate_final").rowsBetween(Window.unboundedPreceding, -1)
combined_df = combined_df.withColumn("lag_1", func.lag("Package_price_final", 1).over(windowSpec))
combined_df = combined_df.withColumn("lag_7", func.lag("Package_price_final", 7).over(windowSpec))
combined_df = combined_df.withColumn("lag_30", func.lag("Package_price_final", 30).over(windowSpec))
combined_df = combined_df.withColumn("lag_60", func.lag("Package_price_final", 60).over(windowSpec))
combined_df = combined_df.withColumn("lag_90", func.lag("Package_price_final", 90).over(windowSpec))

# Creating Rolling Statistics using `RunDate_synthetic`
combined_df = combined_df.withColumn("rolling_mean_7", func.avg("Package_price_final").over(windowSpec))
combined_df = combined_df.withColumn("rolling_mean_14", func.avg("Package_price_final").over(windowSpec))
combined_df = combined_df.withColumn("rolling_mean_30", func.avg("Package_price_final").over(windowSpec))

# Creating Rolling Standard Deviation (Price Volatility)
combined_df = combined_df.withColumn("rolling_std_7", func.stddev("Package_price_final").over(windowSpec))
combined_df = combined_df.withColumn("rolling_std_30", func.stddev("Package_price_final").over(windowSpec))

# Creating Price Differencing Feature
combined_df = combined_df.withColumn("price_diff", combined_df["Package_price_final"] - func.lag(combined_df["Package_price_final"], 1).over(windowSpec))

# Seasonality Features using `RunDate_synthetic`
combined_df = combined_df.withColumn("day_of_week", func.dayofweek("RunDate_final"))
combined_df = combined_df.withColumn("month", func.month("RunDate_final"))


In [None]:
feature_columns = [
    "lag_1", "lag_7", "lag_30", "lag_60", "lag_90",
    "rolling_mean_7", "rolling_mean_14", "rolling_mean_30",
    "rolling_std_7", "rolling_std_30", "price_diff",
    "day_of_week", "month",
    "Category_index", "Sub_category_index",
    "Product_Group_index", 
    "Brand_index", "state_index", "city_index"
]

In [None]:
for column in feature_columns:
    missing_count = combined_df.filter(func.col(column).isNull()).count()
    if missing_count > 0:
        print(f"Column '{column}' has {missing_count} missing values. Filling them with zeros.")
        combined_df = combined_df.withColumn(column, func.when(func.col(column).isNull(), 0.0).otherwise(func.col(column)))


In [None]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

if 'features' in combined_df.columns:
    combined_df = combined_df.drop('features')
    
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features', handleInvalid='skip')
combined_df = assembler.transform(combined_df)

# Apply StandardScaler to scale the 'features' column
#scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

In [None]:
import time
start_time = time.time()
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)
#scaler = StandardScaler().setInputCol('features').setOutputCol('scaled_features')
scaler_model = scaler.fit(combined_df)
scaled_df = scaler_model.transform(combined_df)
end_time = time.time()
elapsed_time = start_time - end_time
print(f'Time Elapsed : {elapsed_time} seconds')

In [None]:
print(f"Number of records in scaled_df: {scaled_df.count()}")
scaled_df.show(5)

In [None]:
# Convert to Pandas for further processing with XGBoost and Exponential Smoothing
pandas_df = scaled_df.select("scaled_features", "Sku", "Category", "Sub_category", "Product_Group", "Product_Name", "Brand", "state", "city", "Package_price_final", "RunDate_final").toPandas()

cutoff_date = '2023-11-01'

# Split into train and test sets
train_df = pandas_df[pandas_df['RunDate_final'] < cutoff_date]
test_df = pandas_df[pandas_df['RunDate_final'] >= cutoff_date]



In [None]:
X_train = train_df['scaled_features'].tolist()
y_train = train_df['Package_price_final']
X_test = test_df['scaled_features'].tolist()
y_test = test_df['Package_price_final']


In [None]:
# Model 2: XGBoost
from xgboost import XGBRegressor

xgb_model = XGBRegressor(objective='reg:squarederror')
xgb_model.fit(X_train, y_train)
y_pred_xgb = xgb_model.predict(X_test)


In [None]:
# Evaluate XGBoost Model
mse_xgb = mean_squared_error(y_test, y_pred_xgb)
print(f"XGBoost MSE: {mse_xgb}")     #XGBoost MSE: 0.02099907390528608


In [None]:
# Calculate RMSE for XGBoost
rmse_xgb = np.sqrt(mse_xgb)
print(f"XGBoost RMSE: {rmse_xgb}")   #XGBoost RMSE: 0.14491057209633146

In [None]:
# Calculate MAE for XGBoost
mae_xgb = mean_absolute_error(y_test, y_pred_xgb)
print(f"XGBoost MAE: {mae_xgb}")    #XGBoost MAE: 0.0392099127999695

In [None]:
# Calculate R-squared for XGBoost
r2_xgb = r2_score(y_test, y_pred_xgb)
print(f"XGBoost R-Squared: {r2_xgb}")    #XGBoost R-Squared: 0.9995470814112523


In [None]:
# Saving the prices - pred vs actual
test_df_pd = test_df.copy()

# Add the actual and predicted values to the DataFrame
test_df_pd['Actual'] = y_test
test_df_pd['Predicted_XGBoost'] = y_pred_xgb
#test_df_pd['Predicted_ExponentialSmoothing'] = y_pred_es[:len(y_test)]  # Ensure the length matches y_test

# Save the DataFrame to a CSV file
test_df_pd.to_csv('predictions_with_all_features1.csv', index=False)


In [None]:
# Model 2: Exponential Smoothing
es_model = ExponentialSmoothing(y_train, trend="add", seasonal="add", seasonal_periods=12).fit()  
y_pred_es = es_model.forecast(len(y_test))

In [None]:
# Evaluate Exponential Smoothing Model
mse_es = mean_squared_error(y_test, y_pred_es)
print(f"Exponential Smoothing MSE: {mse_es}")
