# CMU Machine Learning with Large Datasets
## Homework 4 - Machine Learning at Scale: Part B

This is a continuation of HW4 Part A. We have included code that loads and prepares the data for you. You can skim it to learn how to specify a schema when loading data, or just run all of those cells and start from "Part B Begins".

Note that we will not be autograding this notebook because of its open-ended nature (although you will have to submit it). To make grading easier and to learn about your thought process, we include questions throughout the notebook that you have to answer in your writeup. Whenever this happens, there is a ✰ symbol.

### 0. Start a Spark Session and Install Libraries

In [None]:
# We highly recommend selecting the "PySpark" kernel instead of the Python kernel;
# otherwise you need to modify this cell to get pyspark working.

spark

sc = spark.sparkContext
print(f'num executors: {sc.getConf().get("spark.executor.instances")}')

Throughout this assignment, you will be generating plots. `Matplotlib` and other useful Python libraries do not come pre-installed on the cluster. Therefore, you will have to SSH into your master node (think about why it should be the master) using the key pair you created earlier and install `matplotlib`. You might have to do this again later for other libraries you use, e.g. `pandas`.

Run the cell below to ensure your installation was successful. If an error occurs, double-check your installation.

In [3]:
import matplotlib.pyplot as plt

### 1. Data Loading and Preparation

Earlier, we extracted the relevant features from the full raw Million Song Dataset and converted its format. We now want to load our converted dataset from S3 storage.

Note that although you can load all chunks of the dataset using `*`, we recommend loading only a subset while developing so that processing takes less time when you are just verifying your ideas.

**IMPORTANT**: to make grading consistent and to let those who did not finish Part A still work on the rest of this homework, you are asked to use **our S3 bucket** (instead of the one you created in Part A).

In [4]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([StructField('song_hotttnesss', DoubleType(), True),
                       StructField('artist_familiarity', DoubleType(), True),
                       StructField('artist_hotttnesss', DoubleType(), True),
                       StructField('artist_id', StringType(), True),
                       StructField('artist_latitude', DoubleType(), True),
                       StructField('artist_location', StringType(), True),
                       StructField('artist_longitude', DoubleType(), True),
                       StructField('artist_name', StringType(), True),
                       StructField('title', StringType(), True),
                       StructField('danceability', DoubleType(), True),
                       StructField('duration', DoubleType(), True),
                       StructField('end_of_fade_in', DoubleType(), True),
                       StructField('energy', DoubleType(), True),
                       StructField('key', DoubleType(), True),
                       StructField('key_confidence', DoubleType(), True),
                       StructField('loudness', DoubleType(), True),
                       StructField('mode', DoubleType(), True),
                       StructField('mode_confidence', DoubleType(), True),
                       StructField('start_of_fade_out', DoubleType(), True),
                       StructField('tempo', DoubleType(), True),
                       StructField('time_signature', DoubleType(), True),
                       StructField('time_signature_confidence', DoubleType(), True),
                       StructField('artist_terms', StringType(), True),
                       StructField('artist_terms_freq', StringType(), True),
                       StructField('artist_terms_weight', StringType(), True),
                       StructField('year', DoubleType(), True)]
                   )

In [None]:
load_subset = True
s3_bucket_name = '10605-group37-s3'

if load_subset:
    df = spark.read.format("csv")\
        .option("header", "false")\
        .schema(schema)\
        .load(f"s3://{s3_bucket_name}/processed/A_1.csv")
else:
    df = spark.read.format("csv")\
        .option("header", "false")\
        .schema(schema)\
        .load(f"s3://{s3_bucket_name}/processed/*.csv")

print('loaded {} records'.format(df.count()))

Inspect the `df` we just created by running the below cell:

In [6]:
df.printSchema()

<div align="center">
    <h1>------------------- Part B Begins ------------------- </h1>
</div>

### 2. Exploratory Data Analysis

Now we have our data mostly ready. It's a good time to take a deeper look to better understand what we are dealing with here.

First, show summary statistics of the features using `.summary()`. Hint: because we have many features, the output might be too long for a single line. The simplest way to resolve this mess is to copy and paste the output into an editor (e.g. VS Code) and check the result there. Alternatively, select only a few features to print at a time so that each row fits on one line.

In [9]:
# YOUR CODE HERE

# YOUR CODE HERE

Now looking at the statistics, `danceability` and `energy` appear strange. ✰2.1(a) Explain why these features seem problematic in your writeup.

Now we would like to make some histogram plots to inspect the distribution of feature values. 

✰2.1(b) Plot histograms for `'song_hotttnesss', 'artist_familiarity', 'artist_hotttnesss', 'duration', 'tempo', 'year'`. 

Note that one of these features may appear weird. 

✰2.1(c) Explain what is weird about `year`'s distribution and what might cause this. Describe how you could filter `year` to make its histogram look more balanced. Hint: Choose a threshold and filter the `year` values.

✰2.1(d) Do it and include the new plot of this feature into your writeup.

Note: you may want to use the spark magic commands `%matplot plt` to show the plot. 

In [10]:
# YOUR CODE HERE

# YOUR CODE HERE

Some scatter pair plots would also be interesting to see the correlation between features. ✰2.1(e) Include the plots of the following pairs of features and describe your findings (e.g. what patterns you observe) in your writeup.
- `song_hotttnesss` against `artist_familiarity`
- `artist_latitude` against `artist_longitude`
- `song_hotttnesss` against `year`

Plotting all data points might explode matplotlib. Think about what simple technique you could use to visualize large datasets while retaining data distribution. ✰2.1(f) In your writeup, briefly describe what you did.

In [11]:
# YOUR CODE HERE

# YOUR CODE HERE

### 3. Data Cleaning

From Section 2, you should now have a basic understanding of the dataset. In this cleaning step, we are dropping the `energy` and `danceability` features. 

✰2.2(a) In your writeup, justify why we are doing this.

In [12]:
# YOUR CODE HERE

# YOUR CODE HERE

For simplicity, we are dropping rows whose `year` value is less than or equal to 1920. Print out the number of samples before and after dropping these rows. 

✰2.2(b) In your writeup, compare these two numbers and explain the advantages and potential problems of this step. What other techniques could you use to potentially do better?

In [13]:
# YOUR CODE HERE

# YOUR CODE HERE

Now, let's do a NaN/null check to see if there are other problematic features. Print the number of entries that contain NaN/null for each feature. 

Hint: use pyspark sql `count(when(isnan(...)))` to count total NaN entries for a feature. Replace `isnan` with `isnull` to count null entries.
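The hint counts NaN and null separately because they are different things: null means the value is missing, while NaN is an actual floating-point value. The sketch below shows that distinction in plain Python on made-up rows. It is not PySpark code, and `rows` and `count_missing` are hypothetical names used only for illustration.

```python
import math

# Toy rows standing in for DataFrame records; the values are made up.
rows = [
    {"artist_latitude": 40.4, "tempo": 120.0},
    {"artist_latitude": None, "tempo": float("nan")},
    {"artist_latitude": None, "tempo": 98.5},
]


def count_missing(rows, column):
    """Return (null_count, nan_count) for one column."""
    values = [row[column] for row in rows]
    nulls = sum(1 for v in values if v is None)
    # NaN is a float value, so it is only tested on float entries.
    nans = sum(1 for v in values if isinstance(v, float) and math.isnan(v))
    return nulls, nans


missing = {column: count_missing(rows, column) for column in rows[0]}
print(missing)
```

A column can be clean under one check and problematic under the other, which is why both counts are worth printing.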

In [14]:
# count na
from pyspark.sql.functions import isnan, when, count, isnull

# YOUR CODE HERE

# YOUR CODE HERE

We should see two features that contain (a lot of) `null` in them. ✰2.2(c) Which two are they? 

For simplicity, we are dropping all records with `null` in these two features. Note that this drops a significant proportion of our dataset. Since we have a lot of data to fit some rather simple models, this might be ok. 

However, do note that this is not usually desirable in the real world because we always want to retain as much data as possible, especially when training larger, more complex models. ✰2.2(d) In your writeup, explain what techniques you could employ to deal with this situation. Discuss the pros and cons of your proposed solution.

In [15]:
# YOUR CODE HERE

# YOUR CODE HERE

✰2.2(e) Finally in your writeup, report the percentage of records that survived our very aggressive data cleaning.

In [16]:
# counting code


### 4. Baseline

Now we have the data (almost) ready to do some preliminary modeling! 

We will be treating this problem as a classification problem, i.e. given some features, predict whether a song will be popular. We define a song to be popular if its `song_hotttnesss` value is above average. 

✰2.3(a) Explain in your writeup why treating this as a classification problem (instead of a regression problem) might be a sensible choice. 

Then, assign labels to the dataset with the above definition of "popularity".
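As a rough illustration of the labeling rule above, here is a plain-Python sketch on four made-up `song_hotttnesss` values. It only shows the "strictly above the mean" logic; the variable names are hypothetical, and in the notebook you would express the same rule with DataFrame operations.

```python
# Made-up hotness values; the real ones come from the song_hotttnesss column.
hotness = [0.2, 0.9, 0.6, 0.3]

mean_hotness = sum(hotness) / len(hotness)

# A song is labeled popular (1) only if it is strictly above the average.
labels = [1 if h > mean_hotness else 0 for h in hotness]

popular_fraction = sum(labels) / len(labels)
print(labels, popular_fraction)
```

Note that the threshold is computed from the data itself, so it changes whenever rows are filtered out beforehand.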

In [17]:
from pyspark.sql.functions import col, expr, when

# assign labels
# YOUR CODE HERE

# YOUR CODE HERE

Now let's take a look at how balanced the two classes are. ✰2.3(b) Report what percentage of songs are assigned the "popular" label.

In [18]:
# YOUR CODE HERE

# YOUR CODE HERE

As we did with the `year` feature in earlier assignments, we will shift the feature so that it starts from 0.

✰2.3(c) Explain why we want to do this.

In [19]:
# shift years
# YOUR CODE HERE

# YOUR CODE HERE

Finally, let's concatenate all features (using VectorAssembler) into a feature vector and scale it. 

✰2.3(d) Explain what scaling means and why we want to perform scaling before the learning step.

Note: we will only be using the numeric features for now. Excluding `song_hotttnesss` and the two features we dropped in the data cleaning step, there should be 19-3=16 of them.
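To make the mechanics of scaling concrete, the sketch below standardizes two made-up columns in plain Python. It assumes standardization (divide by the sample standard deviation, optionally subtract the mean), which is what `StandardScaler` does; note that Spark's `StandardScaler` defaults to `withStd=True` and `withMean=False`, so the `with_mean` flag here is only an illustrative stand-in.

```python
import statistics


def standardize(values, with_mean=True):
    """Scale a column to unit sample standard deviation, optionally centering it."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (divides by n - 1)
    shift = mean if with_mean else 0.0
    return [(v - shift) / std for v in values]


# Two made-up columns on very different scales.
duration = [100.0, 200.0, 300.0]
tempo = [0.5, 1.0, 1.5]

scaled_duration = standardize(duration)
scaled_tempo = standardize(tempo)
print(scaled_duration, scaled_tempo)
```

After scaling, both columns occupy the same range even though the raw values differ by two orders of magnitude.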

In [20]:
# assembling vector

from pyspark.ml.feature import VectorAssembler

# YOUR CODE HERE

# YOUR CODE HERE

In [21]:
# scaling

from pyspark.ml.feature import StandardScaler

# YOUR CODE HERE

# YOUR CODE HERE

Now let's verify the resulting feature vector is of expected length 16.

In [22]:
# change your variable/column names accordingly
l = df_features.select('features').take(1)[0].features
print('vector len:',len(l))
assert len(l) == 16

Now, we are really ready to fit some models. 

First, do a train-test split on the dataset, with test ratio 0.2 and seed 10605.
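The role of the seed can be seen in a small plain-Python sketch. It assumes a per-row random assignment, which is also how Spark's `randomSplit` behaves, so the resulting sizes are only approximately 80/20. `random_split` is a hypothetical helper, not a Spark API.

```python
import random


def random_split(rows, test_ratio, seed):
    """Assign each row to test with probability test_ratio, reproducibly."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (test if rng.random() < test_ratio else train).append(row)
    return train, test


rows = list(range(1000))
train, test = random_split(rows, test_ratio=0.2, seed=10605)
train_again, test_again = random_split(rows, test_ratio=0.2, seed=10605)

print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which matters when you compare models trained in different cells.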

In [23]:
# YOUR CODE HERE

# YOUR CODE HERE

We have to select a metric to evaluate our models on. For classification, potential choices include accuracy and AUC. ✰2.3(e) In your writeup, explain the difference between these two metrics and when AUC might be more useful than accuracy.

We will be going with AUC here. Instantiate an AUC Evaluator in the following cell.
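The two metrics can be compared on a toy example. The sketch below is plain Python with made-up labels and scores. It computes AUC through its pairwise-ranking interpretation (the fraction of positive/negative pairs in which the positive example gets the higher score), which is one standard way to define it, not necessarily how Spark computes it internally.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of correct predictions after thresholding the scores."""
    preds = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)


def auc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    # Ties between a positive and a negative count as half a win.
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Made-up model outputs: the ranking is perfect, but every score is above 0.5.
labels = [0, 0, 1, 1]
scores = [0.6, 0.7, 0.8, 0.9]

toy_accuracy = accuracy(labels, scores)
toy_auc = auc(labels, scores)
print(toy_accuracy, toy_auc)
```

Here the model ranks every positive above every negative, yet thresholding at 0.5 predicts "popular" for all four songs, so the two metrics disagree.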

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# YOUR CODE HERE

# YOUR CODE HERE

We will be fitting two models, logistic regression and random forest, in the mandatory part. You have the chance to go with fancier models in the last section to achieve a higher AUC and earn additional points.

Train an LR and an RF model with default hyperparameters. ✰2.3(f) Calculate the train and test AUC of both models and report them in your writeup.

In [25]:
# Logistic Regression
from pyspark.ml.classification import LogisticRegression

# YOUR CODE HERE

# YOUR CODE HERE

In [26]:
# Random Forest

from pyspark.ml.classification import RandomForestClassifier

# YOUR CODE HERE

# YOUR CODE HERE

### 5. Featurization: Bag-of-Words and TF-IDF

In our list of features, we have two features that contain text data, namely `title` and `artist_terms`. In the entity resolution task of HW1, we've seen how TF-IDF could help us featurize textual data. Let's do that here as well. [Bag-of-Words](https://en.wikipedia.org/wiki/Bag-of-words_model) is yet another technique (arguably simpler than TF-IDF) to featurize text features.


To get yourself familiar with both in Spark, let's treat titles and artist terms as mini documents and compute TF-IDF for `title` and BoW for `artist_terms`.
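Before using the Spark transformers, it may help to see what hashed TF-IDF computes. The sketch below is plain Python on three made-up titles. It uses `zlib.crc32` as a stand-in hash (Spark's `HashingTF` uses a different hash function, so bucket indices will not match) and the smoothed IDF formula `log((m + 1) / (df + 1))` documented for Spark's `IDF`.

```python
import math
import zlib

NUM_FEATS = 5  # plays the role of numFeatures in HashingTF


def bucket(token):
    # Stand-in hash for illustration only; Spark uses a different hash function.
    return zlib.crc32(token.encode("utf-8")) % NUM_FEATS


def hashing_tf(tokens):
    """Count tokens per hash bucket; colliding tokens share a bucket."""
    vec = [0.0] * NUM_FEATS
    for token in tokens:
        vec[bucket(token)] += 1.0
    return vec


# Made-up titles, lowercased and split on whitespace.
titles = ["love me do", "love song", "blue moon"]
docs = [title.lower().split() for title in titles]
tf = [hashing_tf(doc) for doc in docs]

# Document frequency per bucket, then the smoothed IDF weight.
m = len(docs)
doc_freq = [sum(1 for vec in tf if vec[j] > 0) for j in range(NUM_FEATS)]
idf = [math.log((m + 1) / (doc_freq[j] + 1)) for j in range(NUM_FEATS)]

tfidf = [[vec[j] * idf[j] for j in range(NUM_FEATS)] for vec in tf]
print(tfidf)
```

Every title maps to a vector of the same fixed length regardless of how many distinct words the corpus contains; the price is that unrelated words can collide in one bucket.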

In [27]:
# tf-idf on title

from pyspark.ml.feature import Tokenizer, HashingTF, IDF

TF_IDF_NUM_FEATS = 5 # HashingTF(..., numFeatures=TF_IDF_NUM_FEATS)

# YOUR CODE HERE

# YOUR CODE HERE

Verify the resulting TF-IDF feature is indeed a vector of length `TF_IDF_NUM_FEATS`.

In [28]:
# change variable/column name to yours
from pyspark.ml.linalg import SparseVector

col_name = 'title_features'
head = df_tf_idf.select(col_name).head()
print(head)
assert isinstance(head[col_name], SparseVector)
assert len(head[col_name]) == TF_IDF_NUM_FEATS

Recall that `artist_terms` should be of type string array, but we have not yet parsed it from its string literal. Let's do that now. 

Hint: start by defining a UDF to convert a single string literal to type `T.ArrayType(T.StringType())`.
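The parsing logic inside such a UDF could look roughly like the plain-Python function below. This is a sketch under a strong assumption: that each `artist_terms` value is a Python-style list literal such as `"['rock', 'pop rock']"`. Inspect a few real values first, since the actual format depends on how Part A wrote the data. `parse_terms` is a hypothetical name, and returning an empty list for bad input is just one possible design choice.

```python
import ast


def parse_terms(literal):
    """Parse a list-style string literal into a list of strings."""
    if literal is None:
        return []
    try:
        # literal_eval only evaluates literals, so it is safer than eval.
        parsed = ast.literal_eval(literal)
    except (ValueError, SyntaxError):
        return []
    if not isinstance(parsed, (list, tuple)):
        return []
    return [str(term) for term in parsed]


print(parse_terms("['rock', 'pop rock']"))
```

A function like this can then be wrapped with `udf` and the return type from the hint.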

In [29]:
from pyspark.sql.functions import udf
import pyspark.sql.types as T

# YOUR CODE HERE

# YOUR CODE HERE

Let's do a quick check that things indeed worked out.

In [30]:
# change to your variable/column names
col_name = 'artist_terms_arr'
head = df_tags_arr.select(col_name).head()
print(head)
assert type(head[col_name]) == list

Now perform BoW on the array feature you just created.

✰2.4(a) In your writeup, explain what the `vocabSize` hyperparameter means in the context of Bag-of-Words. Remember you can tune this later in the last section.
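To see how the two settings in the next cell interact, here is a plain-Python sketch of a count vectorizer on made-up term lists. It assumes the vocabulary keeps the most frequent terms across the corpus, after discarding terms that appear in fewer than `min_df` documents. Ties are broken alphabetically here only to keep the toy output deterministic. `fit_vocab` and `transform` are hypothetical helpers, not Spark APIs.

```python
from collections import Counter


def fit_vocab(docs, vocab_size, min_df):
    """Pick up to vocab_size terms that occur in at least min_df documents."""
    term_freq = Counter(term for doc in docs for term in doc)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    eligible = [term for term in term_freq if doc_freq[term] >= min_df]
    # Most frequent first; alphabetical order breaks ties in this sketch.
    eligible.sort(key=lambda term: (-term_freq[term], term))
    return eligible[:vocab_size]


def transform(doc, vocab):
    """Count how often each vocabulary term appears in one document."""
    counts = Counter(doc)
    return [counts[term] for term in vocab]


# Made-up artist term lists.
docs = [["rock", "pop", "rock"], ["rock", "jazz"], ["pop", "jazz", "blues"]]

vocab = fit_vocab(docs, vocab_size=2, min_df=2)
vectors = [transform(doc, vocab) for doc in docs]
print(vocab, vectors)
```

Terms outside the vocabulary, such as `blues` and `pop` in this toy run, simply do not contribute to the output vector.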

In [31]:
# BoW on artist_terms

from pyspark.ml.feature import CountVectorizer

BOW_VOCAB_SIZE = 10 
BOW_MIN_DF = 2
# CountVectorizer(..., vocabSize=BOW_VOCAB_SIZE, minDF=BOW_MIN_DF)

# YOUR CODE HERE

# YOUR CODE HERE

Verify the result of BoW is indeed a vector of length `BOW_VOCAB_SIZE`.

In [32]:
# change variable/column name to yours
from pyspark.ml.linalg import SparseVector

col_name = 'artist_terms_bow'
head = df_final.select(col_name).head()
print(head)
assert isinstance(head[col_name], SparseVector)
assert len(head[col_name]) == BOW_VOCAB_SIZE

✰2.4(b) Other than featurizing texts, what other feature engineering would you do on the dataset? Briefly describe one in your writeup. 

Now with the new feature columns ready, let's assemble and scale our features once again as we did to prepare for training. 

This time, we should have 16+2=18 features with the two being TF-IDF and BoW features we just created. The total length of the resulting feature vector should be 31. ✰2.4(c) Explain where this number (31) comes from in your writeup.

In [33]:
# assembling vector

# YOUR CODE HERE

# YOUR CODE HERE

In [34]:
# scaling

# YOUR CODE HERE

# YOUR CODE HERE
# limit before converting so that only a few rows are collected to the driver
df_scaled_features.limit(5).toPandas()

Now let's verify the feature vector is of expected length 31.

In [35]:
# change your variable/column names accordingly
l = df_scaled_features.select('features').take(1)[0].features
print('vector len:',len(l))
assert len(l) == 31

### 6. Modeling with New Features

Now let's fit LR and RF on our new data. As before, first do a train-test split with test ratio 0.2 and seed 10605.

In [36]:
# YOUR CODE HERE

# YOUR CODE HERE

Now train the LR and RF model with default hyperparameters. ✰2.5(a) Evaluate train and test AUC for each model and report them in your writeup.

In [37]:
# Logistic Regression

# YOUR CODE HERE

# YOUR CODE HERE

In [38]:
# Random Forest

# YOUR CODE HERE

# YOUR CODE HERE

Recall that AUC is the area under the ROC curve. Now, plot the ROC curves for the four models (including two baselines) in **one single plot**. 

✰2.5(b) In your writeup:
- Include the plot (with legends)
- Explain how the ROC curve is derived and what it measures
- Explain how you can tell from the ROC curves which models perform better than others and, in this case, which model performs the best.
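For intuition about what is being plotted, the sketch below builds ROC points in plain Python from made-up labels and scores by sweeping the decision threshold from high to low, then integrates the curve with the trapezoid rule. It is a toy illustration with hypothetical helper names, not the routine Spark uses.

```python
def roc_points(labels, scores):
    """Return (false positive rate, true positive rate) points for all thresholds."""
    num_pos = sum(labels)
    num_neg = len(labels) - num_pos
    ranked = sorted(zip(scores, labels), reverse=True)
    points = [(0.0, 0.0)]
    tp = fp = 0
    i = 0
    while i < len(ranked):
        threshold = ranked[i][0]
        # Lowering the threshold admits every example with this score at once.
        while i < len(ranked) and ranked[i][0] == threshold:
            if ranked[i][1] == 1:
                tp += 1
            else:
                fp += 1
            i += 1
        points.append((fp / num_neg, tp / num_pos))
    return points


def trapezoid_area(points):
    """Area under a piecewise-linear curve given as sorted (x, y) points."""
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

points = roc_points(labels, scores)
area = trapezoid_area(points)
print(points, area)
```

Passing the `points` list of each model to `plt.plot` with a different label would put several curves on one set of axes.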

In [39]:
# YOUR CODE HERE

# YOUR CODE HERE

### 7. Do Your Best

In all previous sections of this assignment, and in other assignments so far, we have specified almost everything you should do. You might be (and should be!) bored by now. This section gives you a chance to do whatever you want to improve the prediction AUC. 

You can do better data preprocessing, feature engineering, fit fancier models, perform hyperparameter tuning, etc. 

After you are satisfied with your model, ✰2.6 in your writeup, report 
- the hyperparameters,
- train and test AUC of your optimized model, and 
- the approach you took on top of the specified instructions to obtain this better result.

In [40]:
# have fun!


### 8. Don't forget to answer the reflection question on the writeup! ✰2.7