In [1]:
import org.apache.spark.sql.functions._

## Load dataset 1: red wine quality

In [2]:
// Red-wine quality data: semicolon-separated CSV with a header row; column
// types are inferred from the data.
val df1 = spark.read.
  options(Map("header" -> "true", "inferschema" -> "true", "delimiter" -> ";")).
  csv("../Datasets/Winequality_red.csv")

df1 = [fixed acidity: double, volatile acidity: double ... 10 more fields]


[fixed acidity: double, volatile acidity: double ... 10 more fields]

## Explore dataset 1

In [3]:
// Peek at the first row only.
df1.show(numRows = 1)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
only showing top 1 row



In [4]:
// Inferred column types.
df1.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [5]:
// Column names, read straight from the schema.
df1.schema.fieldNames

[fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide, density, pH, sulphates, alcohol, quality]

In [6]:
// Number of columns (fields in the schema).
df1.schema.length

12

In [7]:
// Number of rows.
df1.count()

1599

In [8]:
// Summary statistics (count, mean, stddev, min, max) for the target column.
df1.select("quality").describe().show()

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|              1599|
|   mean|5.6360225140712945|
| stddev|0.8075694397347051|
|    min|                 3|
|    max|                 8|
+-------+------------------+



In [9]:
// Mean, max and min of quality, computed as a single global aggregation.
df1.agg(mean("quality"), max("quality"), min("quality")).show()

+------------------+------------+------------+
|      avg(quality)|max(quality)|min(quality)|
+------------------+------------+------------+
|5.6360225140712945|           8|           3|
+------------------+------------+------------+



In [10]:
// Distinct quality scores present in the red-wine data, in ascending order.
df1.select("quality").distinct().orderBy("quality").show()

|quality|
+-------+
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
+-------+



In [11]:
// Row count per quality score.
df1.groupBy("quality").count().orderBy("quality").show()

+-------+-----+
|quality|count|
+-------+-----+
|      3|   10|
|      4|   53|
|      5|  681|
|      6|  638|
|      7|  199|
|      8|   18|
+-------+-----+



In [12]:
// How many red wines score 5 or higher.
df1.where(col("quality") >= 5).count()

1536

In [13]:
// Correlation of every column of df1 with the target `quality`, as
// (column name, correlation) pairs. `stat.corr` computes the Pearson
// coefficient. Each call is evaluated separately over df1.
// Built with an immutable `map` rather than appending to a `var` with `:+`
// inside a loop, which is O(n) per append and leaves mutable notebook state.
val L: List[(String, Double)] =
  df1.columns.toList.map(cn => (cn, df1.stat.corr("quality", cn)))

val df_corr = L.toDF("colname","correlation")
df_corr.show()

+--------------------+--------------------+
|             colname|         correlation|
+--------------------+--------------------+
|       fixed acidity| 0.12405164911322263|
|    volatile acidity| -0.3905577802640061|
|         citric acid| 0.22637251431804048|
|      residual sugar|0.013731637340065798|
|           chlorides|-0.12890655993005293|
| free sulfur dioxide|-0.05065605724427597|
|total sulfur dioxide|-0.18510028892653774|
|             density|-0.17491922778336474|
|                  pH| -0.0577313912053826|
|           sulphates| 0.25139707906925995|
|             alcohol|  0.4761663240011364|
|             quality|                 1.0|
+--------------------+--------------------+



L = List((fixed acidity,0.12405164911322263), (volatile acidity,-0.3905577802640061), (citric acid,0.22637251431804048), (residual sugar,0.013731637340065798), (chlorides,-0.12890655993005293), (free sulfur dioxide,-0.05065605724427597), (total sulfur dioxide,-0.18510028892653774), (density,-0.17491922778336474), (pH,-0.0577313912053826), (sulphates,0.25139707906925995), (alcohol,0.4761663240011364), (quality,1.0))
df_corr = [colname: string, correlation: double]


[colname: string, correlation: double]

In [14]:
// Correlations from most negative to most positive.
df_corr.orderBy("correlation").show()

