# PREDICȚIA TIPURILOR DE VIN SI CALITATII ACESTORA ÎN FUCȚIE DE SPECIFICAȚII

STUDENT: Valeria - Gabriela Spînu

GRUPA: 405

In [1]:
#Instalarea pachetului pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
#Conectarea la Google drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
#Importul pachetelor necesare rularii
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as tp

In [4]:
#Crearea unui obiect SparkSession
spark = SparkSession.builder.appName("Wine prediction").getOrCreate()

## **Pregatirea si curatarea datelor**

In [5]:
#Citirea si afisarea datelor utilizand parametru de inferSchema
df_inferSchema = spark.read.csv("/content/drive/MyDrive/BigData/proiect/winequalityN.csv",inferSchema=True, header=True)
df_inferSchema.show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
| type|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|white|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|white|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|white|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|white|          7.2|            0.23|       0.32|           8.5

In [6]:
#Afisarea schemei
df_inferSchema.printSchema()

root
 |-- type: string (nullable = true)
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [7]:
#Crearea schemei
schema = tp.StructType([
    tp.StructField(name = 'type', dataType= tp.StringType(), nullable=True),
    tp.StructField(name = 'fixed_acidity', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'volatile_acidity', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'citric_acid', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'residual_sugar', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'chlorides', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'free_sulfur_dioxide', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'total_sulfur_dioxide', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'density', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'pH', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'sulphates', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'alcohol', dataType= tp.FloatType(), nullable=True),
    tp.StructField(name = 'quality', dataType= tp.IntegerType(), nullable=True),

])

In [8]:
#Citirea datelor folosind noua schema creata mai sus
df = spark.read.csv("/content/drive/MyDrive/BigData/proiect/winequalityN.csv",schema=schema, header=True)
df.printSchema()

root
 |-- type: string (nullable = true)
 |-- fixed_acidity: float (nullable = true)
 |-- volatile_acidity: float (nullable = true)
 |-- citric_acid: float (nullable = true)
 |-- residual_sugar: float (nullable = true)
 |-- chlorides: float (nullable = true)
 |-- free_sulfur_dioxide: float (nullable = true)
 |-- total_sulfur_dioxide: float (nullable = true)
 |-- density: float (nullable = true)
 |-- pH: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality: integer (nullable = true)



In [9]:
#Numararea valorilor null din fiecare coloana
df_null = df.agg(*[f.count(f.when(f.isnull(c),c)).alias(c) for c in df.columns])
df_null.show()

+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density| pH|sulphates|alcohol|quality|
+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|   0|           10|               8|          3|             2|        2|                  0|                   0|      0|  9|        4|      0|      0|
+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



In [10]:
#Afisarea numarului de recorduri din dataset
print((df.count(),len(df.columns)),"- Numarul total de recorduri")
#Stergerea recordurilor care contin valori nule
df_wo_null = df.na.drop()
print((df_wo_null.count(),len(df_wo_null.columns)),"- Numarul de recorduri care nu contin valori nule")


(6497, 13) - Numarul total de recorduri
(6463, 13) - Numarul de recorduri care nu contin valori nule


In [11]:
#Afisarea statisticilor aferente setului de date
df_wo_null.summary().show()

+-------+-----+-----------------+-------------------+-------------------+-----------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+------------------+
|summary| type|    fixed_acidity|   volatile_acidity|        citric_acid|   residual_sugar|           chlorides|free_sulfur_dioxide|total_sulfur_dioxide|             density|                 pH|          sulphates|           alcohol|           quality|
+-------+-----+-----------------+-------------------+-------------------+-----------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+------------------+
|  count| 6463|             6463|               6463|               6463|             6463|                6463|               6463|                6463|                6463|               6463|               6463|              6463|        

## **Procesarea datelor**


### **Dataframes**

In [12]:
# Sa se adauge o coloana in care sa se specifice categoria de vin pentru fiecare record
df_wo_null = df_wo_null.withColumn("category", f.expr("case when quality <=4 then 'Bad' when quality <=7 then 'Good' when quality>7 then 'Excellent' end"))
df_wo_null.show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality| category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
|white|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|     Good|
|white|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|     Good|
|white|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|     Good|
|whi

