In [3]:
!pip install findspark



In [4]:
!pip install seaborn

Collecting seaborn
  Using cached seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Using cached seaborn-0.13.2-py3-none-any.whl (294 kB)
Installing collected packages: seaborn
Successfully installed seaborn-0.13.2


In [15]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BDM-3603-Assignment1").getOrCreate()

In [16]:
import pandas as pd
import matplotlib.pyplot as plt  
import seaborn as sns 

In [39]:
from pyspark.sql.functions import col, split

# Read the CSV file with the correct delimiter
data = spark.read.csv("online_retail.csv", header=True, inferSchema=True)


# Show the schema to verify
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [30]:
# If the columns are still combined, split them
if len(Data.columns) == 1:
    column_names = ["InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", "CustomerID", "Country"]
    Data = Data.select(split(col(Data.columns[0]), ",").alias("temp"))
    for i, name in enumerate(column_names):
        Data = Data.withColumn(name, col("temp")[i])
    Data = Data.drop("temp")


In [40]:
total_rows = data.count()
print(f"Total number of rows: {total_rows}")

Total number of rows: 541909


In [19]:
# Step 2. Data Cleaning and Transformation
# Drop duplicates
data = data.dropDuplicates()

In [31]:
print(data.columns)

['InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country']


In [44]:
#check which colums have null values

from pyspark.sql.functions import col, count, when

null_counts = data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns])
null_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [45]:
#check which columns have missing values

from pyspark.sql.functions import col, count, when, isnan

missing_counts = data.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in data.columns])
missing_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [43]:
#Fill missing Description with "No Description" and CustomerID with -1
data = data.fillna({"Description": "No Description", "CustomerID": -1})

In [12]:
# Handle missing values
# Here, we fill missing numeric values with 0 and string values with "Unknown" (modify as necessary) but this is not necessary for our case
from pyspark.sql.types import NumericType, StringType

# Identify numeric and string columns
numeric_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, NumericType)]
string_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, StringType)]

# Fill missing values
df_filled = data.fillna(0, subset=numeric_columns).fillna("Unknown", subset=string_columns)

In [47]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType, DoubleType, DateType
from pyspark.sql import functions as F

# Convert data types and handle missing values
# Convert Quantity to Integer, UnitPrice to Double, and InvoiceDate to Date
data = data.withColumn("Quantity", data["Quantity"].cast(IntegerType())) \
           .withColumn("UnitPrice", data["UnitPrice"].cast(DoubleType())) \
           .withColumn("InvoiceDate", F.to_date(data["InvoiceDate"], "MM/dd/yyyy"))



In [48]:
# 4. Filtering Out Invalid Rows
# Example: Filter out rows where Quantity or UnitPrice are non-positive
data = data.filter((data["Quantity"] > 0) & (data["UnitPrice"] > 0))

In [49]:
# If normalization is needed, we can scale columns using mean normalization
quantity_mean = data.select(F.mean("Quantity")).first()[0]
quantity_stddev = data.select(F.stddev("Quantity")).first()[0]

data = data.withColumn("Quantity_Normalized", 
                       (data["Quantity"] - quantity_mean) / quantity_stddev)

In [54]:
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = false)
 |-- Country: string (nullable = true)
 |-- Quantity_Normalized: double (nullable = true)



In [55]:

# Step 3. Data Analysis Using Spark SQL

# Register the cleaned DataFrame as a temporary SQL view
data.createOrReplaceTempView("invoice_data")

In [56]:
# 1. Aggregation: Summary Statistics
summary_stats = spark.sql("""
    SELECT 
        AVG(Quantity) AS avg_quantity,
        PERCENTILE(Quantity, 0.5) AS median_quantity,
        STDDEV(Quantity) AS stddev_quantity,
        AVG(UnitPrice) AS avg_unit_price,
        PERCENTILE(UnitPrice, 0.5) AS median_unit_price,
        STDDEV(UnitPrice) AS stddev_unit_price
    FROM invoice_data
""")
summary_stats.show()

