In [34]:
from pyspark.sql import SparkSession

# Obtain the notebook's Spark session via getOrCreate(); the application name is "Missing".
session_builder = SparkSession.builder.appName("Missing")
spark = session_builder.getOrCreate()

In [35]:
# Read the data
import os

# Location of the semicolon-delimited wine-quality CSV. Set the WINE_CSV_PATH
# environment variable to run this notebook on another machine; when it is not
# set, the original local path is used, so existing behaviour is unchanged.
wine_csv_path = os.environ.get(
    "WINE_CSV_PATH", r"C:\Users\user\Desktop\winequality-red.csv"
)

# header=True takes the column names from the first row; inferSchema=True asks
# Spark to infer each column's type (see the printSchema() output further down).
df = spark.read.csv(wine_csv_path, header=True, inferSchema=True, sep=";")
df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [36]:
# Print each column's inferred type and nullability.
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [37]:
# List the column names of the loaded DataFrame.
df.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [38]:
from pyspark.ml.feature import VectorAssembler

# Model inputs: every column of the dataset except the label column 'quality'.
feature_columns = [
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
]

# Combine the input columns into one vector column named 'Independent Features'.
# The variable name `feature_assember` is kept as-is because the next cell uses it.
feature_assember = VectorAssembler(
    inputCols=feature_columns,
    outputCol='Independent Features',
)

In [39]:
# Apply the assembler: `output` is `df` plus the 'Independent Features' vector column.
output = feature_assember.transform(df)
output.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|Independent Features|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    

In [40]:
# Confirm that 'Independent Features' was appended after the original columns.
output.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality',
 'Independent Features']

In [41]:
# Keep only what the regression needs: the feature vector and the label.
model_columns = ["Independent Features", "quality"]
finalized_data = output.select(*model_columns)
finalized_data.show()

+--------------------+-------+
|Independent Features|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
|[7.4,0.66,0.0,1.8...|      5|
|[7.9,0.6,0.06,1.6...|      5|
|[7.3,0.65,0.0,1.2...|      7|
|[7.8,0.58,0.02,2....|      7|
|[7.5,0.5,0.36,6.1...|      5|
|[6.7,0.58,0.08,1....|      5|
|[7.5,0.5,0.36,6.1...|      5|
|[5.6,0.615,0.0,1....|      5|
|[7.8,0.61,0.29,1....|      5|
|[8.9,0.62,0.18,3....|      5|
|[8.9,0.62,0.19,3....|      5|
|[8.5,0.28,0.56,1....|      7|
|[8.1,0.56,0.28,1....|      5|
|[7.4,0.59,0.08,4....|      4|
|[7.9,0.32,0.51,1....|      6|
+--------------------+-------+
only showing top 20 rows



In [42]:
from pyspark.ml.regression import LinearRegression

# Seed for the random train/test split. Without a seed every run draws a
# different split, so the coefficients and metrics in the cells below would
# change from run to run.
SPLIT_SEED = 42

# train test split (weights 0.70 / 0.30)
train_data, test_data = finalized_data.randomSplit([0.70, 0.30], seed=SPLIT_SEED)

# train the LinearRegression model
# The unfitted estimator gets its own name so that `regressor` refers only to
# the fitted model, which is what the following cells use.
lr_estimator = LinearRegression(featuresCol="Independent Features", labelCol="quality")
regressor = lr_estimator.fit(train_data)

In [43]:
# Coefficients of the fitted linear model (a vector with one entry per input feature)
regressor.coefficients

DenseVector([0.0078, -1.132, -0.1697, 0.0176, -1.3858, 0.0047, -0.0037, -4.6468, -0.6188, 0.7454, 0.3088])

In [44]:
# Intercept term of the fitted linear model
regressor.intercept

9.356956002600418

In [45]:
# Evaluate the fitted model on the held-out test split; the result object
# provides the predictions and the error metrics read in the following cells.
pred_result= regressor.evaluate(test_data)

In [46]:
# Show the test rows with the model's 'prediction' next to the actual 'quality'.
pred_result.predictions.show()

+--------------------+-------+------------------+
|Independent Features|quality|        prediction|
+--------------------+-------+------------------+
|[4.9,0.42,0.0,2.1...|      7| 6.778705762224322|
|[5.0,1.02,0.04,1....|      4| 4.731455976850267|
|[5.1,0.585,0.0,1....|      7| 6.315562669977012|
|[5.2,0.32,0.25,1....|      5| 5.287699042529093|
|[5.2,0.49,0.26,2....|      6| 5.853413306132402|
|[5.3,0.47,0.11,2....|      7| 6.607291797956927|
|[5.3,0.47,0.11,2....|      7|6.6175837763906316|
|[5.3,0.715,0.19,1...|      5|5.1532389420348865|
|[5.6,0.31,0.37,1....|      5| 5.199407687626371|
|[5.6,0.31,0.78,13...|      6| 5.692888226261616|
|[5.6,0.5,0.09,2.3...|      5| 6.121004278307179|
|[5.6,0.66,0.0,2.5...|      5| 6.207414068424551|
|[5.6,0.85,0.05,1....|      8| 5.899218963386685|
|[5.7,1.13,0.09,1....|      4| 4.455460144878269|
|[5.8,0.29,0.26,1....|      6|6.8109102620527455|
|[5.8,1.01,0.66,2....|      6| 4.985392852893079|
|[5.9,0.61,0.08,2....|      6| 5.802641866556652|


In [47]:
# Test-set error metrics, displayed as a tuple: (mean absolute error, mean squared error).
pred_result.meanAbsoluteError,pred_result.meanSquaredError

(0.5207295882602947, 0.44517697909365705)

In [48]:
# R^2 (coefficient of determination) of the model on the test split.
pred_result.r2

0.3117177784429038