In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

wine_df = spark.read.csv('wine.csv', header = True, inferSchema = True)

In [2]:
wine_df.take(1)

[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]

In [3]:
wine_df.select("alcohol", "quality").show()

+-------+-------+
|alcohol|quality|
+-------+-------+
|    9.4|      5|
|    9.8|      5|
|    9.8|      5|
|    9.8|      6|
|    9.4|      5|
|    9.4|      5|
|    9.4|      5|
|   10.0|      7|
|    9.5|      7|
|   10.5|      5|
|    9.2|      5|
|   10.5|      5|
|    9.9|      5|
|    9.1|      5|
|    9.2|      5|
|    9.2|      5|
|   10.5|      7|
|    9.3|      5|
|    9.0|      4|
|    9.2|      6|
+-------+-------+
only showing top 20 rows



In [4]:
import six
for i in wine_df.columns:
    if not( isinstance(wine_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to MPG for ", i, wine_df.stat.corr('quality',i))

Correlation to MPG for  fixed acidity 0.12405164911322263
Correlation to MPG for  volatile acidity -0.3905577802640061
Correlation to MPG for  citric acid 0.22637251431804048
Correlation to MPG for  residual sugar 0.013731637340065798
Correlation to MPG for  chlorides -0.1289065599300529
Correlation to MPG for  free sulfur dioxide -0.05065605724427597
Correlation to MPG for  total sulfur dioxide -0.18510028892653774
Correlation to MPG for  density -0.17491922778336602
Correlation to MPG for  pH -0.0577313912053826
Correlation to MPG for  sulphates 0.25139707906925995
Correlation to MPG for  alcohol 0.4761663240011364
Correlation to MPG for  quality 1.0


In [5]:
wine_df = wine_df.drop("residual sugar").drop("free sulfur dioxide") \
                       .drop("pH").drop("density") \
                       .drop("chlorides").drop('fixed acidity')
print(wine_df.columns)

['volatile acidity', 'citric acid', 'total sulfur dioxide', 'sulphates', 'alcohol', 'quality']


In [6]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = wine_df.columns, outputCol = 'features')
vwine_df = vectorAssembler.transform(wine_df)
vwine_df.take(1)

[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4, 5.0]))]

In [7]:
splits = vwine_df.randomSplit([0.9, 0.1])
train_df = splits[0]
test_df = splits[1]

In [8]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='quality', maxIter=10)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [7.750056776406814e-15,-4.743761750938962e-16,1.5431731824448383e-17,-3.7061337257273726e-15,-1.86952088869007e-15,1.0000000000000064]
Intercept: -1.8839733359542293e-14


In [9]:
import numpy as np
predictions = lr_model.transform(test_df)

predictions.select("prediction","quality","features").show()


+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
| 7.000000000000003|      7|[0.12,0.45,21.0,0...|
|5.9999999999999964|      6|[0.16,0.44,31.0,0...|
| 5.999999999999998|      6|[0.18,0.37,109.0,...|
| 7.000000000000006|      7|[0.25,0.29,49.0,0...|
| 6.000000000000002|      6|[0.26,0.42,27.0,0...|
| 6.000000000000002|      6|[0.26,0.44,19.0,0...|
| 4.999999999999995|      5|[0.27,0.41,16.0,0...|
| 7.000000000000007|      7|[0.28,0.47,32.0,0...|
| 5.999999999999999|      6|[0.28,0.48,46.0,1...|
|6.0000000000000036|      6|[0.29,0.25,160.0,...|
| 6.000000000000003|      6|[0.29,0.49,60.0,0...|
| 6.000000000000001|      6|[0.3,0.48,61.0,0....|
| 8.000000000000009|      8|[0.3,0.56,17.0,0....|
| 7.000000000000008|      7|[0.3,0.68,278.0,0...|
|7.0000000000000036|      7|[0.31,0.34,35.0,0...|
| 7.000000000000005|      7|[0.31,0.4,10.0,0....|
| 7.000000000000003|      7|[0.31,0.45,16.0,0...|


In [10]:
#Find R2 for Linear Regression
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="quality",metricName="r2")
evaluator.evaluate(predictions)

1.0

In [11]:
sc.stop()