Since the HW grading is done in a semi-automatic manner, please adhere to the following naming format for your submission.
Each group of students (mostly pairs, with some approved exceptions) should submit a ZIP file whose name is the underscore-separated list of all submitters' ids. For example, for two submitters, the naming format is: id1_id2.zip.
The ZIP file should contain:
1. A Jupyter notebook (an .ipynb file, not a .zip file) whose name is the underscore-separated list of all submitters' ids. For example, for two submitters, the naming format is: id1_id2.ipynb.
2. A Python file (the map-reduce part, a .py file) whose name is the underscore-separated list of all submitters' ids. For example, for two submitters, the naming format is: id1_id2.py.
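
The naming rule above can be sketched as a small helper that joins the ids with underscores and packs both files into the ZIP. This is only an illustration: the function names `submission_names` and `build_submission`, and the example ids, are hypothetical; only the id1_id2 pattern comes from the instructions.

```python
import os
import tempfile
import zipfile


def submission_names(ids):
    """Return the (zip, notebook, script) file names for a list of ids."""
    base = "_".join(str(i) for i in ids)
    return base + ".zip", base + ".ipynb", base + ".py"


def build_submission(ids, notebook_path, script_path, out_dir="."):
    """Pack the notebook and the map-reduce script into <ids>.zip."""
    zip_name, nb_name, py_name = submission_names(ids)
    zip_path = os.path.join(out_dir, zip_name)
    with zipfile.ZipFile(zip_path, "w") as zf:
        # arcname stores each file inside the archive under the required name
        zf.write(notebook_path, arcname=nb_name)
        zf.write(script_path, arcname=py_name)
    return zip_path


# Hypothetical ids, for illustration only
print(submission_names(["111", "222"]))
```

Calling `build_submission(["111", "222"], "hw.ipynb", "hw.py")` would then write 111_222.zip containing 111_222.ipynb and 111_222.py, assuming both input files exist.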

In [9]:
# install the pyspark library (via pip install)
# There is no need to set up a local Spark server.
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 3.7 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=a635ac4cffd93fa57568b3a16f7a7ce11723798bb539c3c6fc568ef7b163976f
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [10]:
# run this command to make sure the versions of pyspark and your python match:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

For each of the tasks below, make sure you add a print command at the end of the cell to show the results (e.g. df.show(1)).

# Getting started:

Start a Spark session:

In [11]:
from pyspark.sql import SparkSession

name = 'spark'  # give a name to your app
spark = SparkSession.builder.appName(name).getOrCreate()

load the wine dataset:

In [12]:
from sklearn.datasets import load_wine

wine = load_wine()
data = wine.data
columns = list(wine.feature_names)

Create a Spark DataFrame that contains the data, with the column names as its schema:

In [13]:
import pandas as pd

pandasDF = pd.DataFrame(data, columns=columns)
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.show()

+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+
|alcohol|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|
+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+
|  14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92| 1065.0|
|   13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4| 1050.0|
|  13.16|      2.36|2.67|             18.6|    101.0|          2.8|      3.24|                 0.3|           2.81|           5.68

Perform 3 initial data exploration tasks that will help you understand your data better:

In [14]:
sparkDF.printSchema()

root
 |-- alcohol: double (nullable = true)
 |-- malic_acid: double (nullable = true)
 |-- ash: double (nullable = true)
 |-- alcalinity_of_ash: double (nullable = true)
 |-- magnesium: double (nullable = true)
 |-- total_phenols: double (nullable = true)
 |-- flavanoids: double (nullable = true)
 |-- nonflavanoid_phenols: double (nullable = true)
 |-- proanthocyanins: double (nullable = true)
 |-- color_intensity: double (nullable = true)
 |-- hue: double (nullable = true)
 |-- od280/od315_of_diluted_wines: double (nullable = true)
 |-- proline: double (nullable = true)



In [15]:
sparkDF.describe().show()


+-------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------------+------------------+------------------+-------------------+----------------------------+------------------+
|summary|           alcohol|        malic_acid|               ash| alcalinity_of_ash|        magnesium|     total_phenols|        flavanoids|nonflavanoid_phenols|   proanthocyanins|   color_intensity|                hue|od280/od315_of_diluted_wines|           proline|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------------+------------------+------------------+-------------------+----------------------------+------------------+
|  count|               178|               178|               178|               178|              178|               178|               178|                 178|               178|            

In [16]:
from pyspark.sql.functions import skewness, kurtosis

# Use a loop variable other than `col` so it does not shadow pyspark's col()
for c in columns:
    sparkDF.select(skewness(c), kurtosis(c)).show()

+--------------------+-------------------+
|   skewness(alcohol)|  kurtosis(alcohol)|
+--------------------+-------------------+
|-0.05104747165391...|-0.8622600987701761|
+--------------------+-------------------+

+--------------------+--------------------+
|skewness(malic_acid)|kurtosis(malic_acid)|
+--------------------+--------------------+
|   1.030869497803997|  0.2573482858330962|
+--------------------+--------------------+

+-------------------+------------------+
|      skewness(ash)|     kurtosis(ash)|
+-------------------+------------------+
|-0.1752067779255986|1.0785761323125556|
+-------------------+------------------+

+---------------------------+---------------------------+
|skewness(alcalinity_of_ash)|kurtosis(alcalinity_of_ash)|
+---------------------------+---------------------------+
|        0.21124732832155163|         0.4408231369002831|
+---------------------------+---------------------------+

+-------------------+-------------------+
|skewness(magnesium)|kur
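
To make these two statistics concrete, here is a plain-Python sketch of the population skewness and excess kurtosis. It assumes Spark's `skewness` and `kurtosis` use these definitions (central moments divided by n, with 3 subtracted for kurtosis), which is consistent with the negative kurtosis reported for alcohol. The helper names and the toy data are hypothetical and are not taken from the wine dataset.

```python
def central_moment(values, k):
    """k-th central moment, dividing by n (population form)."""
    n = len(values)
    mu = sum(values) / n
    return sum((v - mu) ** k for v in values) / n


def skewness(values):
    """Population skewness: m3 / m2^1.5."""
    m2 = central_moment(values, 2)
    return central_moment(values, 3) / m2 ** 1.5


def excess_kurtosis(values):
    """Excess kurtosis: m4 / m2^2 - 3, so a normal distribution gives 0."""
    m2 = central_moment(values, 2)
    return central_moment(values, 4) / m2 ** 2 - 3.0


symmetric = [1.0, 2.0, 3.0, 4.0, 5.0]      # toy data, symmetric around 3
right_tailed = [1.0, 2.0, 3.0, 4.0, 10.0]  # toy data with one large value

print(skewness(symmetric), excess_kurtosis(symmetric))
print(skewness(right_tailed), excess_kurtosis(right_tailed))
```

A symmetric sample has zero skewness, while a single large value pulls the skewness positive, which is the pattern malic_acid shows above.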

# Data preparation tasks:

Perform the following data cleaning tasks:

    1. rename the column "alcohol" to "ABV"


In [17]:
sparkDF=sparkDF.withColumnRenamed("alcohol","ABV")
sparkDF.printSchema()

root
 |-- ABV: double (nullable = true)
 |-- malic_acid: double (nullable = true)
 |-- ash: double (nullable = true)
 |-- alcalinity_of_ash: double (nullable = true)
 |-- magnesium: double (nullable = true)
 |-- total_phenols: double (nullable = true)
 |-- flavanoids: double (nullable = true)
 |-- nonflavanoid_phenols: double (nullable = true)
 |-- proanthocyanins: double (nullable = true)
 |-- color_intensity: double (nullable = true)
 |-- hue: double (nullable = true)
 |-- od280/od315_of_diluted_wines: double (nullable = true)
 |-- proline: double (nullable = true)



    2. create a new column that will be the standardization of the column "magnesium". call it "magnesium_stand"

In [18]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = sparkDF.select(
    _mean(col('magnesium')).alias('mean'),
    _stddev(col('magnesium')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
sparkDF = sparkDF.withColumn("magnesium_stand",((col("magnesium")-mean)/std))
sparkDF.show()


+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+
|  ABV|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|     magnesium_stand|
+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+
|14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92| 1065.0|  1.9085215072784814|
| 13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4| 1050.0|  0.0180939796650474|
|13.16|      2.36|2.67|            
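
As a minimal sketch of the standardization itself, the same z-score can be written in plain Python. It uses `statistics.stdev`, the sample standard deviation (n - 1 in the denominator), which is what Spark's `stddev` computes. The helper name `standardize` is hypothetical, and the three values are just the first magnesium entries shown earlier, so the resulting z-scores differ from the table, which uses the mean and std of all 178 rows.

```python
import statistics


def standardize(values):
    """Return z-scores: (value - mean) / sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample std, n - 1 in the denominator
    return [(v - mean) / std for v in values]


first_rows = [127.0, 100.0, 101.0]  # first three magnesium values only
z = standardize(first_rows)
print(z)
```

After standardization the values have mean 0 and sample standard deviation 1, which is a quick sanity check for the new column.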

    3. create a new column that equals 1 if the alcohol rate is at least 14 (inclusive) and color_intensity exceeds 6, and 0 otherwise. call it "col_intense".

In [19]:
from pyspark.sql.functions import when, lit

# "exceeds 14 (including)" is read as ABV >= 14; color_intensity must be strictly above 6
sparkDF = sparkDF.withColumn(
    "col_intense",
    when((sparkDF.ABV >= 14) & (sparkDF.color_intensity > 6), lit(1)).otherwise(lit(0))
)
sparkDF.show()

+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+-----------+
|  ABV|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|     magnesium_stand|col_intense|
+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+-----------+
|14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92| 1065.0|  1.9085215072784814|          0|
| 13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4| 1050.0|  0.01809
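
The flag rule can also be checked row by row in plain Python. This sketch assumes the reading that "exceeds 14 (including)" means ABV >= 14 while color_intensity must be strictly greater than 6. The function name is hypothetical, and the boundary rows are made-up values used only to show the inclusive and strict comparisons.

```python
def col_intense(abv, color_intensity):
    """Return 1 when ABV >= 14 and color_intensity > 6, else 0."""
    return 1 if abv >= 14 and color_intensity > 6 else 0


# First two rows shown above
print(col_intense(14.23, 5.64))
print(col_intense(13.2, 4.38))

# Made-up boundary cases: ABV is inclusive, color_intensity is strict
print(col_intense(14.0, 6.5))
print(col_intense(14.0, 6.0))
```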

We will load the labels of the data above:

In [20]:
target = wine.target
column = ['Class']

Create a Spark DataFrame that contains the target values, with "Class" as its column:

In [21]:
pandasnewDF = pd.DataFrame(target, columns=column)
sparknewDF = spark.createDataFrame(pandasnewDF)
sparknewDF.show()

+-----+
|Class|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



Merge the two datasets into one dataset:

In [22]:
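from pyspark.sql.functions import col, monotonically_increasing_id

# Tag each row of both DataFrames with a generated id and join on it.
# Note: monotonically_increasing_id() depends on partitioning, so this only
# lines rows up when both DataFrames are partitioned identically.
sparkDF = sparkDF.withColumn("id1", monotonically_increasing_id())
sparknewDF = sparknewDF.withColumn("id2", monotonically_increasing_id())
merged_df = sparkDF.join(sparknewDF, col("id1") == col("id2"), "inner").drop("id1", "id2")
merged_df.show()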

+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+-----------+-----+
|  ABV|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|     magnesium_stand|col_intense|Class|
+-----+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+--------------------+-----------+-----+
|13.39|      1.77|2.62|             16.1|     93.0|         2.85|      2.94|                0.34|           1.45|            4.8|0.92|                        3.22| 1195.0|-0.47201686082732436|          0|    0|
|14.02|      1.68|2.21|             16.0|     96.0|         2.65|      2.33|                0.26|           1.98|            4.7|1.04|                      
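
Joining on generated ids deserves some care. Spark documents that `monotonically_increasing_id` puts the partition index in the upper bits and the row position within the partition in the lower 33 bits, so the ids are unique but not consecutive. The plain-Python simulation below is a sketch of that layout only (the function name and the partition sizes are hypothetical). It shows how the same four rows get different ids when two DataFrames are partitioned differently.

```python
def simulated_ids(partition_sizes):
    """Mimic the documented id layout: (partition index << 33) + row position."""
    ids = []
    for part_idx, size in enumerate(partition_sizes):
        ids.extend((part_idx << 33) + row for row in range(size))
    return ids


features_ids = simulated_ids([2, 2])  # 4 rows split as 2 + 2
labels_ids = simulated_ids([3, 1])    # the same 4 rows split as 3 + 1

print(features_ids)
print(labels_ids)

# Row positions whose ids agree in both layouts
aligned = [i for i in range(4) if features_ids[i] == labels_ids[i]]
print(aligned)
```

Only the first two rows line up in this simulation. When both DataFrames come from the same pandas source, one way to sidestep the issue is to add the Class column to the pandas DataFrame before calling `spark.createDataFrame`, so no join is needed.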

# Explore your data and labels:

Perform exploration tasks to find the answer to each question below:

1. How many samples do you have for each class?

In [23]:
merged_df.groupby("Class").count().show()

+-----+-----+
|Class|count|
+-----+-----+
|    0|   59|
|    1|   71|
|    2|   48|
+-----+-----+



2. Which class has the highest alcohol value? Which class has the highest alcohol value on average?

In [25]:
import pyspark.sql.functions as F

merged_df.groupBy("Class").agg(F.max("ABV")).show()
merged_df.groupBy("Class").agg(F.avg("ABV")).show()
print("0 is the highest in both aspects")

+-----+--------+
|Class|max(ABV)|
+-----+--------+
|    0|   14.83|
|    1|   13.86|
|    2|   14.34|
+-----+--------+

+-----+------------------+
|Class|          avg(ABV)|
+-----+------------------+
|    0|13.744745762711863|
|    1|12.278732394366202|
|    2|13.153750000000002|
+-----+------------------+

0 is the highest in both aspects


3. Which column has the highest correlation with the Class column?

In [26]:
names = merged_df.schema.names
print(names)

# Track the most positive and the most negative correlation with "Class"
max_corr, max_col = float('-inf'), ""
min_corr, min_col = float('inf'), ""
for c in names:
    if c == "Class":
        continue
    corr = merged_df.stat.corr(c, "Class")
    if corr <= min_corr:
        min_corr, min_col = corr, c
    if corr >= max_corr:
        max_corr, max_col = corr, c
print("the highest is in column " + max_col + ", value: " + str(max_corr))
print("the lowest is in column " + min_col + ", value: " + str(min_corr))

['ABV', 'malic_acid', 'ash', 'alcalinity_of_ash', 'magnesium', 'total_phenols', 'flavanoids', 'nonflavanoid_phenols', 'proanthocyanins', 'color_intensity', 'hue', 'od280/od315_of_diluted_wines', 'proline', 'magnesium_stand', 'col_intense', 'Class']
the highest is in column alcalinity_of_ash, value: 0.5178591098214516
the lowest is in column flavanoids, value: -0.8474975401417593
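
If "highest correlation" is read as strongest linear relationship regardless of sign, the comparison should use the absolute value. The sketch below assumes that reading and uses only the two correlation values printed above; the helper name `strongest` is hypothetical.

```python
# The two extreme correlations with "Class" reported above
correlations = {
    "alcalinity_of_ash": 0.5178591098214516,
    "flavanoids": -0.8474975401417593,
}


def strongest(corrs):
    """Return the (column, correlation) pair with the largest absolute value."""
    return max(corrs.items(), key=lambda kv: abs(kv[1]))


column, value = strongest(correlations)
print(column, value)
```

Under this reading flavanoids is the column most strongly correlated with Class, even though its correlation is negative.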


# Train an ML model to predict the class (Bonus)

In [32]:
print(len(merged_df.columns))

16


1. Create a column for the features
2. Split your data into train and test sets (70%, 30%)
3. Train a Random Forest model to predict the column "Class"
4. Print your test results (using the accuracy metric)

In [36]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

data = merged_df


# Split the data into train and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Create a vector assembler to combine the features into a single vector column
assembler = VectorAssembler(inputCols=["ABV", "malic_acid", "ash", "alcalinity_of_ash", "magnesium", "total_phenols", "flavanoids", "nonflavanoid_phenols", "proanthocyanins", "color_intensity", "hue", "od280/od315_of_diluted_wines", "proline", "magnesium_stand", "col_intense"], outputCol="features_vector")

# Transform the train and test data using the vector assembler
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Train a Random Forest classifier
rf = RandomForestClassifier(labelCol="Class", featuresCol="features_vector", seed=42)
model = rf.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the accuracy of the model
evaluator = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy: {:.2f}%".format(accuracy * 100))



Test Accuracy: 98.08%
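
Accuracy is simply the share of test rows whose prediction equals the label. As a worked example, 98.08% is consistent with 51 correct predictions out of 52 test rows. The actual test-set size is not shown in the output, so the counts and the labels below are assumptions made only for the arithmetic.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Hypothetical test set: 52 rows with exactly one wrong prediction
labels = [0] * 20 + [1] * 20 + [2] * 12
predictions = list(labels)
predictions[0] = 1  # the single mistake

print("Test Accuracy: {:.2f}%".format(accuracy(labels, predictions) * 100))
```

Note that `randomSplit` produces approximate proportions, so the test set is close to, but not necessarily exactly, 30% of the 178 rows.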
