In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import numpy as np
import pandas as pd

In [2]:
# Build the Spark configuration (spark.cores.max set to "4") and start the
# SparkContext from it.
conf = SparkConf()
conf.set("spark.cores.max", "4")
sc = SparkContext(conf=conf)

In [3]:
# Obtain the SparkSession used for all DataFrame work below.
# NOTE(review): getOrCreate() is expected to reuse the SparkContext `sc`
# started in the previous cell - confirm.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.cores.max", "4")
    .getOrCreate()
)

## Read file as text

In [4]:
# Load the red-wine CSV as an RDD of raw text lines. The second argument of
# textFile is the *minimum* number of partitions requested.
data = sc.textFile("Datasets/Winequality_red.csv", minPartitions=3)

In [5]:
# Check how many partitions the text RDD actually ended up with.
data.getNumPartitions()

3

In [6]:
# Number of text lines in the file; the header line is counted too because the
# RDD holds raw, unparsed lines.
data.count()

1600

In [7]:
# Peek at the first 10 raw lines (header first) to see the ";" delimiter.
data.take(10)

[u'"fixed acidity";"volatile acidity";"citric acid";"residual sugar";"chlorides";"free sulfur dioxide";"total sulfur dioxide";"density";"pH";"sulphates";"alcohol";"quality"',
 u'7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5',
 u'7.8;0.88;0;2.6;0.098;25;67;0.9968;3.2;0.68;9.8;5',
 u'7.8;0.76;0.04;2.3;0.092;15;54;0.997;3.26;0.65;9.8;5',
 u'11.2;0.28;0.56;1.9;0.075;17;60;0.998;3.16;0.58;9.8;6',
 u'7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5',
 u'7.4;0.66;0;1.8;0.075;13;40;0.9978;3.51;0.56;9.4;5',
 u'7.9;0.6;0.06;1.6;0.069;15;59;0.9964;3.3;0.46;9.4;5',
 u'7.3;0.65;0;1.2;0.065;15;21;0.9946;3.39;0.47;10;7',
 u'7.8;0.58;0.02;2;0.073;9;18;0.9968;3.36;0.57;9.5;7']

In [8]:
# saveAsTextFile refuses to write into a directory that already exists, so a
# second run of this cell (or Restart & Run All) would fail. Remove the output
# of any previous run first.
# NOTE(review): assumes "saved-data" resolves to the local filesystem - confirm
# before running against HDFS or another remote filesystem.
import shutil
shutil.rmtree("saved-data", ignore_errors=True)

# Writes one part-* file per partition into the "saved-data" directory.
data.saveAsTextFile("saved-data")

In [9]:
# Read the saved output back; the glob picks up every part-* file in the
# output directory.
data2 = sc.textFile("saved-data/part-*")

In [10]:
# Round-trip check: the line count should equal data.count() above.
data2.count()

1600

## Load dataset 1

In [11]:
# Red-wine dataset as a DataFrame: first line is the header, column types are
# inferred, fields are separated by ";". The result is redistributed over
# 3 partitions.
df1 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("delimiter", ";")
    .load("Datasets/Winequality_red.csv")
    .repartition(3)
)

In [12]:
# Verify the repartition(3) requested when df1 was loaded.
df1.rdd.getNumPartitions()

3

## Explore dataset 1

In [13]:
# Print the first 5 rows of the red-wine DataFrame.
df1.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|          7.4|            0.66|        0.0|           1.8|    0.075|               13.0|                40.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.58|       0.02|           2.0|    0.073|                9.0|                18.0| 0.9968|3.36|     0.57|    9.5|      7|
|          7.5|             0.5|       0.36|           6.1|    0.071|               17.0|           

In [14]:
# Show the column types produced by schema inference.
df1.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [15]:
# Column names as a plain Python list (note that several contain spaces).
df1.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [16]:
# Number of columns in df1.
len(df1.columns)

12

In [17]:
# Number of data rows; one less than the raw line count because the header
# line was consumed as column names.
df1.count()

1599

In [18]:
# Summary statistics (count, mean, stddev, min, max) for the quality column.
df1.describe("quality").show()

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|              1599|
|   mean|5.6360225140712945|
| stddev|0.8075694397347046|
|    min|                 3|
|    max|                 8|
+-------+------------------+



In [19]:
# Same summary statistics, for two columns side by side.
df1.describe("quality","fixed acidity").show()

+-------+------------------+------------------+
|summary|           quality|     fixed acidity|
+-------+------------------+------------------+
|  count|              1599|              1599|
|   mean|5.6360225140712945| 8.319637273295806|
| stddev|0.8075694397347046|1.7410963181276964|
|    min|                 3|               4.6|
|    max|                 8|              15.9|
+-------+------------------+------------------+



