In [1]:
import findspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# Linear Regression Using Apache Spark
We will use Apache Spark to build a linear regression model to predict the "Yearly Amount Spent" by a person on an app based on features such as:
* Avg Session Length
* Time on app
* Time on website
* Length of membership
* Avatar color

We will use the Avatar color to show how to encode a categorical variable with One Hot Encoding in Spark.

In [2]:
# start a Spark session for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("LinearRegDemo")
    .getOrCreate()
)

In [3]:
# load the CSV: the first row supplies the column names and the column types are inferred from the data
inputFile = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("ecommerce.csv")
)

In [4]:
# equivalent pandas-style read: the same header / inferSchema options passed as keyword arguments
inputFile = spark.read.csv("ecommerce.csv", header = True, inferSchema = True)

In [5]:
# print the column names and the data types that were inferred while reading the file
inputFile.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [6]:
# compute per-column summary statistics (count, mean, stddev, min, ...) and display them
inputFile.describe().show()

+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|summary|            Email|             Address|     Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+-------+-----------------+--------------------+-----------+------------------+------------------+------------------+--------------------+-------------------+
|  count|              500|                 500|        500|               500|               500|               500|                 500|                500|
|   mean|             null|                null|       null| 33.05319351819619|12.052487937166134| 37.06044542094859|   3.533461555915055|  499.3140382585909|
| stddev|             null|                null|       null|0.9925631110845354|0.9942156084725424|1.0104889067564033|  0.9992775024112585|   79.3147815497068|
|    min|aaron04@yahoo.com|0001 Mack MillNor..

In [7]:
# view the first 5 records
inputFile.show(5)

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37.110597442120856|   4.104543202376424| 487.54750486747207|
|riverarebecca@gma...|1414 David Throug...|   

In [8]:
# list the column names so they can be copied into the assembler below
inputFile.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [9]:
# combine the four numeric predictor columns into a single vector column named 'features'
assembler = VectorAssembler(
    inputCols = [
        'Avg Session Length',
        'Time on App',
        'Time on Website',
        'Length of Membership',
    ],
    outputCol = 'features',
)

In [10]:
# apply the assembler: returns the input data with the assembled 'features' vector column appended
output = assembler.transform(inputFile)

In [11]:
# inspect the first row to confirm the 'features' vector was added alongside the original columns
output.head(1)

[Row(Email='mstephenson@fernandez.com', Address='835 Frank TunnelWrightmouth, MI 82180-9605', Avatar='Violet', Avg Session Length=34.49726772511229, Time on App=12.65565114916675, Time on Website=39.57766801952616, Length of Membership=4.0826206329529615, Yearly Amount Spent=587.9510539684005, features=DenseVector([34.4973, 12.6557, 39.5777, 4.0826]))]

In [12]:
# keep only what the regression needs: the feature vector and the label column
finalData = output.select('features', 'Yearly Amount Spent')

In [13]:
# preview the feature vector / label pairs
finalData.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

In [14]:
# split into 70% train and 30% test
# a fixed seed makes the split repeatable, so the metrics reported below do not
# change every time the notebook is re-run from a fresh kernel
trainData, testData = finalData.randomSplit([0.7, 0.3], seed = 42)

In [15]:
# configure the linear regression: 'features' is the input vector, 'Yearly Amount Spent' is the label
# (nothing is trained here; training happens when .fit() is called)
linRegModel = LinearRegression(labelCol = 'Yearly Amount Spent', featuresCol = 'features')

In [16]:
# fit the model using training data
# NOTE: the names are the reverse of Spark's terminology - linRegModel is the unfitted estimator,
# and linRegEstimator receives the fitted model returned by .fit()
linRegEstimator = linRegModel.fit(trainData)

In [17]:
# evaluate the fitted model on the held-out test data; the returned summary exposes
# the residuals, rootMeanSquaredError and r2 inspected in the following cells
results = linRegEstimator.evaluate(testData)

In [18]:
# per-row residuals on the test data
results.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|     -13.83976993617|
| -1.2909154711247197|
|  -5.014822986371598|
|   4.089553341017449|
| -7.7836026845250785|
| -14.428127435804697|
|  -23.41911636136126|
|   20.61406922397009|
|-0.08078074958712023|
|  2.3523065017881777|
| -4.8302521228127375|
|  -9.273703080478697|
|  3.6496045156816876|
|   6.152141479815043|
| -27.300264756555748|
|  -6.935565156549842|
|   0.645007324280698|
| -4.8695308779318225|
|  -4.048754137118692|
| -10.028761088484941|
+--------------------+
only showing top 20 rows



In [19]:
# root mean squared error on the test data (same units as 'Yearly Amount Spent')
results.rootMeanSquaredError

10.620212958187311

In [20]:
# R-squared on the test data
results.r2

0.9829186100077835

## Including Categorical variables

In [21]:
assembler2 = VectorAssembler(inputCols = ['Avatar', 'Avg Session Length', 'Time on App', 'Time on Website',
                                          'Length of Membership'], outputCol = 'features')

In [22]:
# index the categorical variable to numbers
avatarIndexer = StringIndexer(inputCol = 'Avatar', outputCol = 'AvatarIndexer')

In [23]:
# create one hot encoding for the column
avatarEncoder = OneHotEncoder(inputCol = 'AvatarIndexer', outputCol = 'AvatarDummies')

In [24]:
# create a ML pipeline by specifying the sequence of steps involved
pipeline = Pipeline(stages = [avatarIndexer, avatarEncoder, assembler2])

In [25]:
output2 = pipeline.fit(inputFile)

In [26]:
# split into 70% train and 30% test
trainData2, testData2 = output.randomSplit([0.7, 0.3])

In [27]:
# configure a second linear regression with the same label and features column names
# (nothing is trained here; training happens when .fit() is called)
linRegModel2 = LinearRegression(labelCol = 'Yearly Amount Spent', featuresCol = 'features')

In [28]:
# fit the second model using its training data
# NOTE: as with the first model, linRegEstimator2 receives the fitted model returned by .fit()
linRegEstimator2 = linRegModel2.fit(trainData2)

In [29]:
# evaluate the second fitted model on its held-out test data
results2 = linRegEstimator2.evaluate(testData2)

In [30]:
# R-squared of the second model on its test data
results2.r2

0.982448974127849

In [31]:
# root mean squared error of the second model on its test data
results2.rootMeanSquaredError

10.115014247297253