+------------------+---------------+------------------+------------------+-----------------+-----------------+
|      avg_quantity|median_quantity|   stddev_quantity|    avg_unit_price|median_unit_price|stddev_unit_price|
+------------------+---------------+------------------+------------------+-----------------+-----------------+
|10.542037034242338|            3.0|155.52412351063617|3.9076252471218242|             2.08|35.91568110425546|
+------------------+---------------+------------------+------------------+-----------------+-----------------+



In [57]:
# 2. Grouping and Filtering: Group data by 'Country' and calculate total sales and average price
country_stats = spark.sql("""
    SELECT 
        Country,
        SUM(Quantity * UnitPrice) AS total_sales,
        AVG(UnitPrice) AS avg_unit_price
    FROM invoice_data
    GROUP BY Country
    HAVING total_sales > 10000
    ORDER BY total_sales DESC
""")
country_stats.show()

+---------------+------------------+------------------+
|        Country|       total_sales|    avg_unit_price|
+---------------+------------------+------------------+
| United Kingdom| 9025222.084000133| 3.849679429753172|
|    Netherlands|         285446.34|2.6484654514624855|
|           EIRE| 283453.9600000004|4.8783206590620924|
|        Germany|228867.13999999987| 3.709307522123887|
|         France|209715.11000000004| 4.400236707505655|
|      Australia|138521.30999999997|3.0562605752961063|
|          Spain|          61577.11| 3.826223832528186|
|    Switzerland| 57089.89999999997|3.3745473041709064|
|        Belgium| 41196.34000000002|3.6301575578532757|
|         Sweden|          38378.33|3.7600665188470073|
|          Japan|          37416.37|2.0473831775700937|
|         Norway|          36165.44| 5.287086834733894|
|       Portugal|33747.100000000006| 5.843251165889407|
|        Finland|22546.079999999998| 5.296992700729926|
|      Singapore|          21279.29| 58.33328828

3 Valuable Insights from the above aggregation and grouping:

1) Big Spread in Order Sizes: The average order quantity is 10.5, but the median is only 3, with a large standard deviation. This shows that while most orders are small, a few very large ones are pushing up the average.

2) UK Leads in Sales: The UK has by far the highest total sales at over 9 million, much more than any other country. Its average unit price is close to the global average, suggesting the UK is the main market, with possible growth opportunities in places like Australia and Germany.

3) High Price Differences by Country: Some countries, like Singapore and Hong Kong, have noticeably higher average prices, likely due to unique customer demands or higher-value products in those regions.

Machine Learning process:


In [70]:
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = false)
 |-- Country: string (nullable = true)
 |-- Quantity_Normalized: double (nullable = true)



In [80]:
from pyspark.sql.functions import to_timestamp

# Convert 'InvoiceDate' to a timestamp
data = data.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "yyyy-MM-dd HH:mm:ss"))

# Check the schema to confirm it's now in the correct format
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = false)
 |-- Country: string (nullable = true)
 |-- Quantity_Normalized: double (nullable = true)



In [82]:
# Remove the 'InvoiceDate' column from the DataFrame. For some reason my dataframe would not function properly with the invoice date. Will look into it further.
data = data.drop("InvoiceDate")

# Show the first 5 rows to verify
data.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country| Quantity_Normalized|
+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|-0.02920471070156...|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|-0.01634496936463...|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|
+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+
only showing top 5 

In [83]:
from pyspark.sql.functions import col

# Create a new column 'total_sales' by multiplying 'Quantity' and 'UnitPrice'
New_data = data.withColumn("total_sales", col("Quantity") * col("UnitPrice"))


In [106]:
New_data.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country| Quantity_Normalized|       total_sales|
+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|-0.02920471070156...|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|-0.01634496936463...|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|-0.02920471070156

In [107]:
New_data = New_data.na.drop(subset=["Country", "Description", "Quantity", "UnitPrice"])


In [114]:
train_data = New_data.na.drop(column=["Country", "Quantity", "UnitPrice"])

In [115]:
train_data.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country| Quantity_Normalized|       total_sales|
+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|-0.02920471070156...|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|-0.01634496936463...|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|-0.02920471070156