In [20]:
# Aggregate quality directly with column functions. `mean`, `max` and `min`
# here are the pyspark.sql.functions versions brought in by the star import,
# which shadow Python's built-in max/min in this notebook.
df1.select(mean("quality"),max("quality"),min("quality")).show()

+------------------+------------+------------+
|      avg(quality)|max(quality)|min(quality)|
+------------------+------------+------------+
|5.6360225140712945|           8|           3|
+------------------+------------+------------+



In [21]:
# List the distinct quality scores in ascending order.
df1.select("quality").distinct().sort("quality").show()

+-------+
|quality|
+-------+
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
+-------+



In [22]:
# Number of red wines per quality score, ordered by score.
df1.groupBy("quality").count().sort("quality").show()

+-------+-----+
|quality|count|
+-------+-----+
|      3|   10|
|      4|   53|
|      5|  681|
|      6|  638|
|      7|  199|
|      8|   18|
+-------+-----+



In [23]:
# Correlation coefficient between quality and density.
df1.stat.corr("quality","density")

-0.17491922778334745

In [24]:
# Correlation of every column (quality itself included) with quality,
# collected as [column_name, correlation] pairs. Each corr() call runs its own
# Spark job.
L = [[column, df1.stat.corr("quality", column)] for column in df1.columns]
L

[['fixed acidity', 0.12405164911322447],
 ['volatile acidity', -0.3905577802640072],
 ['citric acid', 0.2263725143180415],
 ['residual sugar', 0.013731637340066348],
 ['chlorides', -0.12890655993005268],
 ['free sulfur dioxide', -0.050656057244276624],
 ['total sulfur dioxide', -0.18510028892653768],
 ['density', -0.17491922778334731],
 ['pH', -0.0577313912053819],
 ['sulphates', 0.2513970790692617],
 ['alcohol', 0.47616632400113595],
 ['quality', 1.0]]

In [25]:
# Turn the list of [name, correlation] pairs into a two-column DataFrame and
# give the columns readable names.
df_corr = spark.createDataFrame(L).toDF("colname","correlation")

In [26]:
# Correlations in ascending order (most negative first).
df_corr.sort("correlation").show()

+--------------------+--------------------+
|             colname|         correlation|
+--------------------+--------------------+
|    volatile acidity| -0.3905577802640072|
|total sulfur dioxide|-0.18510028892653768|
|             density|-0.17491922778334731|
|           chlorides|-0.12890655993005268|
|                  pH| -0.0577313912053819|
| free sulfur dioxide|-0.05065605724427...|
|      residual sugar|0.013731637340066348|
|       fixed acidity| 0.12405164911322447|
|         citric acid|  0.2263725143180415|
|           sulphates|  0.2513970790692617|
|             alcohol| 0.47616632400113595|
|             quality|                 1.0|
+--------------------+--------------------+



In [27]:
# Rank columns by the strength of their correlation with quality, regardless of
# sign. The derived column is aliased explicitly so the sort does not depend on
# Spark's auto-generated name for the expression; the alias keeps the same
# header text as before.
# `abs` is pyspark.sql.functions.abs (from the star import), not the builtin.
abs_correlation = abs(col("correlation")).alias("abs(correlation)")
df_corr.select(col("colname"), abs_correlation)\
    .sort(col("abs(correlation)").desc()).show()

+--------------------+--------------------+
|             colname|    abs(correlation)|
+--------------------+--------------------+
|             quality|                 1.0|
|             alcohol| 0.47616632400113595|
|    volatile acidity|  0.3905577802640072|
|           sulphates|  0.2513970790692617|
|         citric acid|  0.2263725143180415|
|total sulfur dioxide| 0.18510028892653768|
|             density| 0.17491922778334731|
|           chlorides| 0.12890655993005268|
|       fixed acidity| 0.12405164911322447|
|                  pH|  0.0577313912053819|
| free sulfur dioxide|0.050656057244276624|
|      residual sugar|0.013731637340066348|
+--------------------+--------------------+



## Load dataset 2

In [28]:
# White-wine dataset, read with the same CSV settings as the red-wine one:
# header row, inferred column types, ";" as the field separator.
df2 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("delimiter", ";")
    .load("Datasets/Winequality_white.csv")
)

## Explore dataset 2