In [13]:
#Sa se afiseze primele 20 de inregistrari care au o cantitate de zahar de peste 10 iar alcoolul continut sa fie mai mare de 12. Se vor afisa doar atributele type, residual_sugar, alcohol si quality
df_wo_null.filter("residual_sugar > 10 AND alcohol > 12").select(['type','residual_sugar','alcohol','quality']).show()

+-----+--------------+-------+-------+
| type|residual_sugar|alcohol|quality|
+-----+--------------+-------+-------+
|white|          14.0|   12.2|      7|
|white|          14.5|   12.5|      8|
|white|          12.8|   12.2|      8|
|white|         15.75|   12.1|      6|
|white|          11.0|   12.2|      6|
|white|          10.6|   12.8|      6|
|white|          11.1|   13.0|      6|
|white|         10.55|   12.7|      6|
|white|          10.2|   12.1|      7|
|white|         12.75|   12.9|      6|
|white|          14.5|   12.5|      5|
|white|         11.25|   12.4|      6|
|white|         11.25|   12.4|      6|
|white|          11.3|   12.8|      7|
|white|          15.5|   13.0|      7|
|white|          15.5|   13.0|      7|
|white|          15.5|   13.0|      7|
|white|          12.9|   13.0|      6|
|white|          10.8|   13.6|      6|
|white|          10.2|   12.2|      7|
+-----+--------------+-------+-------+
only showing top 20 rows



In [14]:
#Sa se determine cate inregistrari apartin fiecarei categorii
df_wo_null.groupBy('category').count().show()

+---------+-----+
| category|count|
+---------+-----+
|Excellent|  197|
|     Good| 6022|
|      Bad|  244|
+---------+-----+



In [15]:
#Sa se determine cate cate inregistrari apartin fiecarui tip de vinuri
df_wo_null.groupBy('type').count().show()

+-----+-----+
| type|count|
+-----+-----+
|white| 4870|
|  red| 1593|
+-----+-----+



In [16]:
#Sa se afiseze minimul pentru fiecare coloana, in functie de coloana quality
df_wo_null.groupBy('quality').min().show()

+-------+------------------+---------------------+----------------+-------------------+--------------+------------------------+-------------------------+------------+-------+--------------+------------+------------+
|quality|min(fixed_acidity)|min(volatile_acidity)|min(citric_acid)|min(residual_sugar)|min(chlorides)|min(free_sulfur_dioxide)|min(total_sulfur_dioxide)|min(density)|min(pH)|min(sulphates)|min(alcohol)|min(quality)|
+-------+------------------+---------------------+----------------+-------------------+--------------+------------------------+-------------------------+------------+-------+--------------+------------+------------+
|      6|               3.8|                 0.08|             0.0|                0.7|         0.015|                     1.0|                      6.0|     0.98758|   2.72|          0.23|         8.4|           6|
|      3|               4.2|                 0.17|             0.0|                0.7|         0.022|                     3.0|         

In [17]:
#Sa se determine inregistrarile care au total_sulfur_dioxide peste 150 si pH-ul peste 3
df_wo_null.filter((df_wo_null['pH']>3) & (df_wo_null['total_sulfur_dioxide']> 150)).show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
|white|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|               186.0| 0.9956|3.19|      0.4|    9.9|      6|    Good|
|white|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|               186.0| 0.9956|3.19|      0.4|    9.9|      6|    Good|
|white|          7.4|            0.34|       0.42|           1.1|    0.033|               17.0|               171.0| 0.9917|3.12|     0.53|   11.3|      6|    Good|
|white|   

In [18]:
#Sa se determine inregistrarile care fac parte din categoria 'Excellent' si sunt rosii
df_wo_null.filter((df_wo_null['type']=='red') & (df_wo_null['category']== 'Excellent')).show()

