In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# If you're using VirtualBox, change the below to '/home/user/spark-2.1.1-bin-hadoop2.7'
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip3 install numpy --user' into the console.
# If you're getting an error with another package, type 'sudo pip3 install PACKAGENAME --user'. 
# Replace PACKAGENAME with the relevant package (such as pandas, etc).
from pyspark.ml.regression import LinearRegression

# First, let's import the data. Note that we can infer the schema as it's a CSV file.
df1 = spark.read.csv("winequality-white.csv",inferSchema=True,header=True)

In [2]:
# Now that we understand the data's features, let's use a Python package to neatly describe the data.
import pandas as pd
df1.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
fixed acidity,4898,6.854787668436075,0.8438682276875127,3.8,14.2
volatile acidity,4898,0.27824111882401087,0.10079454842486532,0.08,1.1
citric acid,4898,0.33419150673743736,0.12101980420298254,0.0,1.66
residual sugar,4898,6.391414863209486,5.072057784014878,0.6,65.8
chlorides,4898,0.0457723560636995,0.021847968093728805,0.009,0.346
free sulfur dioxide,4898,35.30808493262556,17.00713732523259,2.0,289.0
total sulfur dioxide,4898,138.36065741118824,42.498064554142985,9.0,440.0
density,4898,0.9940273764801896,0.002990906916936997,0.98711,1.03898
pH,4898,3.1882666394446693,0.15100059961506673,2.72,3.82


In [3]:
# Import VectorAssembler and Vectors
from pyspark.ml.feature import VectorAssembler

# The input columns are the feature column names, and the output column is what you'd like the new column to be named. 
vector_assembler = VectorAssembler(inputCols = ['alcohol'], outputCol = 'feature')

# Now that we've created the assembler variable, let's actually transform the data.
vector_output = vector_assembler.transform(df1)

# Using print schema, you see that the features output column has been added. 
vector_output.printSchema()

# You can see that the features column is a DenseVector that combines the various features as expected.
vector_output.head(1)

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- feature: vector (nullable = true)



[Row(fixed acidity=7.0, volatile acidity=0.27, citric acid=0.36, residual sugar=20.7, chlorides=0.045, free sulfur dioxide=45.0, total sulfur dioxide=170.0, density=1.001, pH=3.0, sulphates=0.45, alcohol=8.8, quality=6, feature=DenseVector([8.8]))]

In [4]:
# Because the features have been combined into one vector, we no longer need them. Below we select the features and label.
vector_output = vector_output.select(['feature', 'quality'])

# You can see that the dataframe now only contains two columns. 
print(vector_output.head(1))
vector_output.show(300)

[Row(feature=DenseVector([8.8]), quality=6)]
+-------+-------+
|feature|quality|
+-------+-------+
|  [8.8]|      6|
|  [9.5]|      6|
| [10.1]|      6|
|  [9.9]|      6|
|  [9.9]|      6|
| [10.1]|      6|
|  [9.6]|      6|
|  [8.8]|      6|
|  [9.5]|      6|
| [11.0]|      6|
| [12.0]|      5|
|  [9.7]|      5|
| [10.8]|      5|
| [12.4]|      7|
|  [9.7]|      5|
| [11.4]|      7|
|  [9.6]|      6|
| [12.8]|      8|
| [11.3]|      6|
|  [9.5]|      5|
| [12.8]|      8|
| [11.0]|      7|
| [10.5]|      8|
|  [9.3]|      5|
| [10.0]|      6|
| [10.4]|      6|
| [10.0]|      6|
| [10.5]|      6|
| [11.6]|      6|
| [12.3]|      7|
| [10.0]|      6|
| [10.2]|      6|
| [10.8]|      6|
|  [9.0]|      6|
| [10.2]|      5|
| [12.8]|      5|
| [10.0]|      5|
| [11.2]|      6|
|  [8.6]|      5|
|  [8.6]|      5|
|  [9.4]|      6|
|  [9.4]|      6|
|  [9.8]|      6|
|  [9.5]|      6|
|  [9.5]|      6|
| [10.0]|      7|
|  [9.8]|      4|
|  [9.8]|      5|
|  [9.8]|      6|
|  [9.8]|      5|
|

In [5]:
# Let's do a randomised 70/30 split. Remember, you should explain why you chose a particular split. 
train_data,test_data = vector_output.randomSplit([0.7,0.3])

# Let's see our training data.
train_data.describe().show()

# And our testing data.
test_data.describe().show()

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|              3430|
|   mean| 5.882507288629737|
| stddev|0.8856541817407149|
|    min|                 3|
|    max|                 9|
+-------+------------------+

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|              1468|
|   mean|  5.86716621253406|
| stddev|0.8858108218822441|
|    min|                 3|
|    max|                 8|
+-------+------------------+



# Linear Regression

In [7]:
# Importing the LR package.
from pyspark.ml.regression import LinearRegression

# Instantiate the instance.
lr = LinearRegression(featuresCol='feature', labelCol='quality')

In [8]:
# Fit the training data.
lr_model = lr.fit(train_data)

In [9]:
# Print the coefficients.
print("Coefficients: " + str(lr_model.coefficients))

# Print the intercept.
print("Intercept: " + str(lr_model.intercept) + "\n")

# Summarise the model and print out some evaluation metrics.
training_summary = lr_model.summary

# Print RMSE. 
print("RMSE: " + str(training_summary.rootMeanSquaredError))

# Print R2.
print("R2: " + str(training_summary.r2))

Coefficients: [0.30337216780423226]
Intercept: 2.6915232573177166

RMSE: 0.8022175026260114
R2: 0.17930356666971858