+--------------------+--------------------+
|             colname|         correlation|
+--------------------+--------------------+
|    volatile acidity| -0.3905577802640061|
|total sulfur dioxide|-0.18510028892653774|
|             density|-0.17491922778336474|
|           chlorides|-0.12890655993005293|
|                  pH| -0.0577313912053826|
| free sulfur dioxide|-0.05065605724427597|
|      residual sugar|0.013731637340065798|
|       fixed acidity| 0.12405164911322263|
|         citric acid| 0.22637251431804048|
|           sulphates| 0.25139707906925995|
|             alcohol|  0.4761663240011364|
|             quality|                 1.0|
+--------------------+--------------------+



In [15]:
// Rank columns by the strength of their correlation with quality, strongest
// first. The abs column is aliased explicitly so the sort does not depend on
// Spark's auto-generated expression name, which is not a stable API.
df_corr.
  select(col("colname"), abs(col("correlation")).as("abs(correlation)")).
  sort(col("abs(correlation)").desc).
  show()

+--------------------+--------------------+
|             colname|    abs(correlation)|
+--------------------+--------------------+
|             quality|                 1.0|
|             alcohol|  0.4761663240011364|
|    volatile acidity|  0.3905577802640061|
|           sulphates| 0.25139707906925995|
|         citric acid| 0.22637251431804048|
|total sulfur dioxide| 0.18510028892653774|
|             density| 0.17491922778336474|
|           chlorides| 0.12890655993005293|
|       fixed acidity| 0.12405164911322263|
|                  pH|  0.0577313912053826|
| free sulfur dioxide| 0.05065605724427597|
|      residual sugar|0.013731637340065798|
+--------------------+--------------------+



## Load dataset 2: white wine quality

In [16]:
// White-wine quality data: same semicolon-separated layout as the red-wine
// file, with a header row and inferred column types.
val df2 = spark.read.
  options(Map("header" -> "true", "inferschema" -> "true", "delimiter" -> ";")).
  csv("../Datasets/Winequality_white.csv")

df2 = [fixed acidity: double, volatile acidity: double ... 10 more fields]


[fixed acidity: double, volatile acidity: double ... 10 more fields]

## Explore dataset 2

In [17]:
// Peek at the first row only.
df2.show(numRows = 1)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|          7.0|            0.27|       0.36|          20.7|    0.045|               45.0|               170.0|  1.001|3.0|     0.45|    8.8|      6|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
only showing top 1 row



In [18]:
// Inferred column types.
df2.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [19]:
// Number of columns (fields in the schema).
df2.schema.length

12

In [20]:
// Number of rows.
df2.count()

4898

In [21]:
// Row count per quality score.
df2.groupBy("quality").count().orderBy("quality").show()

+-------+-----+
|quality|count|
+-------+-----+
|      3|   20|
|      4|  163|
|      5| 1457|
|      6| 2198|
|      7|  880|
|      8|  175|
|      9|    5|
+-------+-----+



## Concatenate the two datasets

In [22]:
// Stack the red (df1) and white (df2) rows into one DataFrame.
// `union` replaces `unionAll`, which is deprecated since Spark 2.0. Both keep
// duplicate rows and match columns by position, which is safe here because
// both files have the same header and schema.
// NOTE(review): the red/white origin is not recorded in the result; add a
// literal column to each side before the union if it is needed downstream.
val df = df1.union(df2)
df.show(10)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

df = [fixed acidity: double, volatile acidity: double ... 10 more fields]




[fixed acidity: double, volatile acidity: double ... 10 more fields]

## Repartition into a single partition and save the combined dataset as CSV

In [23]:
// Collapse to one partition so the output directory holds a single CSV part
// file with a header row. Any previous output is replaced.
df.repartition(1).write.
    mode("overwrite").
    option("header", "true").
    csv("wine-data")

In [24]:
// Total rows after the union (red + white).
df.count()

6497

## Join the two datasets on quality

In [25]:
// Suffix every df1 column with "_1" so its columns stay distinguishable from
// df2's after the join.
val Columns_1 = df1.columns.map(c => df1(c).alias(s"${c}_1"))
val F1 = df1.select(Columns_1: _*)

