In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.0"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


In [7]:

red_df = spark.read.option("delimiter", ";").csv("winequality-red.csv", header=True, inferSchema=True)
white_df = spark.read.option("delimiter", ";").csv("winequality-white.csv", header=True, inferSchema=True)


In [6]:
red_df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [8]:
white_df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [9]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [38]:
# convert to vector column first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=red_df.columns, outputCol=vector_col)
df_vector = assembler.transform(red_df).select(vector_col)
matrix = Correlation.corr(df_vector, vector_col)


In [39]:
matrix.collect()[0]["pearson({})".format(vector_col)].values

array([ 1.        , -0.25613089,  0.67170343,  0.11477672,  0.09370519,
       -0.15379419, -0.11318144,  0.66804729, -0.68297819,  0.18300566,
       -0.06166827,  0.12405165, -0.25613089,  1.        , -0.55249568,
        0.00191788,  0.06129777, -0.01050383,  0.07647   ,  0.02202623,
        0.23493729, -0.26098669, -0.20228803, -0.39055778,  0.67170343,
       -0.55249568,  1.        ,  0.14357716,  0.20382291, -0.06097813,
        0.03553302,  0.36494718, -0.54190414,  0.31277004,  0.10990325,
        0.22637251,  0.11477672,  0.00191788,  0.14357716,  1.        ,
        0.05560954,  0.187049  ,  0.20302788,  0.35528337, -0.08565242,
        0.00552712,  0.04207544,  0.01373164,  0.09370519,  0.06129777,
        0.20382291,  0.05560954,  1.        ,  0.00556215,  0.04740047,
        0.20063233, -0.26502613,  0.37126048, -0.22114054, -0.12890656,
       -0.15379419, -0.01050383, -0.06097813,  0.187049  ,  0.00556215,
        1.        ,  0.66766645, -0.02194583,  0.0703775 ,  0.05

In [16]:
red_df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [23]:
red_df.corr('volatile acidity','quality')


-0.390557780264006

In [31]:
from pyspark.ml.feature import VectorAssembler
train_cols = red_df.columns[:-1]
vectorAssembler = VectorAssembler(inputCols = train_cols, outputCol = 'features')
v_df = vectorAssembler.transform(red_df)
v_df = v_df.select(['features', 'quality'])
(train_df, test_df) = v_df.randomSplit([0.8,0.2])

In [32]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='quality')
lr_model = lr.fit(train_df)
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MSE: %f" % trainingSummary.meanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 0.656590
MSE: 0.431111
MAE: 0.508160


In [33]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","quality","features").show(5)

+-----------------+-------+--------------------+
|       prediction|quality|            features|
+-----------------+-------+--------------------+
|6.644970205799705|      6|[5.0,0.4,0.5,4.3,...|
|6.705087821972654|      8|[5.0,0.42,0.24,2....|
|6.392341485265039|      7|[5.1,0.585,0.0,1....|
|5.153878270839083|      5|[5.3,0.715,0.19,1...|
| 6.00795802589764|      7|[5.4,0.835,0.08,1...|
+-----------------+-------+--------------------+
only showing top 5 rows



In [34]:
from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="quality", metricName="mae")

print("MSE on test data = %g" % lr_evaluator.evaluate(lr_predictions))

MSE on test data = 0.469951


In [None]:
#Дальше по белому вину

In [40]:
# convert to vector column first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=white_df.columns, outputCol=vector_col)
df_vector = assembler.transform(white_df).select(vector_col)
matrix = Correlation.corr(df_vector, vector_col)
matrix.collect()[0]["pearson({})".format(vector_col)].values

array([ 1.00000000e+00, -2.26972901e-02,  2.89180698e-01,  8.90207014e-02,
        2.30856437e-02, -4.93958591e-02,  9.10697562e-02,  2.65331014e-01,
       -4.25858291e-01, -1.71429850e-02, -1.20881123e-01, -1.13662831e-01,
       -2.26972901e-02,  1.00000000e+00, -1.49471811e-01,  6.42860601e-02,
        7.05115715e-02, -9.70119393e-02,  8.92605036e-02,  2.71138455e-02,
       -3.19153683e-02, -3.57281469e-02,  6.77179428e-02, -1.94722969e-01,
        2.89180698e-01, -1.49471811e-01,  1.00000000e+00,  9.42116243e-02,
        1.14364448e-01,  9.40772210e-02,  1.21130798e-01,  1.49502571e-01,
       -1.63748211e-01,  6.23309403e-02, -7.57287301e-02, -9.20909088e-03,
        8.90207014e-02,  6.42860601e-02,  9.42116243e-02,  1.00000000e+00,
        8.86845359e-02,  2.99098354e-01,  4.01439311e-01,  8.38966455e-01,
       -1.94133454e-01, -2.66643659e-02, -4.50631222e-01, -9.75768289e-02,
        2.30856437e-02,  7.05115715e-02,  1.14364448e-01,  8.86845359e-02,
        1.00000000e+00,  

In [43]:
white_df.corr('alcohol','quality')

0.4355747154613733

In [45]:
train_cols_w = white_df.columns[:-1]
vectorAssembler = VectorAssembler(inputCols = train_cols_w, outputCol = 'features')
v_df = vectorAssembler.transform(white_df)
v_df = v_df.select(['features', 'quality'])
(train_df, test_df) = v_df.randomSplit([0.8,0.2])
lr = LinearRegression(featuresCol='features', labelCol='quality')
lr_model = lr.fit(train_df)
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("MSE: %f" % trainingSummary.meanSquaredError)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 0.746185
MSE: 0.556792
MAE: 0.578383


In [48]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","quality","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="quality", metricName="mae")

print("MSE on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-------+--------------------+
|        prediction|quality|            features|
+------------------+-------+--------------------+
| 6.768540710089667|      7|[4.8,0.13,0.32,1....|
| 5.798816718853985|      5|[4.9,0.335,0.14,1...|
| 5.451656597106904|      5|[4.9,0.345,0.34,1...|
|6.8487935878707304|      7|[5.0,0.24,0.34,1....|
| 6.806183502218474|      7|[5.0,0.27,0.32,4....|
+------------------+-------+--------------------+
only showing top 5 rows

MSE on test data = 0.604597


In [None]:
# Модель отработала с MSE для белого вина 0.6 и для красного 0.47 
# Далее посмотрим на датасеты

In [49]:
red_df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [79]:
r_min = red_df.agg({
    'fixed acidity': 'min',
'volatile acidity': 'min',
'citric acid': 'min',
'residual sugar': 'min',
'chlorides': 'min',
'free sulfur dioxide': 'min',
'total sulfur dioxide': 'min',
'density': 'min',
'pH': 'min',
'sulphates': 'min',
'alcohol': 'min',
'quality': 'min'} )



In [80]:
r_max = red_df.agg({
    'fixed acidity': 'max',
'volatile acidity': 'max',
'citric acid': 'max',
'residual sugar': 'max',
'chlorides': 'max',
'free sulfur dioxide': 'max',
'total sulfur dioxide': 'max',
'density': 'max',
'pH': 'max',
'sulphates': 'max',
'alcohol': 'max',
'quality': 'max'} )
r_min.union(r_max).show()


+------------+------------------------+------------------+-------------------------+-------------------+---------------------+--------------+--------------+----------------+------------+-------+------------+
|min(quality)|min(free sulfur dioxide)|min(fixed acidity)|min(total sulfur dioxide)|min(residual sugar)|min(volatile acidity)|min(sulphates)|min(chlorides)|min(citric acid)|min(density)|min(pH)|min(alcohol)|
+------------+------------------------+------------------+-------------------------+-------------------+---------------------+--------------+--------------+----------------+------------+-------+------------+
|           3|                     1.0|               4.6|                      6.0|                0.9|                 0.12|          0.33|         0.012|             0.0|     0.99007|   2.74|         8.4|
|           8|                    72.0|              15.9|                    289.0|               15.5|                 1.58|           2.0|         0.611|            

In [81]:
w_min = white_df.agg({
    'fixed acidity': 'min',
'volatile acidity': 'min',
'citric acid': 'min',
'residual sugar': 'min',
'chlorides': 'min',
'free sulfur dioxide': 'min',
'total sulfur dioxide': 'min',
'density': 'min',
'pH': 'min',
'sulphates': 'min',
'alcohol': 'min',
'quality': 'min'} )

In [82]:
w_max = white_df.agg({
    'fixed acidity': 'max',
'volatile acidity': 'max',
'citric acid': 'max',
'residual sugar': 'max',
'chlorides': 'max',
'free sulfur dioxide': 'max',
'total sulfur dioxide': 'max',
'density': 'max',
'pH': 'max',
'sulphates': 'max',
'alcohol': 'max',
'quality': 'max'} )

In [83]:
r_min.union(r_max).union(w_min).union(w_max).show()

+------------+------------------------+------------------+-------------------------+-------------------+---------------------+--------------+--------------+----------------+------------+-------+------------+
|min(quality)|min(free sulfur dioxide)|min(fixed acidity)|min(total sulfur dioxide)|min(residual sugar)|min(volatile acidity)|min(sulphates)|min(chlorides)|min(citric acid)|min(density)|min(pH)|min(alcohol)|
+------------+------------------------+------------------+-------------------------+-------------------+---------------------+--------------+--------------+----------------+------------+-------+------------+
|           3|                     1.0|               4.6|                      6.0|                0.9|                 0.12|          0.33|         0.012|             0.0|     0.99007|   2.74|         8.4|
|           8|                    72.0|              15.9|                    289.0|               15.5|                 1.58|           2.0|         0.611|            

In [88]:
w_mean = white_df.agg({
    'fixed acidity': 'mean',
'volatile acidity': 'mean',
'citric acid': 'mean',
'residual sugar': 'mean',
'chlorides': 'mean',
'free sulfur dioxide': 'mean',
'total sulfur dioxide': 'mean',
'density': 'mean',
'pH': 'mean',
'sulphates': 'mean',
'alcohol': 'mean',
'quality': 'mean'} )
r_mean = red_df.agg({
    'fixed acidity': 'mean',
'volatile acidity': 'mean',
'citric acid': 'mean',
'residual sugar': 'mean',
'chlorides': 'mean',
'free sulfur dioxide': 'mean',
'total sulfur dioxide': 'mean',
'density': 'mean',
'pH': 'mean',
'sulphates': 'mean',
'alcohol': 'mean',
'quality': 'mean'} )

r_mean.union(w_mean).show()

+------------------+------------------------+------------------+-------------------------+-------------------+---------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+
|      avg(quality)|avg(free sulfur dioxide)|avg(fixed acidity)|avg(total sulfur dioxide)|avg(residual sugar)|avg(volatile acidity)|    avg(sulphates)|     avg(chlorides)|   avg(citric acid)|      avg(density)|           avg(pH)|      avg(alcohol)|
+------------------+------------------------+------------------+-------------------------+-------------------+---------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+
|5.6360225140712945|      15.874921826141339| 8.319637273295838|        46.46779237023139| 2.5388055034396517|   0.5278205128205131|0.6581488430268921|0.08746654158849257| 0.2709756097560964|0.9967466791744831| 3.311113195747343|10.422983114446502|
|  5

In [90]:
from pyspark.sql import functions as F

In [108]:
red_df.agg(F.min(red_df['fixed acidity']), F.max(red_df['fixed acidity'])).show()

+------------------+------------------+
|min(fixed acidity)|max(fixed acidity)|
+------------------+------------------+
|               4.6|              15.9|
+------------------+------------------+



In [111]:
red_df.approxQuantile('total sulfur dioxide', [0.5], 0.25)

[22.0]

In [110]:
white_df.approxQuantile('total sulfur dioxide', [0.5], 0.25)

[108.0]