In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [19]:
from pyspark.sql.types import StringType, FloatType, DoubleType
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# EDA

In [6]:
ecom_df = spark.read.csv('/content/drive/My Drive/Colab Notebooks/Applications of AI/Assignment 2/Ecomm-Customers.csv', header=True, multiLine=True)

In [7]:
ecom_df.count()

500

In [8]:
ecom_df.show(5)

+--------------------+--------------------+----------------+-------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg. Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+-------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank Tunnel
...|          Violet|  34.49726772511229|12.655651149166752| 39.57766801952616|   4.082620632952961|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.926272026360156|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque| 33.000914755642675|11.330278057777512| 37.11059744212085|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug.

### There are 8 columns. 3 columns contain strings, the rest are numeric. Of the string datatype features only one is categorical and loosely at that. We need only use the numeric data for regression.

In [9]:
ecom_df = ecom_df.withColumnRenamed('Avg. Session Length', 'Avg Session Length')

In [46]:
features = ecom_df.select('Avg Session Length', 'Time on App', 'Time on Website', 'Length of Membership', 'Yearly Amount Spent') 

In [48]:
features.describe()

summary,Avg Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
count,500.0,500.0,500.0,500.0,500.0
mean,33.0531935181962,12.052487937166132,37.06044542094858,3.5334615559150557,499.314038258591
stddev,0.9925631110845354,0.9942156084725424,1.0104889067564031,0.9992775024112583,79.3147815497068
min,29.532428967057943,10.012583366223025,33.91384724758464,0.2699010899842742,256.67058229005585
max,36.13966248879052,9.98451439654646,40.005181638101895,6.922689335035807,765.5184619388372


In [49]:
for n in features.columns:
  features = features.withColumn(n, col(n).cast(FloatType()))

# Feature Engineering

### Standardize each column using MinMax scaling. It seems in PySpark we need to use something called a vectorizer and must run each column through a pipeline to do so.

In [50]:
unlist = udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

for n in features.columns:
  assembler = VectorAssembler(inputCols=[n], outputCol=n+" Vect")

  scaler = MinMaxScaler(inputCol=n+" Vect", outputCol=n+" Scaled")

  pipeline = Pipeline(stages=[assembler, scaler])

  features = pipeline.fit(features).transform(features).withColumn(n+" Scaled", unlist(n+" Scaled")).drop(n+" Vect")

In [51]:
for n in features.columns:
  if 'Scaled' not in n:
    features = features.drop(n)

In [52]:
assembler = VectorAssembler(inputCols=['Avg Session Length Scaled',	'Time on App Scaled',	'Time on Website Scaled',	'Length of Membership Scaled'], outputCol='features')
features = assembler.transform(features)
features = features.select(['features', 'Yearly Amount Spent Scaled'])
features = features.withColumnRenamed('Yearly Amount Spent Scaled', 'target')

In [53]:
features.show(5)

+--------------------+------+
|            features|target|
+--------------------+------+
|[0.751,0.627,0.93...| 0.651|
|[0.362,0.393,0.55...| 0.266|
|[0.525,0.426,0.52...| 0.454|
|[0.722,0.787,0.46...| 0.639|
|[0.575,0.648,0.59...| 0.674|
+--------------------+------+
only showing top 5 rows



# Divide data into training and testing sets

In [54]:
train, test = features.randomSplit([0.8, 0.2])

# Apply linear regression. Train and test the model

In [55]:
lr = LinearRegression(featuresCol='features', labelCol='target')

model = lr.fit(train)

print('RMSE: %f' % model.summary.rootMeanSquaredError)
print('r2: %f' % model.summary.r2)

In [58]:
preds = model.transform(test)

In [59]:
preds.show(5)

+--------------------+------+-------------------+
|            features|target|         prediction|
+--------------------+------+-------------------+
|[0.158,0.43,0.521...| 0.364|0.36302439010486043|
|[0.182,0.624,0.37...| 0.403| 0.3806161877200489|
|[0.231,0.582,0.37...| 0.454| 0.4647593479736911|
|[0.29,0.241,0.678...| 0.318| 0.3336027709131625|
|[0.302,0.534,0.75...|   0.3|  0.315233170011078|
+--------------------+------+-------------------+
only showing top 5 rows



# Conclusion: 98.1% accuracy on the test set is superb. The model performed well on the test data. It generalized nicely.

In [60]:
eval = RegressionEvaluator(predictionCol='prediction', labelCol='target', metricName='r2')
eval.evaluate(preds)

0.9812224596442902