Columns_1 = Array(fixed acidity AS `fixed acidity_1`, volatile acidity AS `volatile acidity_1`, citric acid AS `citric acid_1`, residual sugar AS `residual sugar_1`, chlorides AS `chlorides_1`, free sulfur dioxide AS `free sulfur dioxide_1`, total sulfur dioxide AS `total sulfur dioxide_1`, density AS `density_1`, pH AS `pH_1`, sulphates AS `sulphates_1`, alcohol AS `alcohol_1`, quality AS `quality_1`)
F1 = [fixed acidity_1: double, volatile acidity_1: double ... 10 more fields]


[fixed acidity_1: double, volatile acidity_1: double ... 10 more fields]

In [26]:
// Suffix every df2 column with "_2" so its columns stay distinguishable from
// df1's after the join.
val Columns_2 = df2.columns.map(c => df2(c).alias(s"${c}_2"))
val F2 = df2.select(Columns_2: _*)

Columns_2 = Array(fixed acidity AS `fixed acidity_2`, volatile acidity AS `volatile acidity_2`, citric acid AS `citric acid_2`, residual sugar AS `residual sugar_2`, chlorides AS `chlorides_2`, free sulfur dioxide AS `free sulfur dioxide_2`, total sulfur dioxide AS `total sulfur dioxide_2`, density AS `density_2`, pH AS `pH_2`, sulphates AS `sulphates_2`, alcohol AS `alcohol_2`, quality AS `quality_2`)
F2 = [fixed acidity_2: double, volatile acidity_2: double ... 10 more fields]


[fixed acidity_2: double, volatile acidity_2: double ... 10 more fields]

In [27]:
// Inner join on quality: every red row is paired with every white row of the
// same quality score. This is a many-to-many join, so the result has
// sum over q of n_red(q) * n_white(q) rows, far more than either input.
val dfm = F1.join(F2, F1("quality_1") === F2("quality_2"), "inner")
dfm.show(10)

+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+---------+---------+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+----------------+---------+
|fixed acidity_1|volatile acidity_1|citric acid_1|residual sugar_1|chlorides_1|free sulfur dioxide_1|total sulfur dioxide_1|density_1|pH_1|sulphates_1|alcohol_1|quality_1|fixed acidity_2|volatile acidity_2|citric acid_2|residual sugar_2|chlorides_2|free sulfur dioxide_2|total sulfur dioxide_2|density_2|pH_2|sulphates_2|       alcohol_2|quality_2|
+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+---------+----+-----------+---------+---------+---------------+------------------+-------------+----------------+-----------+---------------------+----------------------+--------

dfm = [fixed acidity_1: double, volatile acidity_1: double ... 22 more fields]


[fixed acidity_1: double, volatile acidity_1: double ... 22 more fields]

In [28]:
// Size of the many-to-many join result.
dfm.count()

2581650

## Load dataset 3: Wisconsin breast cancer

In [29]:
// Breast-cancer data: comma-separated CSV with a header row; column types
// are inferred from the data.
val df3 = spark.read.
  options(Map("header" -> "true", "inferschema" -> "true", "delimiter" -> ",")).
  csv("../Datasets/Breast_cancer_wisconsin.csv")

df3 = [clump_thickness: int, size_uniformity: int ... 8 more fields]


[clump_thickness: int, size_uniformity: int ... 8 more fields]

In [30]:
// First 20 rows (Spark's default preview size).
df3.show(20)

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|              5|              1|               1|                1|              2|            1|              3|              1|      1|    0|
|              5|              4|               4|                5|              7|           10|              3|              2|      1|    0|
|              3|              1|               1|                1|              2|            2|              3|              1|      1|    0|
|              6|              8|               8|                1|              3|            4|              3|              7|

In [31]:
// Inferred column types. bare_nucleoli comes out as string rather than
// integer; the "?" filter in the next cells shows which values cause this.
df3.printSchema()

root
 |-- clump_thickness: integer (nullable = true)
 |-- size_uniformity: integer (nullable = true)
 |-- shape_uniformity: integer (nullable = true)
 |-- marginal_adhesion: integer (nullable = true)
 |-- epithelial_size: integer (nullable = true)
 |-- bare_nucleoli: string (nullable = true)
 |-- bland_chromatin: integer (nullable = true)
 |-- normal_nucleoli: integer (nullable = true)
 |-- mitoses: integer (nullable = true)
 |-- class: integer (nullable = true)



