<a href="https://colab.research.google.com/github/Tanuhlik/BigData/blob/master/Red_wine_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java 8, which Spark 2.4.x runs on.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old releases are removed from the regular Apache mirrors; the permanent
# copy of Spark 2.4.3 lives on archive.apache.org.
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
# Extract quietly (-x -f, no -v) so the notebook is not flooded with file names.
!tar -xf spark-2.4.3-bin-hadoop2.7.tgz
# findspark makes the unpacked Spark distribution importable from Python.
!pip install -q findspark

In [0]:
import os

# Tell the JVM launcher and Spark where the freshly installed Java and
# the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.3-bin-hadoop2.7",
})

In [0]:
import findspark
# Must run before the pyspark import below: it locates the Spark install
# (via SPARK_HOME) and makes pyspark importable.
findspark.init()
from pyspark.sql import SparkSession

# Local-mode session using every available core; an already running
# session is reused instead of creating a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
# Load the red-wine dataset; the first row holds the column names and the
# column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv("winequality-red.csv")

In [6]:
# Inspect the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [7]:
# Preview the first three rows of the raw data.
df.show(n=3)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

In [0]:
# Remove the columns that are not used as model features, in a single call.
df = df.drop(
    'citric acid',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'sulphates',
)

In [9]:
# Confirm that only the retained feature columns and the label remain.
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
# Discard every row that contains a null/NaN in any column.
df = df.na.drop()

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
# Use every column except the label as a feature. Selecting by name rather
# than by position keeps this correct if the set of dropped columns changes.
train_cols = [col_name for col_name in df.columns if col_name != 'quality']

In [13]:
# Display the feature column names that will be assembled into a vector.
train_cols

['fixed acidity',
 'volatile acidity',
 'residual sugar',
 'density',
 'pH',
 'alcohol']

In [0]:
# Assembler that packs the feature columns into one vector column
# named 'features'.
vector = VectorAssembler(
    inputCols=train_cols,
    outputCol='features',
)

In [0]:
# Append the assembled 'features' vector column to the data.
v_df = vector.transform(df)

In [0]:
# Keep only the feature vector and the label for modelling.
v_df = v_df.select('features', 'quality')

In [17]:
# Preview the first three rows of the modelling frame.
v_df.show(n=3)

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[7.4,0.7,1.9,0.99...|      5|
|[7.8,0.88,2.6,0.9...|      5|
|[7.8,0.76,2.3,0.9...|      5|
+--------------------+-------+
only showing top 3 rows



In [0]:
# 80/20 train/test split. A fixed seed makes the split, and therefore every
# metric reported below, reproducible across runs.
train_df, test_df = v_df.randomSplit([0.8, 0.2], seed=42)

In [0]:
from pyspark.ml.regression import LinearRegression

In [0]:
# Linear regression estimator predicting 'quality' from the 'features' vector.
lr = LinearRegression(
    featuresCol='features',
    labelCol='quality',
)

In [0]:
# Fit the regression on the training split only.
lr_model = lr.fit(train_df)

In [0]:
# Summary object of the fitted model; the metrics read from it below
# describe the fit on the training data, not on the test split.
trainingSummary = lr_model.summary

In [23]:
# Root mean squared error taken from the training summary.
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

RMSE: 0.663994


In [24]:
# Mean squared error taken from the training summary.
print("MSE: %f" % trainingSummary.meanSquaredError)

MSE: 0.440888


In [25]:
# Mean absolute error taken from the training summary.
print("MAE: %f" % trainingSummary.meanAbsoluteError)

MAE: 0.523457


In [26]:
# Score the held-out split, then preview the predictions next to the
# true labels and the input vectors.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select(["prediction", "quality", "features"]).show(n=5)

+-----------------+-------+--------------------+
|       prediction|quality|            features|
+-----------------+-------+--------------------+
|5.170668209868457|      5|[5.0,1.04,1.6,0.9...|
|6.340210183596822|      7|[5.1,0.51,2.1,0.9...|
|6.606768869797235|      7|[5.3,0.47,2.2,0.9...|
|5.757143239918497|      5|[5.6,0.54,1.7,0.9...|
|5.320313202998745|      5|[5.6,0.915,2.1,0....|
+-----------------+-------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Evaluator that compares the 'prediction' column with the 'quality' label
# using root mean squared error.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="quality",
    metricName="rmse",
)

In [29]:
# The evaluator is configured with metricName="rmse", so the value printed
# here is the root mean squared error on the test split (not the MSE).
print("RMSE on test data = %g" % lr_evaluator.evaluate(lr_predictions))

MSE on test data = 0.664406