+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
|type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality| category|
+----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
| red|          7.9|            0.35|       0.46|           3.6|    0.078|               15.0|                37.0| 0.9973|3.35|     0.86|   12.8|      8|Excellent|
| red|         10.3|            0.32|       0.45|           6.4|    0.073|                5.0|                13.0| 0.9976|3.23|     0.82|   12.6|      8|Excellent|
| red|          5.6|            0.85|       0.05|           1.4|    0.045|               12.0|                88.0| 0.9924|3.56|     0.82|   12.9|      8|Excellent|
| red|    

### **Spark SQL**

In [19]:
#Crearea unei vizualizari temporare pentru folosirea comenzilor SQL
df_wo_null.createOrReplaceTempView('wine')

In [20]:
#Sa se afiseze descrescator primele 10 vinuri care contin cea mai mare cantitate de alcool
output = spark.sql("SELECT * FROM wine ORDER BY alcohol DESC LIMIT 10")
output.show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality| category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
|  red|         15.9|            0.36|       0.65|           7.5|    0.096|               22.0|                71.0| 0.9976|2.98|     0.84|   14.9|      5|     Good|
|white|          6.4|            0.35|       0.28|           1.6|    0.037|               31.0|               113.0|0.98779|3.12|      0.4|   14.2|      7|     Good|
|white|          5.8|            0.61|       0.01|           8.4|    0.041|               31.0|               104.0| 0.9909|3.26|     0.72|  14.05|      7|     Good|
|whi

In [21]:
#Sa se afiseze vinurile grupate dupa tip si categorie
output = spark.sql("SELECT DISTINCT type, category, count(*) as count FROM wine GROUP BY type, category").show()

+-----+---------+-----+
| type| category|count|
+-----+---------+-----+
|  red|Excellent|   18|
|  red|     Good| 1513|
|white|Excellent|  179|
|white|      Bad|  182|
|white|     Good| 4509|
|  red|      Bad|   62|
+-----+---------+-----+



In [22]:
#Sa se afiseze primele 3 vinuri cu aciditatea cea mai mica care au o densitate peste 0.995
output = spark.sql("SELECT * FROM wine WHERE density > 0.995 ORDER BY fixed_acidity ASC").show(3)

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
|white|          4.2|           0.215|       0.23|           5.1|    0.041|               64.0|               157.0|0.99688|3.42|     0.44|    8.0|      3|     Bad|
|white|          4.9|           0.235|       0.27|         11.75|     0.03|               34.0|               118.0| 0.9954|3.07|      0.5|    9.4|      6|    Good|
|white|          5.0|           0.235|       0.27|         11.75|     0.03|               34.0|               118.0| 0.9954|3.07|      0.5|    9.4|      6|    Good|
+-----+---

In [23]:
#Sa se afiseze media aritmetica, minimul si maximul de alcol pentru fiecare tip de vin
output = spark.sql("SELECT type, AVG(alcohol), MIN(alcohol), MAX(alcohol) FROM wine GROUP BY type").show()

+-----+------------------+------------+------------+
| type|      avg(alcohol)|min(alcohol)|max(alcohol)|
+-----+------------------+------------+------------+
|white|10.516772055968612|         8.0|        14.2|
|  red|10.419617055424787|         8.4|        14.9|
+-----+------------------+------------+------------+



In [24]:
#Sa se afiseze vinurile care au un concentrat de cloruri peste medie, ordonandu-le descrescator dupa calitatea lor.
output = spark.sql("SELECT * FROM wine WHERE chlorides > (SELECT AVG(chlorides) FROM wine) ORDER BY quality DESC").show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality| category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+---------+
|white|          7.3|            0.17|       0.24|           8.1|    0.121|               32.0|               162.0|0.99508|3.17|     0.38|   10.4|      8|Excellent|
|  red|          7.8|            0.57|       0.09|           2.3|    0.065|               34.0|                45.0|0.99417|3.46|     0.74|   12.7|      8|Excellent|
|white|          7.3|            0.17|       0.24|           8.1|    0.121|               32.0|               162.0|0.99508|3.17|     0.38|   10.4|      8|Excellent|
|whi

In [25]:
#Sa se afiseze cate inregistrari au acidul citric mai mare decat 0.8
output = spark.sql("SELECT * FROM wine WHERE citric_acid > 0.8").show()