In [32]:
// Rows whose bare_nucleoli holds the "?" missing-value placeholder.
df3.where(df3("bare_nucleoli") === "?").show()

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|              8|              4|               5|                1|              2|            ?|              7|              3|      1|    1|
|              6|              6|               6|                9|              6|            ?|              7|              8|      1|    0|
|              1|              1|               1|                1|              1|            ?|              2|              1|      1|    0|
|              1|              1|               3|                1|              2|            ?|              2|              1|

In [33]:
// Number of "?" placeholders in bare_nucleoli.
df3.where(df3("bare_nucleoli").equalTo("?")).count()

16

## Cast columns to double

In [34]:
// Cast every df3 column to double. For bare_nucleoli (a string column), the
// "?" placeholders cannot be parsed as numbers.
// NOTE(review): with default (non-ANSI) casting they become null; with
// spark.sql.ansi.enabled=true the cast fails instead. Confirm the target
// Spark configuration.
val Columns_cast = df3.columns.map { name =>
  df3(name).cast("double")
}
val df4 = df3.select(Columns_cast: _*)

Columns_cast = Array(CAST(clump_thickness AS DOUBLE), CAST(size_uniformity AS DOUBLE), CAST(shape_uniformity AS DOUBLE), CAST(marginal_adhesion AS DOUBLE), CAST(epithelial_size AS DOUBLE), CAST(bare_nucleoli AS DOUBLE), CAST(bland_chromatin AS DOUBLE), CAST(normal_nucleoli AS DOUBLE), CAST(mitoses AS DOUBLE), CAST(class AS DOUBLE))
df4 = [clump_thickness: double, size_uniformity: double ... 8 more fields]


[clump_thickness: double, size_uniformity: double ... 8 more fields]

In [35]:
// First 20 rows (Spark's default preview size) after the cast.
df4.show(20)

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|            5.0|            1.0|             1.0|              1.0|            2.0|          1.0|            3.0|            1.0|    1.0|  0.0|
|            5.0|            4.0|             4.0|              5.0|            7.0|         10.0|            3.0|            2.0|    1.0|  0.0|
|            3.0|            1.0|             1.0|              1.0|            2.0|          2.0|            3.0|            1.0|    1.0|  0.0|
|            6.0|            8.0|             8.0|              1.0|            3.0|          4.0|            3.0|            7.0|

In [36]:
// All columns should now be double.
df4.printSchema()

root
 |-- clump_thickness: double (nullable = true)
 |-- size_uniformity: double (nullable = true)
 |-- shape_uniformity: double (nullable = true)
 |-- marginal_adhesion: double (nullable = true)
 |-- epithelial_size: double (nullable = true)
 |-- bare_nucleoli: double (nullable = true)
 |-- bland_chromatin: double (nullable = true)
 |-- normal_nucleoli: double (nullable = true)
 |-- mitoses: double (nullable = true)
 |-- class: double (nullable = true)



## Replace missing (null) values in bare_nucleoli with the column mean

In [37]:
// Mean of bare_nucleoli (Spark's mean aggregate ignores nulls), collected to
// the driver as a one-element Array[Double]; M(0) is the scalar mean.
val M = df4.agg(mean("bare_nucleoli")).as[Double].collect()
M

M = Array(3.5446559297218156)


[3.5446559297218156]

In [38]:
// Impute only bare_nucleoli with its own mean M(0). Without the column list,
// na.fill would write this value into null/NaN cells of every numeric column,
// i.e. impute other features with bare_nucleoli's mean.
df4.na.fill(M(0), Seq("bare_nucleoli")).show()

+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|clump_thickness|size_uniformity|shape_uniformity|marginal_adhesion|epithelial_size|bare_nucleoli|bland_chromatin|normal_nucleoli|mitoses|class|
+---------------+---------------+----------------+-----------------+---------------+-------------+---------------+---------------+-------+-----+
|            5.0|            1.0|             1.0|              1.0|            2.0|          1.0|            3.0|            1.0|    1.0|  0.0|
|            5.0|            4.0|             4.0|              5.0|            7.0|         10.0|            3.0|            2.0|    1.0|  0.0|
|            3.0|            1.0|             1.0|              1.0|            2.0|          2.0|            3.0|            1.0|    1.0|  0.0|
|            6.0|            8.0|             8.0|              1.0|            3.0|          4.0|            3.0|            7.0|