In [108]:


# Split the data into training and test sets again
train_data, test_data = New_data.randomSplit([0.8, 0.2], seed=1234)

In [117]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator



#Encoding the 'Country' column
# Use StringIndexer to convert 'Country' to a numeric format
indexer_country = StringIndexer(inputCol="Country", outputCol="CountryIndex")
encoder_country = OneHotEncoder(inputCol="CountryIndex", outputCol="CountryVec")


# OneHotEncoder: Convert numerical indices into one-hot encoded vectors
encoder_country = OneHotEncoder(inputCol="CountryIndex", outputCol="CountryVec")
encoder_description = OneHotEncoder(inputCol="DescriptionIndex", outputCol="DescriptionVec")


In [118]:


#  Assemble features into a single vector
assembler = VectorAssembler(
    inputCols=["CountryVec", "Quantity", "UnitPrice"],  # Only these columns for features
    outputCol="features"
)





In [119]:
# Initialize the classifier

from pyspark.ml.regression import DecisionTreeRegressor
regressor = DecisionTreeRegressor(featuresCol="features", labelCol="total_sales")

In [120]:
# Split the data into training and testing sets (70% training, 30% testing)
train_data_split, test_data_split = train_data.randomSplit([0.7, 0.3], seed=42)

In [121]:
# Build the pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer_country, encoder_country, assembler, regressor])


In [122]:
#Train the model on the training data
model = pipeline.fit(train_data)


In [123]:
# Make predictions on the test data
predictions = model.transform(test_data_split)

In [125]:
# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="total_sales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 436.76738139297873


Model Tuning:

In [134]:
train_data.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country| Quantity_Normalized|       total_sales|
+---------+---------+--------------------+--------+---------+----------+--------------+--------------------+------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom|-0.02920471070156...|15.299999999999999|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom|-0.01634496936463...|              22.0|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom|-0.02920471070156...|             20.34|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom|-0.02920471070156

In [135]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Step 1: Prepare the features
# First, we need to index the 'Country' column
indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex")

# Then, we'll use OneHotEncoder to create dummy variables
encoder = OneHotEncoder(inputCols=["CountryIndex"], outputCols=["CountryVec"])

# Assemble the features into a single vector column
assembler = VectorAssembler(
    inputCols=["CountryVec", "UnitPrice", "Quantity"],
    outputCol="features"
)

In [138]:
# Step 2: Create the model
rf = RandomForestRegressor(labelCol="total_sales", featuresCol="features")


In [139]:
# Step 5: Set up the parameter grid for tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 2, 4]) \
    .build()

In [141]:
# Step 6: Set up the cross-validator
evaluator = RegressionEvaluator(labelCol="total_sales", predictionCol="prediction", metricName="rmse")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)  # Use 3-fold cross-validation

In [142]:
# Step 7: Fit the model
cvModel = cv.fit(train_data)

In [143]:
# Step 8: Make predictions on the test data
predictions = cvModel.transform(test_data)

In [144]:
# Step 9: Evaluate the model
rmse = evaluator.evaluate(predictions)
r2 = RegressionEvaluator(labelCol="total_sales", predictionCol="prediction", metricName="r2").evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")
print(f"R-squared (R2) on test data = {r2}")


Root Mean Squared Error (RMSE) on test data = 65.46127173534236
R-squared (R2) on test data = 0.3707788975838975


In [145]:
# Step 10: Show feature importances
bestModel = cvModel.bestModel
featureImportances = bestModel.stages[-1].featureImportances
featureNames = ["Country"] + ["UnitPrice", "Quantity"]
for feature, importance in zip(featureNames, featureImportances):
    print(f"Feature: {feature}, Importance: {importance}")

Feature: Country, Importance: 0.03368380625729595
Feature: UnitPrice, Importance: 0.0002073017789438663
Feature: Quantity, Importance: 0.009288465532555855


In [None]:
# Optional: Show a sample of predictions
predictions.select("total_sales", "prediction", "Country", "UnitPrice", "Quantity").show(5)