+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
| type|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|category|
+-----+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------+
|white|         10.2|            0.44|       0.88|           6.2|    0.049|               20.0|               124.0| 0.9968|2.99|     0.51|    9.9|      4|     Bad|
|white|          7.4|             0.2|       1.66|           2.1|    0.022|               34.0|               113.0|0.99165|3.26|     0.55|   12.2|      6|    Good|
|white|          8.2|           0.345|        1.0|          18.2|    0.047|               55.0|               205.0|0.99965|2.96|     0.43|    9.6|      5|    Good|
|white|   

## **Machine learning**

### **DecisionTreeClassifier**

In [26]:
#importul pachetelor
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Clasificarea vinurilor in funtie de calitate, folosind caracteristicile prezente

In [27]:
#Setarea dataframe-ului
dataFrameQuality = df_wo_null

#Transformarea coloanei type in label numeric
typeIndexer = StringIndexer(inputCol="type", outputCol="typeNum").fit(dataFrameQuality)
dataFrameQuality = typeIndexer.transform(dataFrameQuality)

#Crearea vectorului de caracteristici
dataFrameQuality = VectorAssembler(inputCols=['typeNum','fixed_acidity','volatile_acidity', 'citric_acid', 'residual_sugar','chlorides', 
                                       'free_sulfur_dioxide', 'total_sulfur_dioxide','density','pH','sulphates','alcohol',], outputCol="features").transform(dataFrameQuality)


#Identificarea caracteristicilor categoriale si indexarea lor
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=10).fit(dataFrameQuality)

#Indexarea coloanei quality-label
labelIndexer = StringIndexer(inputCol="quality", outputCol="indexedLabel").fit(dataFrameQuality)

#Partitionarea datelor
(trainingDataQuality, testDataQuality) = dataFrameQuality.randomSplit([0.8, 0.2])


#Definirea modelului
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

#Crearea pipelineului
pipeline = Pipeline(stages=[labelIndexer,featureIndexer, dt])

#Antrenarea modelului cu ajutorul pipelineului
model = pipeline.fit(trainingDataQuality)

#Realizarea predictiilor
predictions = model.transform(testDataQuality)

#Selectarea coloanelor  
predictions.select("prediction", "indexedLabel", "features").show()

#Evaluarea modelului
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