In [29]:
# Print the first 5 rows of the white-wine DataFrame.
df2.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001| 3.0|     0.45|    8.8|      6|
|          6.3|             0.3|       0.34|           1.6|    0.049|               14.0|               132.0|  0.994| 3.3|     0.49|    9.5|      6|
|          8.1|            0.28|        0.4|           6.9|     0.05|               30.0|                97.0| 0.9951|3.26|     0.44|   10.1|      6|
|          7.2|            0.23|       0.32|           8.5|    0.058|               47.0|           

In [30]:
# Inferred column types; these need to line up with df1 for the union below.
df2.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [31]:
# Number of columns in df2.
len(df2.columns)

12

In [32]:
# Number of white-wine rows.
df2.count()

4898

In [33]:
# Number of white wines per quality score, ordered by score.
df2.groupBy("quality").count().sort("quality").show()

+-------+-----+
|quality|count|
+-------+-----+
|      3|   20|
|      4|  163|
|      5| 1457|
|      6| 2198|
|      7|  880|
|      8|  175|
|      9|    5|
+-------+-----+



## Concatenate the two datasets

In [34]:
# Stack the white-wine rows under the red-wine rows. union() replaces
# unionAll(), which has been deprecated since Spark 2.0; both match columns by
# position and keep duplicate rows.
df = df1.union(df2)
df.show(10)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|          7.4|            0.66|        0.0|           1.8|    0.075|               13.0|                40.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.58|       0.02|           2.0|    0.073|                9.0|                18.0| 0.9968|3.36|     0.57|    9.5|      7|
|          7.5|             0.5|       0.36|           6.1|    0.071|               17.0|           

In [35]:
# Sanity check: should equal df1.count() + df2.count().
df.count()

6497

## Repartition and save the new dataset

In [36]:
# Collapse to a single partition before writing, then save as CSV with a
# header row, replacing any earlier "wine-data" output.
# No delimiter option is set here, so the writer's default separator is used
# rather than the ";" of the input files.
(
    df.repartition(1)
    .write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("wine-data")
)

## Join the two datasets on quality

In [37]:
# Suffix every red-wine column name with "_1" so the names stay unique once
# the two datasets sit side by side in a join.
Columns_1 = [column + "_1" for column in df1.columns]

In [38]:
# Inspect the renamed column list.
Columns_1

['fixed acidity_1',
 'volatile acidity_1',
 'citric acid_1',
 'residual sugar_1',
 'chlorides_1',
 'free sulfur dioxide_1',
 'total sulfur dioxide_1',
 'density_1',
 'pH_1',
 'sulphates_1',
 'alcohol_1',
 'quality_1']

In [39]:
# Copy of df1 with the "_1"-suffixed column names (same data, new names).
F1 = df1.toDF(*Columns_1)

In [40]:
# Suffix every white-wine column name with "_2", mirroring Columns_1.
Columns_2 = [column + "_2" for column in df2.columns]

In [41]:
# Inspect the renamed column list.
Columns_2

['fixed acidity_2',
 'volatile acidity_2',
 'citric acid_2',
 'residual sugar_2',
 'chlorides_2',
 'free sulfur dioxide_2',
 'total sulfur dioxide_2',
 'density_2',
 'pH_2',
 'sulphates_2',
 'alcohol_2',
 'quality_2']

In [42]:
# Copy of df2 with the "_2"-suffixed column names (same data, new names).
F2 = df2.toDF(*Columns_2)

In [43]:
# Inner join on equal quality score. quality is not a unique key in either
# dataset, so every red row is paired with every white row that has the same
# score and the result is much larger than either input.
dfm = F1.join(F2, on=(F1.quality_1 == F2.quality_2), how="inner")
dfm.show(5)

+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+---------+---------+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+---------+---------+
|fixed acidity_1|volatile acidity_1|citric acid_1|residual sugar_1|chlorides_1|free sulfur dioxide_1|total sulfur dioxide_1|density_1|pH_1|sulphates_1|alcohol_1|quality_1|fixed acidity_2|volatile acidity_2|citric acid_2|residual sugar_2|chlorides_2|free sulfur dioxide_2|total sulfur dioxide_2|density_2|pH_2|sulphates_2|alcohol_2|quality_2|
+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+---------+---------+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-------

## Load dataset 3

In [44]:
# Breast-cancer dataset: comma-separated, with a header row and inferred
# column types.
df3 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("delimiter", ",")
    .load("Datasets/Breast_cancer_wisconsin.csv")
)

In [45]:
# Print the first 5 rows of the breast-cancer DataFrame.
df3.show(5)

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|              5|              1|               1|                1|              2|            1|              3|              1|      1|    0|
|              5|              4|               4|                5|              7|           10|              3|              2|      1|    0|
|              3|              1|               1|                1|              2|            2|              3|              1|      1|    0|
|              6|              8|               8|                1|              3|            4|              3|              7|

