In this project, the aim is to predict each customer's yearly spending at an e-commerce online store using PostgreSQL, Databricks and PySpark.
The dataset is loaded into Databricks from my own PostgreSQL database. The data is then processed into a format suitable for training a linear regression model, which is used to predict the dependent variable (customer yearly spending) from the independent variables (customer features).

Import data from PostgreSQL into Databricks

In [0]:
# Import the necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName('my_app').getOrCreate()

# Set the PostgreSQL database connection properties
jdbc_url = "jdbc:postgresql://localhost:5432/ecom_customers"
connection_properties = {
  "user": "postgres",
  "password": "password",
  "driver": "org.postgresql.Driver"
}

# Read data from the PostgreSQL table into a DataFrame
df = spark.read.jdbc(url=jdbc_url, table="csv_table", properties=connection_properties)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1523107731420195> in <cell line: 16>()
     14 
     15 # Read data from the PostgreSQL table into a DataFrame
---> 16 df = spark.read.jdbc(url=jdbc_url, table="csv_table", properties=connection_properties)

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
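
The connection cell above hard-codes the host, user and password. As a minimal sketch, the same settings could be read from environment variables instead. The variable names (PG_HOST, PG_PORT, PG_DATABASE, PG_USER, PG_PASSWORD) and the helper build_jdbc_config are hypothetical and not part of the original notebook; the defaults simply mirror the values used above.

```python
import os

def build_jdbc_config(env=os.environ):
    """Build the JDBC URL and connection properties from environment variables.

    The variable names are hypothetical; the defaults mirror the notebook's values.
    """
    host = env.get("PG_HOST", "localhost")
    port = env.get("PG_PORT", "5432")
    database = env.get("PG_DATABASE", "ecom_customers")
    jdbc_url = f"jdbc:postgresql://{host}:{port}/{database}"
    connection_properties = {
        "user": env.get("PG_USER", "postgres"),
        # No default for the password: a missing variable raises KeyError
        "password": env["PG_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    return jdbc_url, connection_properties

# Demonstration with an explicit mapping instead of the real environment
jdbc_url, connection_properties = build_jdbc_config({"PG_PASSWORD": "example-only"})
print(jdbc_url)
```

The returned pair can be passed to spark.read.jdbc exactly as in the cell above. The traceback shown is truncated, so the cause of the error is not visible here; one thing that may be worth checking is the host, since localhost resolves on the Databricks cluster rather than on the machine that hosts the database.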

Importing libraries

In [0]:
import pyspark 
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [0]:
df.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



Viewing the dataframe

In [0]:
df.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [0]:
df.columns

Out[53]: ['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

Transforming the columns of features into a single column of vectorized features

In [0]:
assembler = VectorAssembler(
    inputCols=["Avg Session Length", "Time on App", 
               "Time on Website",'Length of Membership'],
    outputCol="features")
output = assembler.transform(df)

Checking the single column of vectorized features

In [0]:
output.select("features").show(5, truncate=False)

+----------------------------------------------------------------------------+
|features                                                                    |
+----------------------------------------------------------------------------+
|[34.49726772511229,12.65565114916675,39.57766801952616,4.0826206329529615]  |
|[31.92627202636016,11.109460728682564,37.268958868297744,2.66403418213262]  |
|[33.000914755642675,11.330278057777512,37.110597442120856,4.104543202376424]|
|[34.30555662975554,13.717513665142507,36.72128267790313,3.120178782748092]  |
|[33.33067252364639,12.795188551078114,37.53665330059473,4.446308318351434]  |
+----------------------------------------------------------------------------+
only showing top 5 rows
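
To make the assembling step concrete, here is a plain-Python sketch of what happens to a single row: the four numeric columns are collected, in the order given by inputCols, into one vector, and the other columns are ignored. The helper assemble is hypothetical and only illustrates the idea; it is not how VectorAssembler is implemented. The values are those of the first row shown by df.show(5).

```python
def assemble(row, input_cols):
    """Collect the named columns of one row into a single feature list, in order."""
    return [float(row[col]) for col in input_cols]

input_cols = ["Avg Session Length", "Time on App",
              "Time on Website", "Length of Membership"]

# First row of the dataframe (string columns other than Avatar omitted)
first_row = {
    "Avatar": "Violet",
    "Avg Session Length": 34.49726772511229,
    "Time on App": 12.65565114916675,
    "Time on Website": 39.57766801952616,
    "Length of Membership": 4.0826206329529615,
    "Yearly Amount Spent": 587.9510539684005,
}

features = assemble(first_row, input_cols)
print(features)
```

The printed list matches the first entry of the features column above. The order of inputCols matters later, because the model's coefficients are reported in the same order.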



Displaying the features and labels in a dataframe

In [0]:
final_data = output.select("features",'Yearly Amount Spent')
final_data.show(5)

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
+--------------------+-------------------+
only showing top 5 rows



Split dataset into train and test sets

In [0]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [0]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                340|
|   mean| 501.50426291150103|
| stddev|  79.45108415839093|
|    min|   266.086340948469|
|    max|  744.2218671047146|
+-------+-------------------+



In [0]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                160|
|   mean| 494.65981087115625|
| stddev|  79.07088617412838|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+
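
The counts above (340 and 160 rows) are close to, but not exactly, a 70/30 split of the 500 rows. That is expected, because randomSplit assigns each row at random according to the weights rather than cutting at a fixed position. The plain-Python sketch below illustrates the idea and the effect of a fixed seed; random_split is a hypothetical helper, not Spark's implementation, and the seed value 42 is arbitrary.

```python
import random

def random_split(rows, train_weight=0.7, seed=42):
    """Assign each row to train or test with an independent, seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_weight
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(500))  # stand-in for the 500 rows of final_data
train_a, test_a = random_split(rows)
train_b, test_b = random_split(rows)

# Sizes are only approximately 70/30, but identical across runs with the same seed
print(len(train_a), len(test_a))
print(train_a == train_b and test_a == test_b)
```

In PySpark, randomSplit accepts a seed argument as well, for example final_data.randomSplit([0.7, 0.3], seed=42), which is worth considering if the numbers in this notebook need to be reproducible.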



Creating the linear regression model

In [0]:
# Create a Linear Regression Model object
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [0]:
# Fit the model to the train data
lrModel = lr.fit(train_data)

In [0]:
# Print the coefficients and intercept for linear regression
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [25.544541948467252,38.84585501390898,0.3088833399471749,61.34687085354519] Intercept: -1041.2049522045318
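
As a worked example of what these numbers mean, a prediction is the intercept plus the dot product of the coefficients and the feature vector. The sketch below applies this to the first row of the dataset (features and actual value as printed earlier). Whether that row ended up in the train or the test set is not known, so this only illustrates the formula and is not an evaluation.

```python
# Coefficients are in the same order as the VectorAssembler inputCols:
# Avg Session Length, Time on App, Time on Website, Length of Membership
coefficients = [25.544541948467252, 38.84585501390898,
                0.3088833399471749, 61.34687085354519]
intercept = -1041.2049522045318

# First row of the dataset, as printed by output.select("features")
features = [34.49726772511229, 12.65565114916675,
            39.57766801952616, 4.0826206329529615]
actual = 587.9510539684005

# prediction = intercept + sum(coefficient_i * feature_i)
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))

print("Predicted:", prediction)
print("Actual:   ", actual)
print("Residual (actual - predicted):", actual - prediction)
```

Read this way, each extra unit of Length of Membership adds about 61.3 to the predicted yearly spend and each extra unit of Time on App about 38.8, while Time on Website contributes only about 0.3 per unit.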


In [0]:
test_results = lrModel.evaluate(test_data)

In [0]:
test_results.residuals.show(10)

+-------------------+
|          residuals|
+-------------------+
| -12.79604406632177|
| -6.418935218500621|
| -6.768670132395812|
| -22.15188688843648|
|-1.8745185512144644|
| 3.2809162988556864|
| 0.2515621158972863|
|   2.26307199686579|
| -5.902755736539461|
| -9.525889694139039|
+-------------------+
only showing top 10 rows



In [0]:
# Keep the DataFrame itself; show() only prints the rows and returns None
unlabeled_data = test_data.select('features')
unlabeled_data.show(5)

+--------------------+
|            features|
+--------------------+
|[30.3931845423455...|
|[30.4925366965402...|
|[31.0613251567161...|
|[31.1239743499119...|
|[31.2606468698795...|
+--------------------+
only showing top 5 rows



Using model to predict on features from test set

In [0]:
predictions = lrModel.transform(unlabeled_data)

Observation of actual vs predicted results

In [0]:
merged_df = predictions.join(test_data, on=["features"], how="left")
# Rename the columns first, then call show() separately because it returns None
merged_df = merged_df.withColumnRenamed("Yearly Amount Spent", "Actual Yearly Amount Spent").withColumnRenamed("prediction","Predicted")
merged_df.show(5)



+--------------------+------------------+--------------------------+
|            features|         Predicted|Actual Yearly Amount Spent|
+--------------------+------------------+--------------------------+
|[30.3931845423455...| 332.7249138695154|         319.9288698031936|
|[30.4925366965402...|288.89018093841514|         282.4712457199145|
|[31.0613251567161...| 494.3241281902974|         487.5554580579016|
|[31.1239743499119...|509.09894072820225|         486.9470538397658|
|[31.2606468698795...|423.20114980816584|         421.3266312569514|
+--------------------+------------------+--------------------------+
only showing top 5 rows
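
The table above can be cross-checked against the residuals printed earlier: each residual is the actual value minus the predicted value. The sketch below recomputes the first five residuals from the five rows shown and compares them with the first five values from test_results.residuals.

```python
# (predicted, actual) pairs copied from the table above
rows = [
    (332.7249138695154, 319.9288698031936),
    (288.89018093841514, 282.4712457199145),
    (494.3241281902974, 487.5554580579016),
    (509.09894072820225, 486.9470538397658),
    (423.20114980816584, 421.3266312569514),
]

# First five residuals as printed by test_results.residuals.show(10)
reported = [-12.79604406632177, -6.418935218500621, -6.768670132395812,
            -22.15188688843648, -1.8745185512144644]

# residual = actual - predicted
recomputed = [actual - predicted for predicted, actual in rows]

for r, expected in zip(recomputed, reported):
    print(r, expected)
```

All five residuals here are negative, meaning the model overestimates the spend for these particular customers; the residuals listed earlier include positive values too.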



Metrics Evaluation

In [0]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))

RMSE: 10.486866587468773
MSE: 109.97437082336896
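
The two metrics are directly related: RMSE is the square root of MSE, which puts the error back into the same units as Yearly Amount Spent. A quick check with the values printed above:

```python
import math

mse = 109.97437082336896
rmse = 10.486866587468773

# RMSE is the square root of MSE
rmse_from_mse = math.sqrt(mse)
print(rmse_from_mse)
print(math.isclose(rmse_from_mse, rmse, rel_tol=1e-9))
```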


In [0]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                160|
|   mean| 494.65981087115625|
| stddev|  79.07088617412838|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [0]:
print("RMSE as a percentage of mean value of test_data =", (10.486/494.659)*100)

RMSE as a percentage of mean value of test_data = 2.119844175482504
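
The same comparison can be made with the unrounded values, and extended a little. The sketch below also relates RMSE to the spread of the test labels and derives an approximate R² from the printed summary statistics. It assumes that the stddev reported by describe() is the sample standard deviation and that MSE is the residual sum of squares divided by the row count; the result is an estimate from printed numbers, not a value returned by the model.

```python
rmse = 10.486866587468773
mse = 109.97437082336896

# Summary of test_data from describe()
n = 160
mean = 494.65981087115625
stddev = 79.07088617412838  # assumed to be the sample standard deviation

rmse_pct_of_mean = rmse / mean * 100
rmse_over_stddev = rmse / stddev

# R^2 = 1 - SS_res / SS_tot, with SS_res = n * MSE and SS_tot = (n - 1) * stddev^2
r2_estimate = 1 - (n * mse) / ((n - 1) * stddev ** 2)

print("RMSE as % of mean:", rmse_pct_of_mean)
print("RMSE / stddev:", rmse_over_stddev)
print("Approximate R^2:", r2_estimate)
```

The evaluation summary also exposes this metric directly as test_results.r2, which avoids the approximation.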


To conclude, the RMSE of about 10.49 is only about 2.12% of the mean Yearly Amount Spent in test_data (494.66), and it is well below the test set's standard deviation of 79.07. This suggests that the model predicts yearly spending well on data it was not trained on. That's a wrap for this project.

Author: Eugene Yee
Date: 23 Feb 2023