#Afisarea unui sumar
treeModel = model.stages[2]
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       2.0|         0.0|[1.0,5.0,0.400000...|
|       0.0|         3.0|[1.0,5.0,1.019999...|
|       1.0|         1.0|[1.0,5.1999998092...|
|       0.0|         0.0|[1.0,5.1999998092...|
|       0.0|         1.0|[1.0,5.3000001907...|
|       0.0|         0.0|[1.0,5.4000000953...|
|       0.0|         1.0|[1.0,5.5999999046...|
|       0.0|         1.0|[1.0,5.5999999046...|
|       0.0|         0.0|[1.0,5.6999998092...|
|       0.0|         0.0|[1.0,5.8000001907...|
|       1.0|         1.0|[1.0,5.8000001907...|
|       2.0|         0.0|[1.0,5.9000000953...|
|       0.0|         0.0|[1.0,6.0,0.509999...|
|       0.0|         0.0|[1.0,6.0,0.509999...|
|       0.0|         0.0|[1.0,6.0,0.540000...|
|       1.0|         1.0|[1.0,6.0999999046...|
|       1.0|         2.0|[1.0,6.0999999046...|
|       0.0|         0.0|[1.0,6.0999999046...|
|       0.0| 

Clasificarea vinurilor in funtie de tip, folosind caracteristicile prezente

In [28]:
#Crearea vectorului de caracteristici
dataFrame = VectorAssembler(inputCols=['fixed_acidity','volatile_acidity', 'citric_acid', 'residual_sugar','chlorides', 
                                       'free_sulfur_dioxide', 'total_sulfur_dioxide','density','pH','sulphates','alcohol','quality'], outputCol="features").transform(df_wo_null)

#Indexarea coloanei type-label
labelIndexer = StringIndexer(inputCol="type", outputCol="indexedLabel").fit(dataFrame)

#Identificarea caracteristicilor categoriale si indexarea lor
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=10).fit(dataFrame)

#Partitionarea datelor
(trainingData, testData) = dataFrame.randomSplit([0.8, 0.2])

#Definirea modelului
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

#Crearea pipelineului
pipeline = Pipeline(stages=[labelIndexer,featureIndexer, dt])

#Antrenarea modelului cu ajutorul pipelineului
model = pipeline.fit(trainingData)

#Realizarea predictiilor
predictions = model.transform(testData)

#Selectarea coloanelor  
predictions.select("prediction", "indexedLabel", "features").show()

#Evaluarea modelului
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

#Afisarea unui sumar
treeModel = model.stages[2]
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[4.59999990463256...|
|       1.0|         1.0|[4.69999980926513...|
|       0.0|         1.0|[5.09999990463256...|
|       1.0|         1.0|[5.40000009536743...|
|       1.0|         1.0|[5.40000009536743...|
|       0.0|         1.0|[5.59999990463256...|
|       1.0|         1.0|[5.59999990463256...|
|       0.0|         1.0|[5.80000019073486...|
|       1.0|         1.0|[5.90000009536743...|
|       1.0|         1.0|[6.0,0.5,0.0,1.39...|
|       1.0|         1.0|[6.0,0.5,0.039999...|
|       1.0|         1.0|[6.0,0.5099999904...|
|       1.0|         1.0|[6.0,0.5799999833...|
|       1.0|         1.0|[6.09999990463256...|
|       1.0|         1.0|[6.19999980926513...|
|       1.0|         1.0|[6.19999980926513...|
|       1.0|         1.0|[6.19999980926513...|
|       1.0|         1.0|[6.19999980926513...|
|       1.0| 

### **RandomForestClassifier**

In [29]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString

In [30]:
#Crearea vectorului de caracteristici
dataframeRFC = df_wo_null
dataframeRFC = VectorAssembler(inputCols=['fixed_acidity','volatile_acidity', 'citric_acid', 'residual_sugar','chlorides', 
                                       'free_sulfur_dioxide', 'total_sulfur_dioxide','density','pH','sulphates','alcohol','quality'], outputCol="features").transform(dataframeRFC)

#Indexarea coloanei type-label
labelIndexerRFC = StringIndexer(inputCol="type", outputCol="indexedLabel").fit(dataframeRFC)

#Identificarea caracteristicilor categoriale si indexarea lor
featureIndexerRFC = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=12).fit(dataframeRFC)

#Partitionarea datelor
(trainingDataRFC, testDataRFC) = dataframeRFC.randomSplit([0.8, 0.2])

#Definirea modelului
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Conversia etichetelor indexate înapoi la etichetele originale
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexerRFC.labels)

#Crearea pipelineului
pipeline = Pipeline(stages=[labelIndexerRFC, featureIndexerRFC, rf, labelConverter])

#Antrenarea pipeline-ului
model = pipeline.fit(trainingDataRFC)

#Realizarea predictiilor
predictionsRFC = model.transform(testDataRFC)

#Selectarea coloanelor
predictionsRFC.select("predictedLabel", "type", "features").show()

#Evaluarea modelului
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictionsRFC)
print("Test Error = %g" % (1.0 - accuracy))

#Afisarea unui sumar
rfModel = model.stages[2]
print(rfModel) 


+--------------+----+--------------------+
|predictedLabel|type|            features|
+--------------+----+--------------------+
|         white| red|[5.0,1.0199999809...|
|           red| red|[5.19999980926513...|
|           red| red|[5.19999980926513...|
|           red| red|[5.30000019073486...|
|           red| red|[5.59999990463256...|
|           red| red|[5.59999990463256...|
|           red| red|[5.59999990463256...|
|           red| red|[5.69999980926513...|
|           red| red|[5.80000019073486...|
|           red| red|[6.0,0.3100000023...|
|           red| red|[6.0,0.5,0.0,1.39...|
|           red| red|[6.0,0.5099999904...|
|           red| red|[6.09999990463256...|
|           red| red|[6.09999990463256...|
|           red| red|[6.19999980926513...|
|           red| red|[6.19999980926513...|
|           red| red|[6.19999980926513...|
|           red| red|[6.19999980926513...|
|           red| red|[6.19999980926513...|
|           red| red|[6.30000019073486...|
+----------

### **LogisticRegression**

In [31]:
from pyspark.ml.classification import LogisticRegression

#Setarea dataframe-ului
dataFrameLR = df_wo_null

#Transformarea coloanei type in label numeric
labelIndexer = StringIndexer(inputCol="type", outputCol="label").fit(dataFrameLR)
dataFrameLR = labelIndexer.transform(dataFrameLR)

#Crearea vectorului de caracteristici
dataFrameLR = VectorAssembler(inputCols=['fixed_acidity','volatile_acidity', 'citric_acid', 'residual_sugar','chlorides', 
                                       'free_sulfur_dioxide', 'total_sulfur_dioxide','density','pH','sulphates','alcohol','quality'], outputCol="features").transform(dataFrameLR)


#Definirea modelului
lr = LogisticRegression(maxIter=10)

#Antrenarea modelului
lrModel = lr.fit(dataFrameLR)

# Afisarea coeficientilor si interceptorilor modelului
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("\nIntercept: " + str(lrModel.interceptVector))



trainingSummary = lrModel.summary
print("\nFalse positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("\nTrue positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("\nTest Error = %g" % (1.0 - trainingSummary.accuracy))
print("\nFPR= %s" % (trainingSummary.weightedFalsePositiveRate))
print("\nTPR= %s" % (trainingSummary.weightedTruePositiveRate))

Coefficients: 
DenseMatrix([[ 6.81597507e-01,  9.32577850e+00, -1.99197392e+00,
              -5.76216883e-01,  2.96878859e+01,  5.73849121e-02,
              -6.04131216e-02,  9.94256776e+02,  3.95645315e+00,
               5.66187457e+00,  7.01248516e-01,  3.77766947e-01]])

Intercept: [-1019.3413373947726]

False positive rate by label:
label 0: 0.01820464532328939
label 1: 0.003490759753593429

True positive rate by label:
label 0: 0.9965092402464065
label 1: 0.9817953546767106

Test Error = 0.00711744

FPR= 0.01457796735446289

TPR= 0.9928825622775801


## **Deep learning**

In [32]:
#importul librariilor
import numpy as np
import pandas as pd

In [33]:
#citirea datelor
df = pd.read_csv("/content/drive/MyDrive/BigData/proiect/winequalityN.csv")
df.head()

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,white,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,white,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,white,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,white,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,white,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6


In [34]:
#transformarea coloanei type de tip string in coloana label de tip integer
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(drop='first')
encoded_type = encoder.fit_transform(df[['type']]).toarray()
df[['label']] = encoded_type

In [35]:
#stergerea randurilor care contin null
df = df.dropna()

In [36]:
#Impartirea datelor in features si label
y= df['label']
x= df.drop(['type','label'],axis =1)

In [37]:
#Impartirea datelor in training si test
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2)

In [38]:
#Scalarea datelor
from sklearn.preprocessing import MinMaxScaler
scaler =  MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [39]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense


In [40]:
# Construirea unei retele neuronale complet conectata
model = Sequential()
model.add(Dense(12, activation='relu',input_shape=(12,) ))
model.add(Dense(24, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

In [41]:
model.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])

In [42]:
#Antrenarea modelului
history = model.fit(x=X_train, 
                   y=y_train, 
                   epochs=50,
                   validation_data=(X_test, y_test), 
                   verbose=1,batch_size=128)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [43]:
#Calcularea predictiilor si 
y_pred = model.predict(X_test) > 0.5
y_pred = y_pred.astype(np.int32)

In [44]:
#Matricea de confuzie
from sklearn.metrics import confusion_matrix
confusion_matrix(y_test, y_pred)

array([[301,   6],
       [  9, 977]])

In [45]:
#Evaluarea modelului folosind datele de tes
model.evaluate(X_test, y_test)



[0.052926987409591675, 0.988399088382721]