In [1]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Missing")
    .getOrCreate()
)

In [4]:
import os

# Path to the semicolon-delimited red-wine quality CSV.
# The original machine-specific location is kept as the default so the notebook
# behaves as before; set WINE_QUALITY_CSV to run it on another machine without
# editing this cell.
csv_path = os.environ.get(
    "WINE_QUALITY_CSV",
    r"C:\Users\faisal\Downloads\winequality-red.csv",
)

# header=True takes column names from the first row; inferSchema=True lets Spark
# derive column types from the data instead of reading everything as strings.
df = spark.read.csv(csv_path, sep=';', header=True, inferSchema=True)
df.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [5]:
# Print the column types Spark inferred for the CSV (loaded with inferSchema=True).
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [6]:
# List the column names; note that several contain spaces.
df.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [10]:
from pyspark.ml.feature import VectorAssembler

# Every column except the label ('quality') is packed into one vector column,
# which is the input format the regression estimator is given later on.
feature_columns = [
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
]
feature_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol='Independent Feature',
)

In [11]:
# Add the assembled feature-vector column alongside the original columns.
output = feature_assembler.transform(df)

# Preview with the default settings spelled out: first 20 rows, long cells truncated.
output.show(n=20, truncate=True)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality| Independent Feature|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    

In [12]:
# Check that the assembler's output column ('Independent Feature') is now present.
output.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality',
 'Independent Feature']

In [13]:
# Keep only what the model needs: the assembled feature vector and the label.
model_columns = ['Independent Feature', 'quality']
finalised_data = output.select(*model_columns)
finalised_data.show()

+--------------------+-------+
| Independent Feature|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
|[7.4,0.66,0.0,1.8...|      5|
|[7.9,0.6,0.06,1.6...|      5|
|[7.3,0.65,0.0,1.2...|      7|
|[7.8,0.58,0.02,2....|      7|
|[7.5,0.5,0.36,6.1...|      5|
|[6.7,0.58,0.08,1....|      5|
|[7.5,0.5,0.36,6.1...|      5|
|[5.6,0.615,0.0,1....|      5|
|[7.8,0.61,0.29,1....|      5|
|[8.9,0.62,0.18,3....|      5|
|[8.9,0.62,0.19,3....|      5|
|[8.5,0.28,0.56,1....|      7|
|[8.1,0.56,0.28,1....|      5|
|[7.4,0.59,0.08,4....|      4|
|[7.9,0.32,0.51,1....|      6|
+--------------------+-------+
only showing top 20 rows



In [23]:
from pyspark.ml.regression import LinearRegression

# Train/test split (75% / 25%).
# Without a seed the split is different on every run, so the coefficients and
# metrics computed in later cells could not be reproduced. A fixed seed keeps
# the split repeatable for the same input data.
SPLIT_SEED = 42
train_data, test_data = finalised_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Train the linear regression model.
# The unfitted estimator gets its own name so that `regressor` only ever refers
# to the fitted model that the following cells inspect and evaluate.
lr_estimator = LinearRegression(featuresCol='Independent Feature', labelCol='quality')
regressor = lr_estimator.fit(train_data)

Exception ignored in: <function JavaWrapper.__del__ at 0x000002AB2E9EAE50>
Traceback (most recent call last):
  File "C:\Users\faisal\anaconda3\lib\site-packages\pyspark\ml\wrapper.py", line 53, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


In [24]:
# Coefficients of the fitted model: one value per entry of the assembled feature vector.
regressor.coefficients

DenseVector([0.0526, -1.047, -0.2824, 0.0313, -1.6481, 0.0049, -0.0033, -36.6331, -0.2282, 0.921, 0.2898])

In [25]:
# Intercept (constant term) of the fitted linear model.
regressor.intercept

39.627636611776694

In [26]:
# Evaluate the fitted model on the held-out test split.
# The result is a summary object: its `predictions` DataFrame and its error
# metrics (MAE, MSE, r2) are read in the cells that follow.
pred_results=regressor.evaluate(test_data)

In [27]:
# Show test rows with the model's `prediction` next to the actual `quality` label.
pred_results.predictions.show()

+--------------------+-------+------------------+
| Independent Feature|quality|        prediction|
+--------------------+-------+------------------+
|[4.6,0.52,0.15,2....|      4| 6.112509029556158|
|[5.0,0.38,0.01,1....|      6| 6.993614331374559|
|[5.0,0.4,0.5,4.3,...|      6| 6.742770506735923|
|[5.0,0.74,0.0,1.2...|      6| 5.899884059404172|
|[5.0,1.02,0.04,1....|      4| 4.920874295849337|
|[5.2,0.34,0.0,1.8...|      6| 7.060250425434212|
|[5.2,0.34,0.0,1.8...|      6| 7.060250425434212|
|[5.4,0.74,0.0,1.2...|      6|5.9209246751230395|
|[5.6,0.5,0.09,2.3...|      5| 6.234172805770555|
|[5.6,0.54,0.04,1....|      5| 5.867245870714051|
|[5.6,0.615,0.0,1....|      5| 5.169627522344079|
|[5.6,0.66,0.0,2.5...|      5|6.2934365931182725|
|[5.6,0.85,0.05,1....|      8| 6.079268097077637|
|[5.6,0.915,0.0,2....|      5| 5.527852819036859|
|[6.0,0.42,0.19,2....|      6| 5.724177569134952|
|[6.0,0.49,0.0,2.3...|      6| 6.328352606048107|
|[6.0,0.5,0.0,1.4,...|      5| 5.325928314578945|


In [28]:
# Test-set error metrics, displayed as a (mean absolute error, mean squared error) tuple.
mae = pred_results.meanAbsoluteError
mse = pred_results.meanSquaredError
(mae, mse)

(0.520686123433572, 0.4726005606402937)

In [29]:
# R-squared (coefficient of determination) of the model on the test split.
pred_results.r2

0.2874639874394723