# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>
- TAs: Tong Zeng <tozeng@syr.edu>, Priya Matnani <psmatnan@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. The placeholder exists so that the `assert` statements fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answer. Think about cases your code should handle even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [49]:
# load these packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml import clustering, evaluation, feature, regression
from pyspark.sql import Row, SparkSession, types
from pyspark.sql import functions as fn

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Part 2. Clustering and Latent Dirichlet Allocation

I recommend following the notebook `unsupervised_learning.ipynb`, shared through the IST 718 repository, first.

The following dataset contains information about the diet of European countries around 1970.

In [50]:
protein_df = spark.read.csv('proteindata.txt', sep='\t', inferSchema=True, header=True)

## Question 1 (10 pts)

Perform a PCA transformation on the `protein_df` dataframe using all features, with two principal components. The pipeline should have a standard scaler step that only centers the data (no scaling) before passing it to the PCA transformation. Store the fitted pipeline in `pipe_pca`. The transformation should produce a column named `pc` containing the two requested principal components.
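To make the "center only, then project" requirement concrete, here is a NumPy sketch (not Spark) of the computation the pipeline performs. The function name `centered_pca` and the toy data are hypothetical. Eigenvector signs are arbitrary, so individual loadings can differ in sign from Spark's. To our understanding, Spark's fitted PCA model multiplies each input vector by the loading matrix without subtracting the mean itself, which is why the separate centering step matters for the scores.

```python
import numpy as np

def centered_pca(X, k=2):
    """Mean-center X (no scaling), then project it onto its top-k principal components."""
    X = np.asarray(X, dtype=float)
    # Analogous to StandardScaler(withMean=True, withStd=False): subtract column means only
    Xc = X - X.mean(axis=0)
    # Eigen-decomposition of the feature covariance matrix (rowvar=False: columns are features)
    eigvals, eigvecs = np.linalg.eigh(np.cov(Xc, rowvar=False))
    # eigh returns eigenvalues in ascending order; keep the indices of the k largest
    top = np.argsort(eigvals)[::-1][:k]
    loadings = eigvecs[:, top]   # (n_features, k): analogous to the fitted PCA model's `pc` matrix
    scores = Xc @ loadings       # (n_rows, k): analogous to the `pc` output column
    return scores, loadings

# Hypothetical toy data: 25 rows and 9 features, like protein_df, with growing column scales
rng = np.random.default_rng(0)
X = rng.normal(size=(25, 9)) * np.arange(1, 10)
scores, loadings = centered_pca(X, k=2)
print(scores.shape, loadings.shape)  # (25, 2) (9, 2)
```

The projected scores have zero mean and are uncorrelated, and the first component carries at least as much variance as the second.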

In [51]:
# (10 pts) Create pipe_pca here

pipe_pca = Pipeline(stages=[
    feature.VectorAssembler(inputCols=['RedMeat','WhiteMeat', 'Eggs','Milk','Fish','Cereals','Starch','Nuts','FruitVeg'],
                            outputCol='features')
    ,
    feature.StandardScaler(withMean=True,withStd=False,
                           inputCol='features', outputCol='centered_features')
    ,
    feature.PCA(k=2, inputCol='centered_features', outputCol='pc')
]).fit(protein_df)

In [52]:
#protein_df
pipe_pca.transform(protein_df).printSchema()

root
 |-- Country: string (nullable = true)
 |-- RedMeat: double (nullable = true)
 |-- WhiteMeat: double (nullable = true)
 |-- Eggs: double (nullable = true)
 |-- Milk: double (nullable = true)
 |-- Fish: double (nullable = true)
 |-- Cereals: double (nullable = true)
 |-- Starch: double (nullable = true)
 |-- Nuts: double (nullable = true)
 |-- FruitVeg: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- centered_features: vector (nullable = true)
 |-- pc: vector (nullable = true)



The schema of the transformed dataframe should look like this:

```python
pipe_pca.transform(protein_df).printSchema()
```

```console
root
 |-- Country: string (nullable = true)
 |-- RedMeat: double (nullable = true)
 |-- WhiteMeat: double (nullable = true)
 |-- Eggs: double (nullable = true)
 |-- Milk: double (nullable = true)
 |-- Fish: double (nullable = true)
 |-- Cereals: double (nullable = true)
 |-- Starch: double (nullable = true)
 |-- Nuts: double (nullable = true)
 |-- FruitVeg: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- centered_features: vector (nullable = true)
 |-- pc: vector (nullable = true)
```

In [53]:
# (10 pts)
np.testing.assert_equal(pipe_pca.transform(protein_df).count(), 25)
np.testing.assert_equal(type(pipe_pca), PipelineModel)
np.testing.assert_equal(len(pipe_pca.transform(protein_df).first().pc), 2)

## Question 2 (10 pts)

Extract the absolute loadings from the PCA transformation in the pipeline and the appropriate feature names from the vector assembler. For each principal component, extract the top feature, i.e., the one with the biggest absolute loading. Comment on what principal component 1 and principal component 2 mean based on these top features.
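Picking the top feature per component only needs the loading matrix and the feature names in assembler order. Below is a library-agnostic sketch; `top_feature_per_component` and the toy matrix are hypothetical. In the notebook, `feature_names` could come from `pipe_pca.stages[0].getInputCols()` and `loadings` from `pipe_pca.stages[-1].pc.toArray()`. Taking the absolute value first means a large negative loading counts as much as a large positive one.

```python
import numpy as np

def top_feature_per_component(feature_names, loadings):
    """For each column (principal component) of `loadings`, return the feature
    with the largest absolute loading together with that absolute value."""
    abs_loadings = np.abs(np.asarray(loadings, dtype=float))
    # Row index of the largest |loading| within each column
    top_rows = abs_loadings.argmax(axis=0)
    return [(feature_names[row], float(abs_loadings[row, col]))
            for col, row in enumerate(top_rows)]

# Hypothetical 3-feature, 2-component loading matrix
toy_loadings = [[0.1, -0.9],
                [-0.8, 0.2],
                [0.3, 0.1]]
print(top_feature_per_component(['a', 'b', 'c'], toy_loadings))  # [('b', 0.8), ('a', 0.9)]
```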

In [54]:
# YOUR CODE HERE
# Loading matrix of the fitted PCA stage: one row per input feature, one column per component
principal_components = pipe_pca.stages[-1].pc.toArray()
# Feature names in the same order the VectorAssembler stage assembled them
feature_names = pipe_pca.stages[0].getInputCols()

pc_loadings = pd.DataFrame({'name': feature_names,
                            'pc1': principal_components[:, 0],
                            'pc2': principal_components[:, 1]})

# Absolute loadings, so that large negative loadings also count as top features
abs_loadings = pc_loadings.assign(pc1=pc_loadings['pc1'].abs(), pc2=pc_loadings['pc2'].abs())
pc1_pd = abs_loadings.sort_values('pc1', ascending=False)
pc2_pd = abs_loadings.sort_values('pc2', ascending=False)
print(pc1_pd.head(1))
print()
print(pc2_pd.head(1))
# Cereals has the largest absolute loading on PC1 (0.86) and Milk has the largest absolute loading on PC2 (0.83).
# PC1 therefore mostly separates countries by how much cereal they consume, and PC2 by how much milk they consume.
# Because the features were only centered (not scaled), high-variance features such as Cereals tend to dominate
# the components: they describe where diets differ most across countries, not which foods are eaten most.

      name       pc1       pc2
5  Cereals  0.860865  0.406169

   name       pc1       pc2
3  Milk  0.425376  0.830856


## Question 3 (10 pts)

Use the following `exploded_df` dataframe to explore which countries lie at the extremes of principal component 1. In particular, create a dataframe `smallest_pc1_df` that contains the columns `country`, `pc_1`, and `pc_2` for the country with the smallest `pc_1`. Similarly, create `largest_pc1_df` for the country with the largest `pc_1`.

In [55]:
@fn.udf(returnType=types.FloatType())
def vector_select(vector_col, element):
    return float(vector_col[element])

exploded_df = pipe_pca.transform(protein_df).select('country', 
                                      vector_select('pc', fn.lit(0)).alias('pc_1'),
                                      vector_select('pc', fn.lit(1)).alias('pc_2'))

In [56]:
exploded_df.show()

+---------+-----------+------------+
|  country|       pc_1|        pc_2|
+---------+-----------+------------+
|  Albania|  14.102253|  -1.3218285|
|  Austria| -5.4612746|   1.5477921|
|  Belgium| -6.0766153|  -1.4793622|
| Bulgaria|  26.115946|   3.3188622|
|Czechosl.|   3.317268|  -2.0923328|
|  Denmark|-13.8607855|   1.3738568|
| EGermany| -4.9024506|    -8.35954|
|  Finland| -12.262294|   11.290117|
|   France|  -6.345336|  0.67163473|
|   Greece|   9.036383|   3.0326154|
|  Hungary|  10.804568|  -2.3634377|
|  Ireland| -11.857348|    5.312214|
|    Italy|   6.308735|  -1.3141143|
| Netherl.| -11.808532|    2.132773|
|   Norway| -11.005402|-0.077100314|
|   Poland|  2.5261996|   2.9990442|
| Portugal| 0.78424096|  -16.753153|
|  Romania|  19.067295|   2.5912552|
|    Spain|  1.9230922|  -10.482929|
|   Sweden|-14.8419485|   0.7262486|
+---------+-----------+------------+
only showing top 20 rows



The output should look similar to this:

```
exploded_df.show()
```

```
+---------+-----------+------------+
|  country|       pc_1|        pc_2|
+---------+-----------+------------+
|  Albania|  14.102253|  -1.3218285|
|  Austria| -5.4612746|   1.5477921|
|  Belgium| -6.0766153|  -1.4793622|
| Bulgaria|  26.115946|   3.3188622|
|Czechosl.|   3.317268|  -2.0923328|
|  Denmark|-13.8607855|   1.3738568|
| EGermany| -4.9024506|    -8.35954|
|  Finland| -12.262294|   11.290117|
|   France|  -6.345336|  0.67163473|
|   Greece|   9.036383|   3.0326154|
|  Hungary|  10.804568|  -2.3634377|
|  Ireland| -11.857348|    5.312214|
|    Italy|   6.308735|  -1.3141143|
| Netherl.| -11.808532|    2.132773|
|   Norway| -11.005402|-0.077100314|
|   Poland|  2.5261996|   2.9990442|
| Portugal| 0.78424096|  -16.753153|
|  Romania|  19.067295|   2.5912552|
|    Spain|  1.9230922|  -10.482929|
|   Sweden|-14.8419485|   0.7262486|
+---------+-----------+------------+
```
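As a sanity check on what `smallest_pc1_df` and `largest_pc1_df` should contain, the same selection can be sketched in plain Python on a handful of `(country, pc_1, pc_2)` rows copied from the output above. The helper `pc1_extremes` is hypothetical, and it only covers the rows listed (the displayed output shows 20 of the 25 countries), so it does not replace the Spark query.

```python
# A subset of (country, pc_1, pc_2) rows copied from the exploded_df output above
rows = [
    ('Albania', 14.102253, -1.3218285),
    ('Bulgaria', 26.115946, 3.3188622),
    ('Denmark', -13.8607855, 1.3738568),
    ('Portugal', 0.78424096, -16.753153),
    ('Sweden', -14.8419485, 0.7262486),
]

def pc1_extremes(rows):
    """Return the rows with the smallest and the largest pc_1 (the second field)."""
    smallest = min(rows, key=lambda row: row[1])
    largest = max(rows, key=lambda row: row[1])
    return smallest, largest

smallest, largest = pc1_extremes(rows)
print(smallest[0], largest[0])  # Sweden Bulgaria
```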

In [57]:
# YOUR CODE HERE
# Sort by pc_1 and keep only the first row; limit(1) keeps the result as a Spark dataframe
# without collecting rows to the driver and rebuilding a dataframe from them
largest_pc1_df = exploded_df.orderBy(fn.desc('pc_1')).limit(1)
smallest_pc1_df = exploded_df.orderBy(fn.asc('pc_1')).limit(1)

In [58]:
# (10 pts)
np.testing.assert_equal(smallest_pc1_df.count(), 1)
np.testing.assert_equal(largest_pc1_df.count(), 1)
np.testing.assert_array_less(smallest_pc1_df.first().pc_1, largest_pc1_df.first().pc_1)

## Question 4 (10 pts)

In this question, you will find three clusters of the raw features (without any modification) of `protein_df` using K-means. Do this using a pipeline transformation called `pipe_cluster` which should produce a prediction column called `prediction`. **To reproduce your results, use the default parameters of KMeans and set the `seed` parameter to 0.**

Produce a dataframe `protein_clustered_df` with the `pipe_cluster` transformation applied to `protein_df`.
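K-means alternates two steps: assign each row to its nearest centroid, then move each centroid to the mean of its assigned rows. Below is a NumPy sketch of this loop (Lloyd's algorithm). `kmeans_sketch` is hypothetical, and it uses a deterministic farthest-first initialization as a stand-in for Spark's default `k-means||` initialization, so its labels will not necessarily match Spark's `prediction` column. Cluster ids are arbitrary in either case.

```python
import numpy as np

def kmeans_sketch(X, k=3, max_iter=20):
    """Lloyd's k-means with farthest-first initialization (not Spark's k-means||)."""
    X = np.asarray(X, dtype=float)
    # Farthest-first init: start from the first row, then repeatedly add the row
    # that is farthest (squared Euclidean) from all centroids chosen so far
    centroids = [X[0]]
    for _ in range(1, k):
        d2 = np.min([((X - c) ** 2).sum(axis=1) for c in centroids], axis=0)
        centroids.append(X[d2.argmax()])
    centroids = np.array(centroids)
    for _ in range(max_iter):
        # Assignment step: index of the nearest centroid for every row, shape (n,)
        d2 = ((X[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        labels = d2.argmin(axis=1)
        # Update step: move each centroid to the mean of its rows (keep it if the cluster is empty)
        new_centroids = np.array([X[labels == j].mean(axis=0) if np.any(labels == j) else centroids[j]
                                  for j in range(k)])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return labels, centroids

# Hypothetical data: three tight, well-separated groups of 10 points each
rng = np.random.default_rng(1)
centers = np.array([[0.0, 0.0], [10.0, 10.0], [0.0, 10.0]])
X = np.vstack([rng.normal(loc=c, scale=0.1, size=(10, 2)) for c in centers])
labels, centroids = kmeans_sketch(X, k=3)
print(labels)
```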

In [59]:
# YOUR CODE HERE
pipe_cluster = Pipeline(stages=[
    feature.VectorAssembler(inputCols=['RedMeat','WhiteMeat', 'Eggs','Milk','Fish','Cereals','Starch','Nuts','FruitVeg'],
                            outputCol='features')
    , clustering.KMeans(k=3, featuresCol='features', predictionCol='prediction', seed=0)]).fit(protein_df)

protein_clustered_df = pipe_cluster.transform(protein_df)
protein_clustered_df.show()

+---------+-------+---------+----+----+----+-------+------+----+--------+--------------------+----------+
|  Country|RedMeat|WhiteMeat|Eggs|Milk|Fish|Cereals|Starch|Nuts|FruitVeg|            features|prediction|
+---------+-------+---------+----+----+----+-------+------+----+--------+--------------------+----------+
|  Albania|   10.1|      1.4| 0.5| 8.9| 0.2|   42.3|   0.6| 5.5|     1.7|[10.1,1.4,0.5,8.9...|         1|
|  Austria|    8.9|     14.0| 4.3|19.9| 2.1|   28.0|   3.6| 1.3|     4.3|[8.9,14.0,4.3,19....|         2|
|  Belgium|   13.5|      9.3| 4.1|17.5| 4.5|   26.6|   5.7| 2.1|     4.0|[13.5,9.3,4.1,17....|         2|
| Bulgaria|    7.8|      6.0| 1.6| 8.3| 1.2|   56.7|   1.1| 3.7|     4.2|[7.8,6.0,1.6,8.3,...|         1|
|Czechosl.|    9.7|     11.4| 2.8|12.5| 2.0|   34.3|   5.0| 1.1|     4.0|[9.7,11.4,2.8,12....|         2|
|  Denmark|   10.6|     10.8| 3.7|25.0| 9.9|   21.9|   4.8| 0.7|     2.4|[10.6,10.8,3.7,25...|         0|
| EGermany|    8.4|     11.6| 3.7|11.1| 5.4|  

In [60]:
# 10 pts
np.testing.assert_equal(type(pipe_cluster), PipelineModel)
np.testing.assert_equal(len(pipe_cluster.stages), 2)
np.testing.assert_equal(protein_clustered_df.count(), 25)
assert 'prediction' in protein_clustered_df.columns

## Question 5 (10 pts)

According to Wikipedia, the Mediterranean diet includes "proportionally high consumption of olive oil, legumes, unrefined cereals, fruits, and vegetables, moderate to high consumption of fish, moderate consumption of dairy products (mostly as cheese and yogurt), moderate wine consumption, and low consumption of non-fish meat products." https://en.wikipedia.org/wiki/Mediterranean_diet

We have some of these countries in our protein dataset: Italy, Spain, Portugal, and Greece.

With code and code comments, test the hypothesis that these countries all belong to the same cluster using the clustering you did in question 4.
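Outside Spark, the hypothesis check reduces to grouping the listed countries by cluster id and asking whether only one group exists. Below is a plain-Python sketch; `group_by_cluster` is hypothetical, and the cluster ids are the ones printed by the Question 5 answer below.

```python
def group_by_cluster(assignments, countries):
    """Group `countries` by cluster id; `assignments` maps country -> cluster id.
    Returns (all_in_one_cluster, {cluster_id: [countries]})."""
    clusters = {}
    for country in countries:
        clusters.setdefault(assignments[country], []).append(country)
    return len(clusters) == 1, clusters

# Cluster ids as printed by the Question 5 answer below
assignments = {'Italy': 2, 'Spain': 2, 'Portugal': 2, 'Greece': 1}
same, clusters = group_by_cluster(assignments, ['Italy', 'Spain', 'Portugal', 'Greece'])
print(same, clusters)  # False {2: ['Italy', 'Spain', 'Portugal'], 1: ['Greece']}
```

In the notebook, `assignments` could be built with `{r['country']: r['prediction'] for r in protein_clustered_df.select('country', 'prediction').collect()}`.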

In [61]:
# you can use the following list in your code
mediterranean_countries = ['Italy', 'Spain', 'Portugal', 'Greece']

In [63]:
# YOUR CODE HERE
# Register country and cluster assignment from protein_clustered_df as a temporary SQL view
protein_filtered_df = protein_clustered_df.select('country', 'prediction')
protein_filtered_df.createOrReplaceTempView("Data")

# Convert the list of Mediterranean countries into a one-column temporary view
sample_countries_df = spark.createDataFrame([Row(country=c) for c in mediterranean_countries])
sample_countries_df.createOrReplaceTempView("sampleCountries")

# Look up the cluster of each Mediterranean country with a SQL join against protein_clustered_df
results_df = spark.sql("select d.country, d.prediction from Data d join sampleCountries c on d.country = c.country")
results_df.createOrReplaceTempView("results")
spark.sql("select * from results").show()

# Count how many Mediterranean countries fall into each cluster
spark.sql("select count(*) as country_count, prediction from results group by prediction").show()

# The four countries fall into 2 different clusters: cluster 1 (Greece) and cluster 2 (Italy, Spain and Portugal).
# Therefore the hypothesis that all of these Mediterranean countries belong to the same cluster is rejected.

+--------+----------+
| country|prediction|
+--------+----------+
|   Italy|         2|
|   Spain|         2|
|Portugal|         2|
|  Greece|         1|
+--------+----------+

+-------------+----------+
|country_count|prediction|
+-------------+----------+
|            1|         1|
|            3|         2|
+-------------+----------+