In [46]:
# Check the inferred types; a column inferred as string instead of a number
# points at non-numeric entries in the file.
df3.printSchema()

root
 |-- clump_thickness: integer (nullable = true)
 |-- size_uniformity: integer (nullable = true)
 |-- shape_uniformity: integer (nullable = true)
 |-- marginal_adhesion: integer (nullable = true)
 |-- epithelial_size: integer (nullable = true)
 |-- bare_nucleoli: string (nullable = true)
 |-- bland_chromatin: integer (nullable = true)
 |-- normal_nucleoli: integer (nullable = true)
 |-- mitoses: integer (nullable = true)
 |-- class: integer (nullable = true)



In [47]:
# Look at rows where bare_nucleoli holds the "?" placeholder instead of a
# number.
df3.filter(col("bare_nucleoli") == "?").show(5)

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|              8|              4|               5|                1|              2|            ?|              7|              3|      1|    1|
|              6|              6|               6|                9|              6|            ?|              7|              8|      1|    0|
|              1|              1|               1|                1|              1|            ?|              2|              1|      1|    0|
|              1|              1|               3|                1|              2|            ?|              2|              1|

In [48]:
# How many rows carry the "?" placeholder.
df3.filter(col("bare_nucleoli") == "?").count()

16

## Cast columns to double

In [49]:
# Column names of df3; every one of them is cast to double below.
df3.columns

['clump_thickness',
 'size_uniformity',
 'shape_uniformity',
 'marginal_adhesion',
 'epithelial_size',
 'bare_nucleoli',
 'bland_chromatin',
 'normal_nucleoli',
 'mitoses',
 'class']

In [50]:
# Try the cast on a single column first: integer values come back as doubles.
df3.select(col('clump_thickness').cast('double')).show()

+---------------+
|clump_thickness|
+---------------+
|            5.0|
|            5.0|
|            3.0|
|            6.0|
|            4.0|
|            8.0|
|            1.0|
|            2.0|
|            2.0|
|            4.0|
|            1.0|
|            2.0|
|            5.0|
|            1.0|
|            8.0|
|            7.0|
|            4.0|
|            4.0|
|           10.0|
|            6.0|
+---------------+
only showing top 20 rows



In [51]:
# One cast-to-double column expression per column of df3.
CD = [col(column).cast('double') for column in df3.columns]

In [52]:
# Apply all casts at once. String values that cannot be parsed as a number
# (the "?" placeholders) become null in the resulting double column.
df4 = df3.select(*CD)

In [53]:
# Print the first 5 rows after the cast.
df4.show(5)

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|            5.0|            1.0|             1.0|              1.0|            2.0|          1.0|            3.0|            1.0|    1.0|  0.0|
|            5.0|            4.0|             4.0|              5.0|            7.0|         10.0|            3.0|            2.0|    1.0|  0.0|
|            3.0|            1.0|             1.0|              1.0|            2.0|          2.0|            3.0|            1.0|    1.0|  0.0|
|            6.0|            8.0|             8.0|              1.0|            3.0|          4.0|            3.0|            7.0|

In [54]:
# Confirm that every column is now of type double.
df4.printSchema()

root
 |-- clump_thickness: double (nullable = true)
 |-- size_uniformity: double (nullable = true)
 |-- shape_uniformity: double (nullable = true)
 |-- marginal_adhesion: double (nullable = true)
 |-- epithelial_size: double (nullable = true)
 |-- bare_nucleoli: double (nullable = true)
 |-- bland_chromatin: double (nullable = true)
 |-- normal_nucleoli: double (nullable = true)
 |-- mitoses: double (nullable = true)
 |-- class: double (nullable = true)



## Replace nulls with the column mean

In [55]:
# Count the nulls in bare_nucleoli after the cast; this should match the
# number of "?" rows counted in df3.
df4.filter(col("bare_nucleoli").isNull()).count()

16

In [56]:
# Mean of bare_nucleoli as a plain Python float. The aggregate yields a single
# row with a single column, so first()[0] extracts the value.
M = df4.select(mean("bare_nucleoli")).first()[0]
M

3.5446559297218156

In [57]:
# Impute only the column the mean was computed from. Without `subset`,
# na.fill(M) would write the bare_nucleoli mean into nulls of every numeric
# column of df4.
df5 = df4.na.fill(M, subset=["bare_nucleoli"])

In [58]:
# Verify the imputation: no nulls should remain in bare_nucleoli.
df5.filter(col("bare_nucleoli").isNull()).count()

0