# CMU 17-400/17-700 auto-graded notebook

Before you turn this assignment in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE."

---

# Homework 3: Logistic Regression & Gradient Descent Optimization


In [None]:
# Who did you collaborate with on this assignment? 
# if no one, collaborators should contain an empty string,
# else list your collaborators below

# collaborators = [""]
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
try:
    collaborators
except:
    raise AssertionError("you did not list your collaborators, if any")

# Click-Through Rate Prediction
In this section you will go through the steps for creating a click-through rate (CTR) prediction pipeline. You will work with the Criteo Labs dataset.

## This section will cover:

* *Part 1:* Featurize categorical data using one-hot-encoding (OHE)

* *Part 2:* Construct an OHE dictionary

* *Part 3:* Parse CTR data and generate OHE features
 * *Visualization 1:* Feature frequency

* *Part 4:* CTR prediction and logloss evaluation
 * *Visualization 2:* ROC curve

* *Part 5:* Reduce feature dimension via feature hashing

> Note that, for reference, you can look up the details of:
> * the relevant Spark methods in [PySpark's DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.DataFrame)
> * the relevant NumPy methods in the [NumPy Reference](http://docs.scipy.org/doc/numpy/reference/index.html)

In [None]:
from nose.tools import assert_equal, assert_true

In [None]:
# YOU CAN MOST LIKELY IGNORE THIS CELL. This is only of use for running this notebook locally.

# THIS CELL DOES NOT NEED TO BE RUN ON DATABRICKS. 
# Note that Databricks already creates a SparkContext for you, so this cell can be skipped.
import findspark
findspark.init()
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext(appName="hw")
sqlContext = SQLContext(sc)

print("spark context started")

## Part 1: Featurize categorical data using one-hot-encoding

### (1a) One-hot-encoding

We would like to develop code to convert categorical features to numerical ones. In order to build intuition, we will work with a unlabeled dataset with three data points, with each data point representing an animal. The first feature indicates the type of animal (bear, cat, mouse); the second feature describes the animal's color (black, tabby); and the third (optional) feature describes what the animal eats (mouse, salmon).

In a one-hot-encoding (OHE) scheme, we want to represent each tuple of `(featureID, category)` via its own binary feature.  We can do this in Python by creating a dictionary that maps each tuple to a distinct integer, where the integer corresponds to a binary feature. To start, manually enter the entries in the OHE dictionary associated with the sample dataset by mapping the tuples to consecutive integers starting from zero,  ordering the tuples first by featureID and next by category.

Later in this lab, we'll use OHE dictionaries to transform data points into compact lists of features that can be used in machine learning algorithms.

In [None]:
# By default, when a shuffle operation occurs with DataFrames, the post-shuffle partition
# count is 200. This is controlled by Spark configuration value spark.sql.shuffle.partitions.
# 200 is a little too high for this data set, so we set the post-shuffle partition count to
# twice the number of available threads in Community Edition.
sqlContext.setConf('spark.sql.shuffle.partitions', '6')  # Set default partitions for DataFrame operations

In [None]:
from collections import defaultdict
# Data for manual OHE
# Note: the first data point does not include any value for the optional third feature
sample_one = [(0, 'mouse'), (1, 'black')]
sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

def sample_to_row(sample):
    tmp_dict = defaultdict(lambda: None)
    tmp_dict.update(sample)
    return [tmp_dict[i] for i in range(3)]

sqlContext.createDataFrame(map(sample_to_row, [sample_one, sample_two, sample_three]),
                           ['animal', 'color', 'food']).show()
sample_data_df = sqlContext.createDataFrame([(sample_one,), (sample_two,), (sample_three,)], ['features'])
sample_data_df.show(truncate=False)

In [None]:
# # TODO: Replace <FILL IN> with appropriate code
# sample_ohe_dict_manual = {}
# sample_ohe_dict_manual[(0, 'bear')] = <FILL IN >
# sample_ohe_dict_manual[(0, 'cat')] = <FILL IN >
# sample_ohe_dict_manual[(0, 'mouse')] = <FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >
# sample_ohe_dict_manual < FILL IN >

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# TEST One-hot-encoding (1a)
assert_equal(sample_ohe_dict_manual[(0, 'bear')], 0)
assert_equal(sample_ohe_dict_manual[(0, 'cat')], 1)
assert_equal(sample_ohe_dict_manual[(0, 'mouse')], 2)
assert_equal(len(sample_ohe_dict_manual.keys()), 7)

### (1b) Sparse vectors

Data points can typically be represented with a small number of non-zero OHE features which are relative to the total number of features that occur in the dataset.  By leveraging this sparsity and using sparse vector representations for OHE data, we can reduce storage and computational burdens.  Below are a few sample vectors represented as dense numpy arrays.  Use [SparseVector](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseVector) to represent them in a sparse fashion, and verify that both the sparse and dense representations yield the same results when computing [dot products](http://en.wikipedia.org/wiki/Dot_product) (we will later use MLlib to train classifiers via gradient descent, and MLlib will need to compute dot products between SparseVectors and dense parameter vectors).

Use `SparseVector(size, *args)` to create a new sparse vector where size is the length of the vector and args are either:
1. A list of indices and a list of values corresponding to the indices. The indices list must be sorted in ascending order. For example, SparseVector(5, [1, 3, 4], [10, 30, 40]) will represent the vector [0, 10, 0, 30, 40]. The non-zero indices are 1, 3 and 4. On the other hand, SparseVector(3, [2, 1], [5, 5]) will give you an error because the indices list [2, 1] is not in ascending order. Note: you cannot simply sort the indices list, because otherwise the values will not correspond to the respective indices anymore.
2. A list of (index, value) pair. In this case, the indices need not be sorted. For example, SparseVector(5, [(3, 1), (1, 2)]) will give you the vector [0, 2, 0, 1, 0].

SparseVectors are much more efficient when working with sparse data because they do not store zero values (only store non-zero values and their indices). You'll need to create a sparse vector representation of each dense vector `a_dense` and `b_dense`.

In [None]:
import numpy as np
from pyspark.ml.linalg import SparseVector

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# a_dense = np.array([0., 3., 0., 4.])
# a_sparse = <FILL IN >

# b_dense = np.array([0., 0., 0., 1.])
# b_sparse = <FILL IN >

# YOUR CODE HERE
raise NotImplementedError()

w = np.array([0.4, 3.1, -1.4, -.5])
print(a_dense.dot(w))
print(a_sparse.dot(w))
print(b_dense.dot(w))
print(b_sparse.dot(w))

In [None]:
# TEST Sparse Vectors (1b)
assert_true(isinstance(a_sparse, SparseVector), 'a_sparse needs to be an instance of SparseVector')
assert_true(b_dense.dot(w) == b_sparse.dot(w),
                'dot product of b_dense and w should equal dot product of b_sparse and w')
assert_true(a_sparse.numNonzeros() == 2, 'a_sparse should not store zero values')

### (1c) OHE features as sparse vectors

Now let's see how we can represent the OHE features for points in our sample dataset.  Using the mapping defined by the OHE dictionary from Part (1a), manually define OHE features for the three sample data points using SparseVector format.  In this case, all the features will have a value of 1.0.  For example, the `DenseVector` for a point with features 2 and 4 would be `[0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]`.

In [None]:
# Reminder of the sample features
# sample_one = [(0, 'mouse'), (1, 'black')]
# sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code. Use SparseVector
# sample_one_ohe_feat_manual = <FILL IN >
# sample_two_ohe_feat_manual = <FILL IN >
# sample_three_ohe_feat_manual = <FILL IN >

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# TEST OHE Features as sparse vectors (1c)
assert_true(isinstance(sample_one_ohe_feat_manual, SparseVector),
                'sample_one_ohe_feat_manual needs to be a SparseVector')
assert_true(isinstance(sample_two_ohe_feat_manual, SparseVector),
                'sample_two_ohe_feat_manual needs to be a SparseVector')
assert_true(isinstance(sample_three_ohe_feat_manual, SparseVector),
                'sample_three_ohe_feat_manual needs to be a SparseVector')

assert_equal(sample_one_ohe_feat_manual[2], 1.0, 'incorrect value for sample_one_ohe_feat_manual')
assert_equal(sample_one_ohe_feat_manual[3], 1.0, 'incorrect value for sample_one_ohe_feat_manual')


### (1d) Define a OHE function

Next we will use the OHE dictionary from Part (1a) to programatically generate OHE features from the original categorical data.  First write a function called `one_hot_encoding` that creates OHE feature vectors in `SparseVector` format.  Then use this function to create OHE features for the first sample data point and verify that the result matches the result from Part (1c).

> Note: We'll pass in the OHE dictionary as a [Broadcast](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.Broadcast) variable, which will greatly improve performance when we call this function as part of a UDF. **When accessing a broadcast variable, you _must_ use `.value`.** For instance: `ohe_dict_broadcast.value`.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted.

    Args:
        raw_feats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sample_one)
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.
        num_ohe_feats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length num_ohe_feats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    # <FILL IN>
    # ohe_features = [<FILL IN>]
    # return SparseVector(<FILL IN>)

    # YOUR CODE HERE
    raise NotImplementedError()
  
# # Calculate the number of features in sample_ohe_dict_manual
# num_sample_ohe_feats = <FILL IN >
# sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)

# # Run one_hot_encoding() on sample_one.  Make sure to pass in the Broadcast variable.
# sample_one_ohe_feat = <FILL IN >  
  
# YOUR CODE HERE
raise NotImplementedError()

print(sample_one_ohe_feat)

In [None]:
# TEST Define an OHE Function (1d)
assert_true(sample_one_ohe_feat == sample_one_ohe_feat_manual,
                'sample_one_ohe_feat should equal sample_one_ohe_feat_manual')
assert_equal(sample_one_ohe_feat, SparseVector(7, [2, 3], [1.0, 1.0]),
                  'incorrect value for sample_one_ohe_feat')
assert_equal(one_hot_encoding([(1, 'black'), (0, 'mouse')], sample_ohe_dict_manual_broadcast,
                                   num_sample_ohe_feats), SparseVector(7, [2, 3], [1.0, 1.0]),
                  'incorrect definition for one_hot_encoding')

### (1e) Apply OHE to a dataset

Finally, use the function from Part (1d) to create OHE features for all 3 data points in the sample dataset.  You'll need to generate a [UDF](https://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf) that can be used in a `DataFrame` `select` statement.

> Note: Your implemenation of `ohe_udf_generator` needs to call your `one_hot_encoding` function.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

def ohe_udf_generator(ohe_dict_broadcast):
    """Generate a UDF that is setup to one-hot-encode rows with the given dictionary.

    Note:
        We'll reuse this function to generate a UDF that can one-hot-encode rows based on a
        one-hot-encoding dictionary built from the training data.  Also, you should calculate
        the number of features before calling the one_hot_encoding function.

    Args:
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.

    Returns:
        UserDefinedFunction: A UDF can be used in `DataFrame` `select` statement to call a
            function on each row in a given column.  This UDF should call the one_hot_encoding
            function with the appropriate parameters.
    """
#     length = <FILL IN>
#     return udf(lambda x: <FILL IN>, VectorUDT())

# YOUR CODE HERE
raise NotImplementedError()

# sample_ohe_dict_udf = ohe_udf_generator(sample_ohe_dict_manual_broadcast)
# sample_ohe_df = sample_data_df.select( < FILL IN >)
# sample_ohe_df.show(truncate=False)

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# TEST Apply OHE to a dataset (1e)
sample_ohe_data_values = sample_ohe_df.collect()
assert_true(len(sample_ohe_data_values) == 3, 'sample_ohe_data_values should have three elements')
assert_equal(sample_ohe_data_values[0], (SparseVector(7, {2: 1.0, 3: 1.0}),),
                  'incorrect OHE for first sample')
assert_equal(sample_ohe_data_values[1], (SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0}),),
                  'incorrect OHE for second sample')
assert_equal(sample_ohe_data_values[2], (SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0}),),
                  'incorrect OHE for third sample')

## Part 2: Construct an OHE dictionary

### (2a) DataFrame with rows of `(featureID, category)`

To start, create a DataFrame of distinct `(feature_id, category)` tuples. In our sample dataset, the 7 items in the resulting DataFrame are `(0, 'bear')`, `(0, 'cat')`, `(0, 'mouse')`, `(1, 'black')`, `(1, 'tabby')`, `(2, 'mouse')`, `(2, 'salmon')`. Notably `'black'` appears twice in the dataset but only contributes one item to the DataFrame: `(1, 'black')`, while `'mouse'` also appears twice and contributes two items: `(0, 'mouse')` and `(2, 'mouse')`.  Use [explode](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode) and [distinct](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct).

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import explode
# sample_distinct_feats_df = (sample_data_df
#                               <FILL IN>)
# YOUR CODE HERE
raise NotImplementedError()
sample_distinct_feats_df.show()

In [None]:
# TEST DataFrame with rows of `(featureID, category)` (2a)
assert_equal(sorted(map(lambda r: r[0], sample_distinct_feats_df.collect())),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'incorrect value for sample_distinct_feats_df')

### (2b) OHE Dictionary from distinct features

Next, create an RDD of key-value tuples, where each `(feature_id, category)` tuple in `sample_distinct_feats_df` is a key and the values are distinct integers ranging from 0 to (number of keys - 1).  Then convert this RDD into a dictionary, which can be done using the `collectAsMap` action.  Note that there is no unique mapping from keys to values, as all we require is that each `(featureID, category)` key be mapped to a unique integer between 0 and the number of keys.  In this exercise, any valid mapping is acceptable.  Use [zipWithIndex](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.zipWithIndex) followed by [collectAsMap](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collectAsMap).

In our sample dataset, one valid list of key-value tuples is: `[((0, 'bear'), 0), ((2, 'salmon'), 1), ((1, 'tabby'), 2), ((2, 'mouse'), 3), ((0, 'mouse'), 4), ((0, 'cat'), 5), ((1, 'black'), 6)]`. The dictionary defined in Part (1a) illustrates another valid mapping between keys and integers.

> Note: We provide the code to convert the DataFrame to an RDD.

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# sample_ohe_dict = (sample_distinct_feats_df
#                      .rdd
#                      .map(lambda r: tuple(r[0]))
#                      <FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()
print(sample_ohe_dict)

In [None]:
# TEST OHE Dictionary from distinct features (2b)
assert_equal(sorted(sample_ohe_dict.keys()),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sample_ohe_dict has unexpected keys')
assert_equal(sorted(sample_ohe_dict.values()), list(range(7)), 'sample_ohe_dict has unexpected values')

### (2c) Automated creation of an OHE dictionary

Now use the code from Parts (2a) and (2b) to write a function that takes an input dataset and outputs an OHE dictionary.  Then use this function to create an OHE dictionary for the sample dataset, and verify that it matches the dictionary from Part (2b).

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def create_one_hot_dict(input_df):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        input_df (DataFrame with 'features' column): A DataFrame where each row contains a list of
            (featureID, value) tuples.

    Returns:
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers.
    """
#    <FILL IN>
#    distinct_feature_df = (<FILL IN>)
#    key_value_tuple_list = (<FILL IN)
#    return dict(key_value_tuple_list)

# YOUR CODE HERE
raise NotImplementedError()

sample_ohe_dict_auto = create_one_hot_dict(sample_data_df)

In [None]:
# TEST Automated creation of an OHE dictionary (2c)
assert_equal(sorted(sample_ohe_dict_auto.keys()),
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sample_ohe_dict_auto has unexpected keys')
assert_equal(sorted(sample_ohe_dict_auto.values()), list(range(7)),
                  'sample_ohe_dict_auto has unexpected values')

## Part 3: Parse CTR data and generate OHE features

***Before we can proceed, you'll first need to obtain the sample data.  ***

The data is from a past [kaggle competition](https://www.kaggle.com/c/criteo-display-ad-challenge/overview). The original data was too big. So we randomly sampled the data for this assignment from the original dataset.

For data fileds:
* Label - Target variable that indicates if an ad was clicked (1) or not (0).
* I1-I13 - A total of 13 columns of integer features (mostly count features).
* C1-C26 - A total of 26 columns of categorical features. The values of these features have been hashed onto 32 bits for anonymization purposes.
The semantic of the features is undisclosed.
Just run the cell below.

In [None]:
# Run this cell for getting data from the github repo.
from pyspark import SparkFiles
from pyspark.sql import Row
url = "https://raw.githubusercontent.com/17-700/data/master/hw3/dac.txt"
sc.addFile(url)

raw_df = sc.textFile("file://" + SparkFiles.get("dac.txt")).map(lambda r: Row(r)).toDF(["text"])

In [None]:
raw_df.show()

### (3a) Loading and splitting the data

We are now ready to start working with the actual CTR data, and our first task involves splitting it into training, validation, and test sets.  Usually we use the [randomSplit method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) with the specified weights and seed to create DFs storing each of these datasets. BUT randomSplit may generate non-deterministic results. So for the sake of testing, we manually split the data in to train, validation, test by the ratio of 0.8, 0.1, 0.1.

Then your work is to [cache](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cache) each of these DFs, as we will access them multiple times in the remainder of this lab. Finally, compute the size of each dataset.

In [None]:
import pyspark.sql.functions as f
indexDf = raw_df.withColumn('index', f.monotonically_increasing_id())
total_count = raw_df.count()
train_count = int(total_count * 4 / 5)
dev_count = int(total_count / 5)
val_count = int(total_count / 10)
test_count = int(total_count / 10)
# first 20% rows
raw_dev_df = indexDf.sort('index').limit(dev_count)
# last 80% rows
raw_train_df = indexDf.sort('index', ascending=False).limit(train_count).drop('index')
# first 10% rows
raw_validation_df = raw_dev_df.sort('index').limit(val_count).drop('index')
# last 10% rows
raw_test_df = raw_dev_df.sort('index', ascending=False).limit(test_count).drop('index')

# # Cache and count the DataFrames
# n_train = raw_train_df.<FILL IN>
# n_val = raw_validation_df.<FILL IN>
# n_test = raw_test_df.<FILL IN>

# YOUR CODE HERE
raise NotImplementedError()
print(n_train, n_val, n_test, n_train + n_val + n_test)
raw_df.show(1)

In [None]:
# TEST Loading and splitting the data (3a)
assert_true(all([raw_train_df.is_cached, raw_validation_df.is_cached, raw_test_df.is_cached]),
                'you must cache the split data')
assert_equal(n_train, 80000, 'incorrect value for n_train')
assert_equal(n_val, 10000, 'incorrect value for n_val')
assert_equal(n_test, 10000, 'incorrect value for n_test')

### (3b) Extract features

We will now parse the raw training data in order to create a DataFrame that we can subsequently use to create an OHE dictionary. Note from the `show()` command in Part (3a) that each raw data point is a string containing several fields separated by some delimiters.  For now, we will ignore the first field (which is just the 0-1 label), and parse the remaining fields (or raw features).  To do this, complete the implemention of the `parse_point` function.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def parse_point(point):
    """Converts a comma separated string into a list of (featureID, value) tuples.

    Note:
        featureIDs should start at 0 and increase to the number of features - 1.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.

    Returns:
        list: A list of (featureID, value) tuples.
    """
#       <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print(parse_point(raw_df.select('text').first()[0]))

In [None]:
# TEST Extract features (3b)
assert_equal(parse_point(raw_df.select('text').first()[0])[:3], [(0, u'0'), (1, u'127'), (2, u'1')],
                  'incorrect implementation of parse_point')

### (3c) Extracting features continued

Next, we'll create a `parse_raw_df` function that creates a `label` column for the first value in a data point and a `features` column for the rest.  The `features` column will be created using `parse_point_udf`, which we've provided and is based on your `parse_point` function.  Note that to name your columns you should use [alias](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.alias).  You can split the `text` field in `raw_df` using [split](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.split) and retrieve the first value of the resulting array with [getItem](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.getItem). Be sure to call [cast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast) to cast the column value to `double`. Your `parse_raw_df` function should also cache the DataFrame it returns.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with the appropriate code
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType

parse_point_udf = udf(parse_point, ArrayType(StructType([StructField('_1', LongType()),
                                                         StructField('_2', StringType())])))

def parse_raw_df(raw_df):
    """Convert a DataFrame consisting of rows of comma separated text into labels and features.

    Args:
        raw_df (DataFrame with a 'text' column): DataFrame containing the raw comma separated data.

    Returns:
        DataFrame: A DataFrame with 'label' and 'features' columns.
    """
#     return raw_df.select(<FILL IN>)
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .<FILL IN>
#           .cache()

  
# YOUR CODE HERE
raise NotImplementedError()

# # Parse the raw training DataFrame
# parsed_train_df = <FILL IN>  

# YOUR CODE HERE
raise NotImplementedError()

from pyspark.sql.functions import (explode, col)
num_categories = (parsed_train_df
                   .select(explode('features').alias('features'))
                   .distinct()
                   .select(col('features').getField('_1').alias('featureNumber'))
                   .groupBy('featureNumber')
                   .sum()
                   .orderBy('featureNumber')
                   .collect())

print(num_categories[2][1])

In [None]:
# TEST Extract features (3c)
assert_true(parsed_train_df.is_cached, 'parse_raw_df should return a cached DataFrame')
assert_equal(num_categories[2][1], 1356, 'incorrect implementation of parse_point or parse_raw_df')


### (3d) Create an OHE dictionary from the dataset

Note that `parse_point` returns a data point in the format of a list of `(featureID, category)` tuples, which is the same format as the sample dataset studied in Parts 1 and 2 of this lab.  Using this observation, create an OHE dictionary from the parsed training data using the function implemented in Part (2c). Note that we will assume for simplicity that all features in our CTR dataset are categorical.

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# ctr_ohe_dict = <FILL IN>
# num_ctr_ohe_feats = <FILL IN>
# YOUR CODE HERE
raise NotImplementedError()

print(ctr_ohe_dict[(0, '4')])    

In [None]:
# TEST Create an OHE dictionary from the dataset (3d)
assert_equal(num_ctr_ohe_feats, 215556, 'incorrect number of features in ctr_ohe_dict')
assert_true((0, '4') in ctr_ohe_dict, 'incorrect features in ctr_ohe_dict')

### (3e) Apply OHE to the dataset

Now let's use this OHE dictionary, by starting with the training data that we've parsed into `label` and `features` columns, to create one-hot-encoded features.  Recall that we created a function `ohe_udf_generator` that can create the UDF that we need to convert row into `features`.  Make sure that `ohe_train_df` contains a `label` and `features` column and is cached.

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with the appropriate code
# ohe_dict_broadcast = <FILL IN>
# ohe_dict_udf = <FILL IN>
# ohe_train_df = (parsed_train_df
#                   <FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()

print(ohe_train_df.count())
print(ohe_train_df.take(1))

In [None]:
# TEST Apply OHE to the dataset (3e)
assert_true('label' in ohe_train_df.columns and 'features' in ohe_train_df.columns, 'ohe_train_df should have label and features columns')
assert_true(ohe_train_df.is_cached, 'ohe_train_df should be cached')
num_nz = sum(parsed_train_df.rdd.map(lambda r: len(r[1])).take(5))
num_nz_alt = sum(ohe_train_df.rdd.map(lambda r: len(r[1].indices)).take(5))
assert_equal(num_nz, num_nz_alt, 'incorrect value for ohe_train_df')

### Visualization 1: Feature frequency

We will now visualize the number of times each of the 233,941 OHE features appears in the training data. We first compute the number of times each feature appears, then bucket the features by these counts.  The buckets are sized by powers of 2, so the first bucket corresponds to features that appear exactly once ( \\( \scriptsize 2^0 \\) ), the second to features that appear twice ( \\( \scriptsize 2^1 \\) ), the third to features that occur between three and four ( \\( \scriptsize 2^2 \\) ) times, the fifth bucket is five to eight ( \\( \scriptsize 2^3 \\) ) times and so on. The scatter plot below shows the logarithm of the bucket thresholds versus the logarithm of the number of features that have counts that fall in the buckets.

In [None]:
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import log

get_indices = udf(lambda sv: list(map(int, sv.indices)), ArrayType(IntegerType()))
feature_counts = (ohe_train_df
                   .select(explode(get_indices('features')))
                   .groupBy('col')
                   .count()
                   .withColumn('bucket', log('count').cast('int'))
                   .groupBy('bucket')
                   .count()
                   .orderBy('bucket')
                   .collect())


In [None]:
import matplotlib.pyplot as plt

x, y = zip(*feature_counts)
x, y = x, np.log(y)

def prepare_plot(xticks, yticks, figsize=(10.5, 6), hide_labels=False, grid_color='#999999',
                 grid_width=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hide_labels: axis.set_ticklabels([])
    plt.grid(color=grid_color, linewidth=grid_width, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 12, 1), np.arange(0, 14, 2))
ax.set_xlabel(r'$\log_e(bucketSize)$'), ax.set_ylabel(r'$\log_e(countInBucket)$')
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
display(fig)

### (3f) Handling unseen features

We naturally would like to repeat the process from Part (3e), to compute OHE features for the validation and test datasets.  However, we must be careful, as some categorical values will likely appear in new data that did not exist in the training data. To deal with this situation, update the `one_hot_encoding()` function from Part (1d) to ignore previously unseen categories, and then compute OHE features for the validation data.  Remember that you can parse a raw DataFrame using `parse_raw_df`.
> Note: you'll have to generate a new UDF using `ohe_udf_generator` so that the updated `one_hot_encoding` function is used.  And make sure to cache `ohe_validation_df`.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted, and that the
        function handles missing features.

    Args:
        raw_feats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sample_one)
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable containing a dict that maps
            (featureID, value) to unique integer.
        num_ohe_feats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length num_ohe_feats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """

#     ohe_feature = <FILL IN>
#     return SparseVector(<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()

# ohe_dict_missing_udf = <FILL IN>
# ohe_validation_df = (<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()

ohe_validation_df.count()
display(ohe_validation_df) # replace with ohe_validate_df.show() if running outside of Databricks

In [None]:
# TEST Handling unseen features (3f)
from pyspark.sql.functions import size, sum as sqlsum

assert_true(ohe_validation_df.is_cached, 'you need to cache ohe_validation_df')
num_nz_val = (ohe_validation_df
                .select(sqlsum(size(get_indices('features'))))
                .first()[0])

nz_expected = 367573
assert_equal(num_nz_val, nz_expected, 'incorrect number of features: Got {0}, expected {1}'.format(num_nz_val, nz_expected))

## Part 4: CTR prediction and logloss evaluation

### (4a) Logistic regression

We are now ready to train our first CTR classifier.  A natural classifier for this setting is logistic regression, since it models the probability of a click-through event rather than returning a simple binary "yes" or "no". Also, when working with rare events like clicking-through, probabilistic predictions are usually more accurate.

First use [LogisticRegression](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression) from the pyspark.ml package to train a model using `ohe_train_df` with a given hyperparameter configuration.  `LogisticRegression.fit` returns a [LogisticRegressionModel](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegressionModel).  Next, we'll use the `LogisticRegressionModel.coefficients` and `LogisticRegressionModel.intercept` to print out some details of the model's parameters.  Note that these are the names of the object's attributes and should be called using a syntax like `model.coefficients` for a given `model`.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Given hyperparameters
standardization = False
elastic_net_param = 0.0
reg_param = .01
max_iter = 20

from pyspark.ml.classification import LogisticRegression
# lr = (<FILL IN>)

# lr_model_basic = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print('intercept: {0}'.format(lr_model_basic.intercept))
print('length of coefficients: {0}'.format(len(lr_model_basic.coefficients)))
sorted_coefficients = sorted(lr_model_basic.coefficients)[:5]

In [None]:
# TEST Logistic regression (4a)
assert_true(np.allclose(lr_model_basic.intercept,  -1.1870497039599432, atol=1e-2), 'incorrect value for model intercept')
assert_true(np.allclose(sorted_coefficients,
                           [-0.10347285277044568, -0.10296978958368273, -0.10296978958368273, -0.10296978958368273, -0.10296978958368273], atol=1e-2),
                           'incorrect value for model coefficients')

### (4b) Log loss
Throughout this lab, we will use log loss to evaluate the quality of models.  Log loss is defined as: \\[ \scriptsize \ell_{log}(p, y) = \begin{cases} -\log (p) & \text{if } y = 1 \\\ -\log(1-p) & \text{if } y = 0 \end{cases} \\] where \\( \scriptsize p\\) is a probability between 0 and 1 and \\( \scriptsize y\\) is a label of either 0 or 1. Log loss is a standard evaluation criterion when predicting rare-events such as click-through rate prediction.

Write a function `add_log_loss` for a DataFrame, and evaluate it on sample inputs.  This operation does not require a UDF.  You can perform a conditional branching with DataFrame columns using [when](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when).

In [None]:
# Some example data
example_log_loss_df = sqlContext.createDataFrame([(.5, 1), (.5, 0), (.99, 1), (.99, 0), (.01, 1),
                                                  (.01, 0), (1., 1), (.0, 1), (1., 0)], ['p', 'label'])
example_log_loss_df.show()

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.functions import when, log, col
epsilon = 1e-16

def add_log_loss(df):
    """Computes and adds a 'log_loss' column to a DataFrame using 'p' and 'label' columns.

    Note:
        log(0) is undefined, so when p is 0 we add a small value (epsilon) to it and when
        p is 1 we subtract a small value (epsilon) from it.

    Args:
        df (DataFrame with 'p' and 'label' columns): A DataFrame with a probability column
            'p' and a 'label' column that corresponds to y in the log loss formula.

    Returns:
        DataFrame: A new DataFrame with an additional column called 'log_loss'
    """
#     return df.withColum(<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()

add_log_loss(example_log_loss_df).show()

In [None]:
# TEST Log loss (4b)
log_loss_values = add_log_loss(example_log_loss_df).select('log_loss').rdd.map(lambda r: r[0]).collect()
assert_true(np.allclose(log_loss_values[:-2],
                            [0.6931471805599451, 0.6931471805599451, 0.010050335853501338, 4.60517018598808,
                             4.605170185988081, 0.010050335853501338, -0.0], atol=1e-2), 'log loss is not correct')
assert_true(not(any(map(lambda x: x is None, log_loss_values[-2:]))),
                'log loss needs to bound p away from 0 and 1 by epsilon')

### (4c)  Baseline log loss

Next we will use the function we have in Part (4b) to compute the baseline log loss of the training data. A very simple yet natural baseline model is that we always make the same prediction regardless of datapoints, therefore the predicted value would be equal to the fraction of training points that correspond to click-through events (i.e., where the label is one). Compute this value (which is simply the mean of the training labels), and then use it to compute the training log loss for the baseline model.

> Note: you'll need to add a `p` column to the `ohe_train_df` DataFrame so that it can be used in your function from Part (4b).  To represent a constant value as a column you can use the [lit](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit) function to wrap the value.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower

from pyspark.sql.functions import lit
# class_one_frac_train = (<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()
print('Training class one fraction = {0:.3f}'.format(class_one_frac_train))

# log_loss_tr_base = (<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()
print('Baseline Train Logloss = {0:.3f}\n'.format(log_loss_tr_base))

In [None]:
# TEST Baseline log loss (4c)
expected_frac = 0.2339125
expected_log_loss = 0.5439608117656105
assert_true(np.allclose(class_one_frac_train, expected_frac, atol=1e-2), 'incorrect value for class_one_frac_train. Got {0}, expected {1}'.format(class_one_frac_train, expected_frac))
assert_true(np.allclose(log_loss_tr_base, expected_log_loss, atol=1e-2), 'incorrect value for log_loss_tr_base. Got {0}, expected {1}'.format(log_loss_tr_base, expected_log_loss))

### (4d) Predict probability

In order to compute the log loss for the model we trained in Part (4a), we need to generate predictions from this model. Write a function that computes the raw linear prediction from this logistic regression model and then passes it through a [sigmoid function](http://en.wikipedia.org/wiki/Sigmoid_function) \\( \scriptsize \sigma(t) = (1+ e^{-t})^{-1} \\) to return the model's probabilistic prediction. Then compute probabilistic predictions on the training data.

Note that when incorporating an intercept into our predictions, we simply add the intercept to the value of the prediction obtained from the weights and features.  Alternatively, if the intercept was included as the first weight, we would need to add a corresponding feature to our data where the feature has the value one.  This is not the case here.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.sql.types import DoubleType
from math import exp #  exp(-t) = e^-t

def add_probability(df, model):
    """Adds a probability column ('p') to a DataFrame given a model"""
    coefficients_broadcast = sc.broadcast(model.coefficients)
    intercept = model.intercept

    def get_p(features):
        """Calculate the probability for an observation given a list of features.

        Note:
            We'll bound our raw prediction between 20 and -20 for numerical purposes.

        Args:
            features: the features

        Returns:
            float: A probability between 0 and 1.
        """
#         # Compute the raw value
#         raw_prediction = <FILL IN>
#         # Bound the raw value between 20 and -20
#         raw_prediction = <FILL IN>
#         # Return the probability
#         <FILL IN>
        
# YOUR CODE HERE
raise NotImplementedError()

    get_p_udf = udf(get_p, DoubleType())
    return df.withColumn('p', get_p_udf('features'))

add_probability_model_basic = lambda df: add_probability(df, lr_model_basic)
training_predictions = add_probability_model_basic(ohe_train_df).cache()

training_predictions.show(5)

In [None]:
# TEST Predicted probability (4d)
expected = 18746.356946150863
got = training_predictions.selectExpr('sum(p)').first()[0]
assert_true(np.allclose(got, expected, atol=1e-2),
                'incorrect value for training_predictions. Got {0}, expected {1}'.format(got, expected))

### (4e) Evaluate the model

We are now ready to evaluate the performance of the model we trained in Part (4a). To do this, first write a general function that takes a model and a DataFrame as its input, and outputs the log loss. Note that the log loss for multiple observations should be the mean of all the individual log losses. Then run this function on the OHE training data, and compare the result with the baseline log loss.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def evaluate_results(df, model, baseline=None):
    """Calculates the log loss for the data given the model.

    Note:
        If baseline has a value the probability should be set to baseline before
        the log loss is calculated.  Otherwise, use add_probability to add the
        appropriate probabilities to the DataFrame.

    Args:
        df (DataFrame with 'label' and 'features' columns): A DataFrame containing
            labels and features.
        model (LogisticRegressionModel): A trained logistic regression model. This
            can be None if baseline is set.
        baseline (float): A baseline probability to use for the log loss calculation.

    Returns:
        float: Log loss for the data.
    """
    
#     with_probability_df = <FILL IN>
#     with_log_loss_df = <FILL IN>
#     log_loss = <FILL IN>
#     return log_loss

# YOUR CODE HERE
raise NotImplementedError()


log_loss_train_model_basic = evaluate_results(ohe_train_df, lr_model_basic)
print('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_basic))

In [None]:
# TEST Evaluate the model (4e)
expected_log_loss = 0.48971226940239815
assert_true(np.allclose(log_loss_train_model_basic, expected_log_loss, atol=1e-2),
                'incorrect value for log_loss_train_model_basic. Got {0}, expected {1}'.format(log_loss_train_model_basic, expected_log_loss))
expected_res = 0.6931471805600546
res = evaluate_results(ohe_train_df, None,  0.5)
assert_true(np.allclose(res, expected_res, atol=1e-2),
                'evaluate_results needs to handle baseline models. Got {0}, expected {1}'.format(res, expected_res))

### (4f) Log loss of validation dataset

Next, use the `evaluate_results` function and compute the log loss of validation dataset for both the baseline and logistic regression models. Notably, the baseline model for the validation dataset should still be based on the label fraction from the training dataset.

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# log_loss_val_base = <FILL IN>

# log_loss_val_l_r0 = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print(('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_val_base, log_loss_val_l_r0)))

In [None]:
# TEST Validation log loss (4f)
expected_val_base = 0.6344644324013423
assert_true(np.allclose(log_loss_val_base, expected_val_base, atol=1e-2),
                'incorrect value for log_loss_val_base. Got {0}, expected {1}'.format(log_loss_val_base, expected_val_base))
expected_val_model_basic = 0.5793520014798194
assert_true(np.allclose(log_loss_val_l_r0, expected_val_model_basic, atol=1e-2),
                'incorrect value for log_loss_val_l_r0. Got {0}, expected {1}'.format(log_loss_val_l_r0, expected_val_model_basic))

### Visualization 2: ROC curve

We will now visualize the performance of our model.  We generate a plot called the ROC curve.  The ROC curve shows us the trade-off between the false positive rate and true positive rate, as we liberalizing the threshold required for a positive prediction.  The performance of a random model is represented by the dashed line in the plot.

In [None]:
labels_and_scores = add_probability_model_basic(ohe_validation_df).select('label', 'p')
labels_and_weights = labels_and_scores.collect()
labels_and_weights.sort(key=lambda x: x[1], reverse=True)
labels_by_weight = np.array([k for (k, v) in labels_and_weights])

length = labels_by_weight.size
true_positives = labels_by_weight.cumsum()
num_positive = true_positives[-1]
false_positives = np.arange(1.0, length + 1, 1.) - true_positives

true_positive_rate = true_positives / num_positive
false_positive_rate = false_positives / (length - num_positive)

# Generate layout and plot data
fig, ax = prepare_plot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(false_positive_rate, true_positive_rate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
display(fig)

## Part 5: Reduce features' dimension via feature hashing

### (5a) Hash function

As we just saw, using an one-hot-encoding featurization can yield a model with good statistical accuracy.  However, the number of distinct categories across all features is quite large -- recall that we observed 233K categories in the training data in Part (3c).  Moreover, the full training dataset includes more than 33M distinct categories, and the training dataset itself is just a small subset of labeled data in real world.  Hence, featurizing via an one-hot-encoding representation could lead to a very large feature vector. To reduce the dimensionality of the feature space, we will use feature hashing.

Below is a hash function that we will use for this part of the lab.  We will first use this hash function with the three sample data points from Part (1a) to gain some intuition.  Implement the following code to hash those three sample points using two different values for `numBuckets` and observe the resulting hashed feature dictionaries.

In [None]:
from collections import defaultdict
import functools 
import hashlib

def hash_function(raw_feats, num_buckets, print_mapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use print_mapping=True for debug purposes and to better understand how the hashing works.

    Args:
        raw_feats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        num_buckets (int): Number of buckets to use as features.
        print_mapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = { category + ':' + str(ind):
                int(int(hashlib.md5((category + ':' + str(ind)).encode('utf-8')).hexdigest(), 16) % num_buckets)
                for ind, category in raw_feats}
    if(print_mapping): print(mapping)

    def map_update(l, r):
        l[r] += 1.0
        return l

    sparse_features = functools.reduce(map_update, mapping.values(), defaultdict(float))
    return dict(sparse_features)

# Reminder of the sample values:
# sample_one = [(0, 'mouse'), (1, 'black')]
# sample_two = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sample_three =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# # Use four buckets
# samp_one_four_buckets = hash_function(sample_one, < FILL IN >, True)
# samp_two_four_buckets = hash_function(sample_two, < FILL IN >, True)
# samp_three_four_buckets = hash_function(sample_three, < FILL IN >, True)

# # Use one hundred buckets
# samp_one_hundred_buckets = hash_function(sample_one, < FILL IN >, True)
# samp_two_hundred_buckets = hash_function(sample_two, < FILL IN >, True)
# samp_three_hundred_buckets = hash_function(sample_three, < FILL IN >, True)

# YOUR CODE HERE
raise NotImplementedError()

print('\n\t\t 4 Buckets \t\t\t 100 Buckets')
print('Sample One:\t {0}\t\t\t {1}'.format(samp_one_four_buckets, samp_one_hundred_buckets))
print('Sample Two:\t {0}\t\t {1}'.format(samp_two_four_buckets, samp_two_hundred_buckets))
print('Sample Three:\t {0}\t {1}'.format(samp_three_four_buckets, samp_three_hundred_buckets))

In [None]:
# TEST Hash function (5a)
assert_equal(samp_one_four_buckets, {3: 2.0}, 'incorrect value for samp_one_four_buckets')
assert_equal(samp_three_hundred_buckets, {80: 1.0, 82: 1.0, 51: 1.0},
                  'incorrect value for samp_three_hundred_buckets')

### (5b) Create hashed features

Next, we will use this hash function to create hashed features for our CTR datasets. Use the given UDF to create a function that takes in a DataFrame and returns both labels and hashed features.  Then use it to create new training, validation and test datasets with hashed features.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
from pyspark.ml.linalg import Vectors
num_hash_buckets = 2 ** 15

# UDF that returns a vector of hashed features given an Array of tuples
tuples_to_hash_features_udf = udf(lambda x: Vectors.sparse(num_hash_buckets, hash_function(x, num_hash_buckets)), VectorUDT())

def add_hashed_features(df):
    """Return a DataFrame with labels and hashed features.

    Note:
        Make sure you cache the DataFrame that you are returning.

    Args:
        df (DataFrame with 'label' and 'features' column): A DataFrame containing labels and the features to be hashed.

    Returns:
        DataFrame: A DataFrame with a 'label' column and a 'features' column that contains a
            SparseVector of hashed features.
    """
#     return     <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

# hash_train_df = <FILL IN>
# hash_validation_df = <FILL IN>
# hash_test_df = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

hash_train_df.show()

In [None]:
# TEST Creating hashed features (5b)
hash_train_df_feature_sum = sum(hash_train_df
                                  .rdd
                                  .map(lambda r: sum(r[1].indices))
                                  .take(10))
hash_validation_df_feature_sum = sum(hash_validation_df
                                       .rdd
                                       .map(lambda r: sum(r[1].indices))
                                       .take(10))
hash_test_df_feature_sum = sum(hash_test_df
                                 .rdd
                                 .map(lambda r: sum(r[1].indices))
                                 .take(10))

expected_train_sum = 6333443
assert_equal(hash_train_df_feature_sum, expected_train_sum,
                  'incorrect number of features in hash_train_df. Got {0}, expected {1}'.format(hash_train_df_feature_sum, expected_train_sum))

expected_validation_sum = 6340030
assert_equal(hash_validation_df_feature_sum, expected_validation_sum,
                  'incorrect number of features in hash_validation_df. Got {0}, expected {1}'.format(hash_validation_df_feature_sum, expected_validation_sum))

### (5c) Sparsity

Since we only have 33K hashed features versus 233K OHE features, we could expect our OHE features to be sparser. Verify this hypothesis by computing the average sparsity of the OHE and the hashed training datasets.

Note that if you have a `SparseVector` named `sparse`, calling `len(sparse)` returns the total number of features, not the number features with entries.  `SparseVector` objects have the attributes `indices` and `values` that contain information about non-zero features.  Continuing with our example, these can be accessed using `sparse.indices` and `sparse.values`, respectively.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
def vector_feature_sparsity(sparse_vector):
    """Calculates the sparsity of a SparseVector.

    Args:
        sparse_vector (SparseVector): The vector containing the features.

    Returns:
        float: The ratio of features found in the vector to the total number of features.
    """
#     return     <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

a_sparse_vector = Vectors.sparse(5, {0: 1.0, 3: 1.0})
a_sparse_vector_sparsity = vector_feature_sparsity(a_sparse_vector)
print('This vector should have sparsity 2/5 or .4.')
print('Sparsity = {0:.2f}.'.format(a_sparse_vector_sparsity))

In [None]:
# TEST Sparsity (5c)
assert_equal(a_sparse_vector_sparsity, .4,
                'incorrect value for a_sparse_vector_sparsity')

### (5d) Sparsity continued

Now we have a function to calculate vector sparsity, we'll wrap it up in a UDF and apply it to the entire DataFrame to obtain the average sparsity of features in that DataFrame.  We'll use this function to calculate the average sparsity of both the one-hot-encoded training DataFrame and  the hashed training DataFrame.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
feature_sparsity_udf = udf(vector_feature_sparsity, DoubleType())

def get_sparsity(df):
    """Calculates the average sparsity for the features in a DataFrame.

    Args:
        df (DataFrame with 'features' column): A DataFrame with sparse features.

    Returns:
        float: The average feature sparsity.
    """
#     return (<FILL IN>)

# YOUR CODE HERE
raise NotImplementedError()

# average_sparsity_ohe = <FILL IN>
# average_sparsity_hash = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print('Average OHE Sparsity: {0:.7e}'.format(average_sparsity_ohe))
print('Average Hash Sparsity: {0:.7e}'.format(average_sparsity_hash))

In [None]:
# TEST Sparsity (5d)
expected_ohe = 1.8092746e-04
assert_true(np.allclose(average_sparsity_ohe, expected_ohe, atol=1e-2),
                'incorrect value for average_sparsity_ohe. Got {0}, expected {1}'.format(average_sparsity_ohe, expected_ohe))
expected_hash = 1.1895943e-03
assert_true(np.allclose(average_sparsity_hash, expected_hash, atol=1e-2),
                'incorrect value for average_sparsity_hash. Got {0}, expected {1}'.format(average_sparsity_hash, expected_hash))

### (5e) Logistic model with hashed features

Now let's train a logistic regression model using the hashed training features. Use the given hyperparameters, train and fit the model, then evaluate the log loss on the training set.

In [None]:
# TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# Given hyperparameters
standardization = False
elastic_net_param = 0.7
reg_param = .001
max_iter = 20

# lr_hash = (<FILL IN>)

# lr_model_hashed = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print('intercept: {0}'.format(lr_model_hashed.intercept))
print(len(lr_model_hashed.coefficients))

log_loss_train_model_hashed = evaluate_results(hash_train_df, lr_model_hashed)
print(('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\thashed = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_hashed)))

In [None]:
# TEST Logistic model with hashed features (5e)
expected =  0.481478172974873
assert_true(np.allclose(log_loss_train_model_hashed, expected, atol=1e-2),
                'incorrect value for log_loss_train_model_hashed. Got {0}, expected {1}'.format(log_loss_train_model_hashed, expected))

### (5f) Evaluate the performance on the test set

Finally, we will evaluate the model from Part (5e) on the test set.  Compare the resulting log loss with the baseline log loss on the test set, which can be computed in the same way where the validation log loss was computed in Part (4f).

In [None]:
# # TODO: Uncomment the lines below and replace <FILL IN> with appropriate code
# # Log loss for the best model from (5e)
# log_loss_test = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

# ## Log loss for the baseline model
# class_one_frac_test = <FILL IN>
# print('Class one fraction for test data: {0}'.format(class_one_frac_test))
# log_loss_test_baseline = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()

print(('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_test_baseline, log_loss_test)))

In [None]:
# TEST Evaluate on the test set (5f)
expected_test_baseline = 0.6257336255412367
assert_true(np.allclose(log_loss_test_baseline, expected_test_baseline, atol=1e-2),
                'incorrect value for log_loss_test_baseline. Got {0}, expected {1}'.format(log_loss_test_baseline, expected_test_baseline))
expected_test = 0.5748571343610652
assert_true(np.allclose(log_loss_test, expected_test, atol=1e-2),
                'incorrect value for log_loss_test. Got {0}, expected {1}'.format(log_loss_test, expected_test))

# Gradient Descent Optimization

In this part we will explore building a distributed version of minibatch SGD along with two other stochastic gradient descent optimization algorithms. Although the data and model that we will use to illustrate these concepts may be relatively simple, these techniques should generalize well to larger data scales along with more sophisticated models such as deep neural networks.

Our [dataset](https://archive.ics.uci.edu/ml/datasets/BlogFeedback) originates from blog posts. The raw contents of the blog posts were crawled and processed. Our taks to predict the number of comments of a blog post in the upcoming 24 hours based on its attributes combined with additional features derived from blog posts published in the last 72 hours. We will be using a small subset of this data. Read ["Feedback Prediction for Blogs"](http://www.cs.bme.hu/~buza/pdfs/gfkl2012_blogs.pdf) for more information about these kind of analyses.


## This section will cover:

*  *Part 1:* Loading and Parsing the Dataset
*  *Part 2:* Minibatch SGD
*  *Part 3:* Agagrad
*  *Part 4:* Adaptive Moment Estimation (Adam)

> Note that, for reference, you can look up the details of:
> * the relevant Spark methods in [PySpark's DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.DataFrame)
> * the relevant NumPy methods in the [NumPy Reference](http://docs.scipy.org/doc/numpy/reference/index.html)

## Part 1: Loading and Parsing the Dataset

### Exercise 1(a)

Let's start by first loading the dataset and examining the number of observations and fields in this dataset. We can use the DataFrame [count method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.count) to check how many observations we have, and [fieldNames](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/types.html#StructType.fieldNames) on the DataFrame's `struct` field to get the field names. Use [first](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first) to get the first row of the DataFrame.

In [None]:
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/17-700/data/master/hw3/blogData.csv"
sc.addFile(url)

blog_df = sqlContext.read.csv("file://" + SparkFiles.get("blogData.csv"), inferSchema=True)

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
# num_points = <FILL IN>
# print (num_points)
# field_names = <FILL IN>
# sample_point = <FILL IN>
# print (sample_points)

# YOUR CODE HERE
raise NotImplementedError()

In [None]:
assert_equal(num_points, 4000, "incorrect value for num_points")
assert_equal(field_names, ["_c{}".format(i) for i in range(281)], "incorrect value for field_names")
assert_equal([40.30467, 53.845657, 0.0, 401.0, 15.0, 15.52416, 32.44188, 0.0, 377.0, 3.0, 14.044226, 32.615417, 0.0, 377.0, 2.0, 34.567566, 48.475178, 0.0, 378.0, 12.0, 1.4799345, 46.18691, -356.0, 377.0, 0.0, 1.0761671, 1.795416, 0.0, 11.0, 0.0, 0.4004914, 1.0780969, 0.0, 9.0, 0.0, 0.37755936, 1.07421, 0.0, 9.0, 0.0, 0.972973, 1.704671, 0.0, 10.0, 0.0, 0.022932023, 1.521174, -8.0, 9.0, 0.0, 2.0, 2.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
            [x for x in sample_point])


The next step is to prepare the data for machine learning. Since all of our data is numeric, this should be a simple task. Our target value (the number of comments in the next 24 hours) is contained in the last field of the dataframe. We can use a [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler) and its [transform](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Transformer.transform) method to convert the remaining fields into a feature vector.

### Exercise 1(b)

* Rename the field corresponding to the target field to "label"
* Convert the remaining fields into a feature vector using a [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler)
* Use the [take](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) method to extract the first 5 lines of the vectorized Dataframe and print each line

In [None]:
# TODO: Uncomment the lines and replace <FILL_IN> with the appropriate code

from pyspark.ml.feature import VectorAssembler

# vectorizer = VectorAssembler()
# vectorizer.setInputCols(<FILL_IN>)
# vectorizer.setOutputCol(<FILL_IN>)
# vectorized_df = vectorizer.transform(<FILL IN>).<FILL_IN>
# sample_points = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

for sample_point in sample_points:
    print(sample_point)

In [None]:
assert_equal(vectorized_df.schema.fieldNames(), ["features", "label"], "incorrect schema for vectorized dataframe")
assert_equal(len(sample_points), 5, "incorrect length for sample_points")
assert_equal(vectorized_df.first().label, 1.0, "incorrect label for first row of vectorized dataframe")
assert_equal(len(list(filter(lambda x: x != 0, [x for x in sample_points[0].features]))), 43,
             "incorrect feature length for first sample_points entry")


Looking at our sample points, we observe that our feature vectors seem rather sparse (we could use the `get_sparsity` method we derived in the previous homework section to verify this). Another way of dealing with this sparsity besides the techniques used in the previous section is to use [Principal Component Analysis (PCA)](https://en.wikipedia.org/wiki/Principal_component_analysis) as a dimensionality reduction tool. Spark ML makes this straightforward for us to apply through the [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca) transformation.

### Exercise 1(c)

* Use PCA to project the feature vectors down to a 20-dimensional space.
* Use the [take](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) method to extract the first 5 lines of the vectorized Dataframe and print each line

In [None]:
# TODO: Uncomment the lines and replace <FILL_IN> with the appropriate code

from pyspark.ml.feature import PCA

# pca = <FILL_IN>
# pca_df = pca.fit(<FILL_IN>).transform(<FILL IN>).<FILL_IN>
# sample_pca_points = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

for sample_pca_point in sample_pca_points:
    print(sample_pca_points)

In [None]:
assert_equal(pca_df.schema.fieldNames(), ["features", "label"], "incorrect schema for PCA dataframe")
assert_equal(len(sample_pca_points), 5, "incorrect length for sample_pca_points")
assert_equal(1.0, pca_df.first().label, "incorrect label for first row of PCA dataframe")
assert_true(np.allclose(sample_pca_points[0].features.array,
                        np.array([-3.23313766e+01, -8.32128388e+02, -1.06007278e+02, -4.05039027e+02,
                                  8.51011986e+00,  5.24211169e+00, -9.93285358e+00, -7.12650327e+00,
                                  -1.86928731e+00, -3.45218111e+01,  6.76336167e+00, -1.02970993e+00,
                                  -3.18646624e-02, -1.66425793e-01,  9.44414030e-01, -3.65068755e-01,
                                  6.39599245e-01, -8.74570077e-01, -3.96366118e-01, -2.54296755e-01]), atol=1e-2),
                        "incorrect feature for first sample_pca_point entry")


### Exercise 1(d)

Now that our feature space is reduced, let's scale our feature vectors, which should improve the stability and speed up the convergence of our gradient descent algorithms. Once again, Spark ML provides a useful [StandardScaler](https://spark.apache.org/docs/latest/ml-features.html#standardscaler) method that makes this process straightforward.

Finally, we'll also append a bias term of 1 to our feature vectors so that we can calculate an intercept term when performing linear regression.

* Implement `scale_dataframe` by using [StandardScaler](https://spark.apache.org/docs/latest/ml-features.html#standardscaler) configured with `withMean = True` to normalize the feature vectors
* Also append a bias term of 1 to each feature vector

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, udf
from pyspark.ml.linalg import Vectors, VectorUDT

def scale_dataframe(dataframe):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        dataframe: a dataframe containing a "features" field to normalize and a "label" field
        
    Returns:
        dataframe: the dataframe with a normalized "features" field appended with a bias term with the original "label" field
    """
    # scaler = StandardScaler(<FILL_IN>)
    # scaled_df = scaler.fit(<FILL_IN>).transform(<FILL_IN>).<FILL_IN>
    # df_with_bias = <FILL_IN>
    # return <FILL IN>
    
    # YOUR CODE HERE
    raise NotImplementedError()

example_data = sqlContext.createDataFrame([(Vectors.dense(1, 2, 3), 1),
                                           (Vectors.dense(3, 1, 2), 2),
                                           (Vectors.dense(2, 3, 1), 3)], ["features", "label"])
scale_dataframe(example_data).show(truncate=False)
scale_dataframe(pca_df).first().features.array

In [None]:
scaled_features = [row['features'].array for row in scale_dataframe(example_data).collect()]
assert_true(np.allclose(scaled_features, np.array([[-1.,  0.,  1.,  1.], [ 1., -1.,  0.,  1.], [ 0.,  1., -1.,  1.]]), atol=1e-2),
                        "incorrect feature for scaled example data")

assert_true(np.allclose(scale_dataframe(pca_df).first().features.array,
                        np.array([ 0.71003709,  0.43869673, -0.42505435, -1.16038263,  0.00869739,
                                  -0.02160378,  1.1597074 ,  1.04233605, -0.20461856, -0.39373467,
                                  -0.1819323 , -0.83310861,  0.23984184, -0.018611  ,  0.10128653,
                                  -0.692232  ,  1.11750318, -1.06069006, -0.94329821, -0.47101456,
                                  1.        ]), atol=1e-2),
                        "incorrect feature for first entry of scaled pca dataframe")


## Part 2: Minibatch SGD

As we explored in Homework 2, gradient descent is an optimization algorithm used to minimize a cost function by iteratively moving in the direction of steepest descent as defined by the negative of the gradient. Although it can work well, the method we implemented in the previous homework involves computing the gradient over the entire dataset before updating the weights at each iteration, which can be prohibitive when dealing with large datasets where the cost of each independent variable iteration becomes O(n).

Stochastic gradient descent instead uses the cost gradient of a single random example at each iteration before updating the weights, which allows us to quickly make progress and oftentimes converge more rapidly in practice at the cost of noisier error updates. Minibatch SGD falls somewhere in between both extremes, where a cost gradient calculated on a batch of examples is used to update the model weights at each step. It also provides attractive efficiency properties as further detailed [here](http://d2l.ai/chapter_optimization/minibatch-sgd.html).

### Exercise 2(a)

Let's start by first evaluating minibatch SGD using the feature vectors that we've reduced using PCA. We'll split our data using an 80/20 train and test split. We'll also cache our splits to speed up evaluation.

In [None]:
import pyspark.sql.functions as f

def get_train_test_split(dataframe):
    index_df = dataframe.withColumn('index', f.monotonically_increasing_id())
    split100 = index_df.count()
    split20 = int(split100/5)
    split80 = split100 - split20
    
    # take first 20% rows
    test_df = index_df.sort('index').limit(split20).drop('index')
    
    # take last 80% rows
    train_df = index_df.sort('index', ascending=False).limit(split80).drop('index')
    
    return train_df, test_df
    
train_pca_df, test_pca_df = get_train_test_split(scale_dataframe(pca_df))

# Cache and count the DataFrames
# n_pca_train = <FILL IN>
# n_pca_test = <FILL IN>

# YOUR CODE HERE
raise NotImplementedError()
print(n_pca_train, n_pca_test, n_pca_train + n_pca_test)

In [None]:
# TEST Loading and splitting the data (3a)
assert_true(all([train_pca_df.is_cached, test_pca_df.is_cached]),
                'you must cache the split pca data')
assert_equal(n_pca_train, 3200, 'incorrect value for n_pca_train')
assert_equal(n_pca_test, 800, 'incorrect value for n_pca_test')


### Exercise 2(b)

Our implementation of minibatch SGD will be similar to the implementation of gradient descent in Homework 2 with a few changes to make the training process more efficient. One improvement that we can make is to calculate the gradient and the loss of the data point evaluated in an iteration over a single pass of the dataset. Remember that we can predict by computing the dot product between weights and an observation's features, and that the gradient summand can be written \\( \scriptsize (\mathbf{w}^\top \mathbf{x} - y) \mathbf{x}\\). We can return the squared loss for each evaluated data point so that we can calculate the root mean squared error in a later section.

In [None]:
from pyspark.ml.linalg import DenseVector

# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
def gradient_and_loss(weights, data_point):
    """Calculates the gradient summand and loss for a given weight and data point.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        weights (DenseVector): An array of model weights (betas).
        data_point (features, label): A single observation consisting of a feature vector and a label.

    Returns:
        tuple: A (gradient, loss) tuple. The gradient summand (DenseVector) should be of the same
        length as `weights.`
    """
    # prediction = <FILL_IN>
    # loss = <FILL_IN>
    # gradient_summand = <FILL_IN>
    # return gradient_summand, loss
    
    # YOUR CODE HERE
    raise NotImplementedError()

example_w = DenseVector([1, 1, 1])
# gradient_summand = (dot([1 1 1], [3 1 4]) - 2) * [3 1 4] = (8 - 2) * [3 1 4] = [18 6 24]
# loss = dot([1 1 1], [3 1 4])^2 = (8 - 2)^2 = 36
example_data = sqlContext.createDataFrame([(Vectors.dense(3, 1, 4), 2.0)], ["features", "label"])

print(example_data.rdd.map(lambda x: gradient_and_loss(example_w, x)).first())

example_w = DenseVector([.24, 1.2, -1.4])
example_data = sqlContext.createDataFrame([(Vectors.dense(-1.4, 4.2, 2.1), 3.0)], ["features", "label"])

print(example_data.rdd.map(lambda x: gradient_and_loss(example_w, x)).first())

In [None]:
example_w = DenseVector([1, 1, 1])
example_data = sqlContext.createDataFrame([(Vectors.dense(3, 1, 4), 2.0)], ["features", "label"])
summand_one, loss_one = example_data.rdd.map(lambda x: gradient_and_loss(example_w, x)).first()
assert_true(np.allclose(summand_one, [18., 6., 24.], atol=1e-2), 'incorrect value for summand_one')
assert_equal(loss_one, 36.0, 'incorrect value for loss_one')

example_w = DenseVector([.24, 1.2, -1.4])
example_data = sqlContext.createDataFrame([(Vectors.dense(-1.4, 4.2, 2.1), 3.0)], ["features", "label"])
summand_two, loss_two = example_data.rdd.map(lambda x: gradient_and_loss(example_w, x)).first()
assert_true(np.allclose(summand_two, [1.7304, -5.1912, -2.5956], atol=1e-2), 'incorrect value for summand_two')
assert_equal(loss_two, 1.5276960000000006, 'incorrect value for loss_two')


### Exercise 2(c)

We will now define the aggregation that we will call in the inner loop of our minibatch SGD algorithm to get the summed gradient and squared loss in a single dataset pass. Implement `seq_op` and `comb_op` such that the aggregation returns a tuple of form `(total_gradient, total_square_loss, batch_size)`.

In [None]:
def seq_op(weights, aggregrated, observation):
    """
    Calculates the gradient and squared loss for a single data point and adds it to the running accumulated result.
    Args:
        weights (broadcast of DenseVector): broadcast of the weights vector
        aggregated (DenseVector, double, int): tuple containing the running sum of the gradient, squared loss,
        and number of data points evaluated so far
    Returns:
        (DenseVector, double, int): updated running sum of the gradient, squared loss, and number of data points
        evaluated so far
    """
    # return <FILL_IN>
    
    # YOUR CODE HERE
    raise NotImplementedError()

def comb_op(aggregationA, aggregationB):
    """
    Combines the running accumulated result in two partitions into a single result.
    Args:
        aggregationA (DenseVector, double, int): tuple containing the running sum of the gradient, squared loss,
        and number of data points evaluated so far in a partition
        aggregationB (DenseVector, double, int): tuple containing the running sum of the gradient, squared loss,
        and number of data points evaluated so far in a partition
    Returns:
        (DenseVector, double, int): combined sum
    """
    # return <FILL_IN>

    # YOUR CODE HERE
    raise NotImplementedError()

weights_broadcast = sc.broadcast(DenseVector([1, 1, 1]))
example_data = sqlContext.createDataFrame([(Vectors.dense(3, 1, 4), 1.0),
                                          (Vectors.dense(2, 1, 5), 2.0),
                                          (Vectors.dense(4, 1, 7), 3.0),
                                          (Vectors.dense(4, 1, 0), 4.0)], ["features", "label"])

print(example_data.rdd.aggregate((np.zeros(3), 0.0, 0), lambda x, y: seq_op(weights_broadcast, x, y), comb_op))


In [None]:
gradient, loss, count = example_data.rdd.aggregate((np.zeros(3), 0.0, 0), lambda x, y: seq_op(weights_broadcast, x, y), comb_op)
assert_true(np.allclose(gradient, [73.,  23., 121.], atol=1e-2), 'incorrect value for gradient')
assert_equal(loss, 167.0, 'incorrect value for loss')
assert_equal(count, 4, 'incorrect value for count')


### Exercise 2(d)

We're now ready to put everything together and construct our minibatch SGD algorithm! At each iteration, we will sample a fraction of our dataset and compute the summed loss and gradient of our selected minibatch using the aggregation function we previously defined. Our update rule is similar to that we defined in Homework 2 for gradient except that we now divide the gradient sum used to update the weights by our batch size instead of the total number of observations in the training set.

* Sample the training set without replacement, using the specified minibatch fraction and using the zero-indexed iteration number as the random seed
* Calcuate the root mean squared loss as the training error at each iteration

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
def linreg_minibatch_sgd(train_data, minibatch_fraction, num_iters):
    """Calculates the weights and error for a linear regression model trained with minibatch SGD.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        train_data (RDD of (features, vector)): The labeled data for use in training the model.
        minibatch_fraction (float): fraction of the dataset to use for each minibatch
        num_iters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The number of features in the training data
    d = len(train_data.first().features)
    w = np.zeros(d)
    alpha = 0.1
    
    # We will compute and store the training error after each iteration
    error_train = np.zeros(num_iters)

#     for i in range(num_iters):
#         weights_broadcast = <FILL_IN> # Broadcast the current weights vector
#         # Get a minibatch using the sample method. Don't use replacement and use the current iteration as the seed
#         train_batch = <FILL_IN>
        
#         # Get the gradient, loss and batch size
#         gradient, loss, batch_size = train_batch.aggregate(<FILL_IN>)
#         batch_gradient = <FILL_IN>
       
#         # Calculate the root mean squared error for this iteration 
#         error_train[i] = <FILL_IN>

#         # Update the weights
#         alpha_i = alpha / np.sqrt(i+1)
#         w -= alpha_i * batch_gradient
    
#     return w, error_train
  
    # YOUR CODE HERE
    raise NotImplementedError()

# create a toy dataset with n = 10, d = 3, and then run 5 iterations of minibatch SGD
# note: the resulting model will not be useful; the goal here is to verify that
# linreg_minibatch_sgd is working properly
example_n = 20
example_d = 3

truncate = udf(lambda v: Vectors.dense(list(v)[:example_d]), VectorUDT())
example_data = train_pca_df.limit(example_n).select(truncate("features").alias("features"), "label")

example_data.show(2, truncate=False)
example_num_iters = 5
example_weights, example_error_train = linreg_minibatch_sgd(example_data.rdd, 0.5, example_num_iters)
print (example_weights)
print (example_error_train)

In [None]:
assert_true(np.allclose(example_weights, [-3.58685976, 3.96030386, -0.46859036], atol=1e-2), 'incorrect value for example_weights')
assert_true(np.allclose(example_error_train, [14.550405, 28.330522, 13.57563332, 9.83343336, 11.02731603], atol=1e-2),
            'incorrect value for example_error_train')


### Exercise 2(e)

Now let's train our linear regression model on all of our training data and evaluate its accuracy on the test set (this might take a few minutes). One (but not the only) way of getting the test set RMSE is to reuse the aggregate function that we previously defined to get the total squared loss on the test set. Although the gradient calculations are unneccessary, it shouldn't matter too much for our purposes.

* Run minibatch SGD with a minibatch fraction of 0.2 against the entire training set for 100 iterations
* Calculate the test RMSE at after training is complete

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

num_iters = 100
minibatch_fraction = 0.2

# minibatch_weights, minibatch_error_train = linreg_minibatch_sgd(<FILL_IN>)
# _, minibatch_test_loss, _ = test_pca_df.rdd.aggregate(<FILL_IN>)
# minibatch_test_rmse = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

print(minibatch_weights)
print(minibatch_error_train)
print(minibatch_test_rmse)

In [None]:
assert_true(np.allclose(minibatch_weights, np.array([-1.58843999, -5.72548809,  2.05269507,  3.07769617,  9.04400967,
                                                     5.49920315,  2.69786858,  3.41055626, -0.41300786,  0.6864585 ,
                                                     -0.24572472, -0.43805686,  1.47487744,  0.03779474, -1.12206336,
                                                     0.634931  , -0.83604169,  0.348343  ,  0.61246073, -0.73123336,
                                                     9.5519935 ]), atol=1e-2), 'incorrect value for minibatch_weights')
assert_equal(len(minibatch_error_train), 100, 'incorrect length for minibatch_error_train')
assert_true(np.allclose(minibatch_test_rmse, 13.832791135070456, atol=1e-2), 'incorrect value for minibatch_test_rmse')


### Visualization 1: Minibatch Train Error

We can visualize the evolution of the training error against the number of iterations. Compared to gradient descent, we can see how progress is indeed "noisier."

In [None]:
fig, ax = prepare_plot(np.arange(0, num_iters, 10), np.arange(17, 35))
ax.set_ylim(17, 35)
plt.plot(range(num_iters), minibatch_error_train)
ax.set_xticklabels(map(str, range(0, num_iters, 10)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
display(fig)


### Exercise 2(f)

Finally, let's explore how minibatch SGD performs against our original dataset (before we reduced the feature dimensionality with PCA).

* Create new scaled train_df and test_df datasets from `vectorized_df` using the previously defined `get_train_test_split` and `scale_dataframe functions`
* Cache the new train and test dataframes
* Run minibatch SGD with a minibatch fraction of 0.2 against the entire training set for 10 iterations

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

num_iters = 10
minibatch_fraction = 0.2

# train_df, test_df = <FILL_IN>
# n_train = <FILL_IN>
# n_test = <FILL_IN>
# minibatch_full_weights, minibatch_full_train = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

print(n_train, n_test, n_train + n_test)
print(minibatch_full_train)

In [None]:
assert_true(all([train_df.is_cached, test_df.is_cached]),
                'you must cache the split data')
assert_equal(n_train, 3200, 'incorrect value for n_train')
assert_equal(n_test, 800, 'incorrect value for n_test')
assert_equal(len(train_df.first().features), 281, 'incorrect feature length in train_df')
assert_equal(len(test_df.first().features), 281, 'incorrect feature length in test_df')
assert_equal(len(minibatch_full_weights), 281, 'incorrect length for minibatch_full_weights')

assert_true(np.allclose(minibatch_full_train, np.array([3.09847438e+01, 3.60587988e+01, 5.20752518e+01, 8.45603279e+01,
                                                     4.04831699e+02, 2.42182783e+03, 7.90918727e+03, 2.38528959e+04,
                                                     6.66401140e+04, 1.68829245e+05]), atol=1e-2),
            'incorrect value for minibatch_full_train')


### Visualization 2: Minibatch Train Error when using all Features

We can visualize the evolution of the training error (in log scale) against the number of iterations. We see that our training error is now actually rising (it will grow to infinity with more iterations). There are several ways of conditioning gradient descent to make it more stable with sparse data, some of which we will explore in the following sections.

In [None]:
fig, ax = prepare_plot(np.arange(num_iters), np.arange(3, 14))
ax.set_ylim(3, 14)
plt.plot(range(num_iters), np.log(minibatch_full_train))
ax.set_xticklabels(map(str, range(num_iters)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'log(Training Error)')
display(fig)
print(minibatch_full_train)

## Part 3: Adagrad

[AdaGrad](https://jmlr.org/papers/volume12/duchi11a/duchi11a.pdf) (for adaptive gradient algorithm) is a modified stochastic gradient descent algorithm developed in 2011 with per-parameter learning rates. Intuitively, it increases the learning rate for sparser parameters and decreases the rate for ones that are more frequent. For this reason, it is well-suited for dealing with sparse data and improves the robustness of SGD in these scenarios. Examples of such applications include natural language processing and image recognition. [Jeff Dean et al](http://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf) used it to train large-scale neural networks at Google, and, among other things, teach them to [recognize cats in Youtube videos](https://www.wired.com/2012/06/google-x-neural-network/).

The modifications needed to adapt our existing gradient descend method to use Adagrad are fairly simple. Similar to minibatch SGD, we update our weight vector at each iteration by using the gradient and a learning rate parameter. However, we now also divide the general learning rate at iteration t for every parameter *θ*<sub>*i*</sub> based on the past gradients that have been computed for *θ*<sub>*i*</sub>. More formally, our update rule becomes:

\\( \mathbf{g}_t = \frac{1}{|\mathcal{B}|} \sum_{i \in \mathcal{B}_t}(\mathbf{w}^\top \mathbf{x} - y) \mathbf{x}\\)\
\\( \mathbf{s}_t = \mathbf{s}_{t-1} + \mathbf{g}_t^2\\)\
\\( \mathbf{w}_t = \mathbf{w}_{t-1} - \frac{\alpha}{\sqrt{\mathbf{s}_t + \epsilon}} \cdot \mathbf{g}_t.\\)

Where |B| is our mini-batch size, *α* is our configured learning rate and *ϵ* is a small term used to avoid dividing by zero.

See [here](http://d2l.ai/chapter_optimization/adagrad.html) for more details and background.


### Exercise 3(a)

Let's start by writing our new update function. Fill out the below function to return new weights and parameter update weights. 

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

epsilon = 1e-8
alpha = 1

def update_weights_adagrad(prev_w, prev_s, gradient):
    """Calculates the weights and parameter update weights specified by the Adagrad update step.
    Args:
        prev_w (DenseVector): The previous model weights
        prev_s (DenseVector): The previous parameter update weights
        gradient (DenseVector): Gradient for this iteration

    Returns:
        (np.ndarray, np.ndarray): A tuple of (new_weights, new_parameter_update_weights).
    """
    # s = <FILL_IN>
    # w = <FILL_IN>
    # return w, s

    # YOUR CODE HERE
    raise NotImplementedError()


test_gradient = Vectors.dense([1, 3, 2])
test_w = [10, 8, 9]
test_s = [1, 4, 9]

w, s = update_weights_adagrad(test_w, test_s, test_gradient)

In [None]:
assert_true(np.allclose(w, np.array([9.29289322, 7.16794971, 8.4452998]), atol=1e-2), 'incorrect value for w')
assert_true(np.allclose(s.array, np.array([2.0, 13.0, 13.0]), atol=1e-2), 'incorrect value for s')


### Exercise 3(b)

We can now train using Adagrad by reusing most of what that we wrote to implement minibatch SGD. The main difference is that we will now use the function we implemented in the previous exercise to update the weights during each iteration. Complete the function below.

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
def linreg_adagrad(train_data, minibatch_fraction, num_iters):
    """Calculates the weights and error for a linear regression model trained with Adagrad.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        train_data (RDD of (features, vector)): The labeled data for use in training the model.
        minibatch_fraction (float): fraction of the dataset to use for each minibatch
        num_iters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The number of features in the training data
    d = len(train_data.first().features)
    w = np.zeros(d)
    s = np.zeros(d)
    
    # We will compute and store the training error after each iteration
    error_train = np.zeros(num_iters)

#     for i in range(num_iters):
#         weights_broadcast = <FILL_IN> # Broadcast the current weights vector
#         # Get a minibatch using the sample method. Don't use replacement and use the current iteration as the seed
#         train_batch = <FILL_IN>
        
#         # Get the gradient, loss and batch size
#         gradient, loss, batch_size = train_batch.aggregate(<FILL_IN>)
#         batch_gradient = <FILL_IN>
        
#         # Calculate the root mean squared error for this iteration 
#         error_train[i] = <FILL_IN>

#         # Update the weights
#         w, s = update_weights_adagrad(<FILL_IN>)
    
#     return w, error_train
  
    # YOUR CODE HERE
    raise NotImplementedError()

# create a toy dataset with n = 10, d = 3, and then run 5 iterations of minibatch SGD
# note: the resulting model will not be useful; the goal here is to verify that
# linreg_adagrad is working properly
example_n = 20
example_d = 3

truncate = udf(lambda v: Vectors.dense(list(v)[:example_d]), VectorUDT())
example_data = train_pca_df.limit(example_n).select(truncate("features").alias("features"), "label")

example_data.show(2, truncate=False)
example_num_iters = 5
example_weights, example_error_train = linreg_adagrad(example_data.rdd, 0.5, example_num_iters)
print (example_weights)
print (example_error_train)

In [None]:
assert_true(np.allclose(example_weights, [-2.03703969, 2.25568569, -1.7138502], atol=1e-2), 'incorrect value for example_weights')
assert_true(np.allclose(example_error_train, [14.550405, 30.85333793, 10.46677688, 8.48714209, 11.35877978], atol=1e-2),
            'incorrect value for example_error_train')


### Exercise 3(c)

Let's try to once again train using our original training dataset without feature reduction (this will once again take a while).

* Run gradient descent with Adagrad with a minibatch fraction of 0.2 against the entire training set for 100 iterations
* Calculate the test RMSE at after training is complete

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

num_iters = 100
minibatch_fraction = 0.2

# adagrad_weights, adagrad_error_train = linreg_adagrad(<FILL_IN>)
# _, adagrad_test_loss, _ = test_df.rdd.aggregate(<FILL_IN>)
# adagrad_test_rmse = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

print(adagrad_weights)
print(adagrad_error_train)
print(adagrad_test_rmse)

In [None]:
assert_equal(len(adagrad_weights), 281, 'incorrect length for adagrad_weights')
assert_equal(len(adagrad_error_train), 100, 'incorrect length for adagrad_error_train')
assert_true(np.allclose(adagrad_test_rmse, 14.350947808742774, atol=1e-2), 'incorrect value for adagrad_test_rmse')



### Visualization 3: Adagrad Train Error

Let's visualize the evolution of the training error against the number of iterations. We should be able to see the training error properly decrease as in our first visualization!

In [None]:
fig, ax = prepare_plot(np.arange(0, num_iters, 10), np.arange(17, 67, 10))
ax.set_ylim(17, 67)
plt.plot(range(num_iters), adagrad_error_train)
ax.set_xticklabels(map(str, range(0, num_iters, 10)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
display(fig)

### Visualization 4: Adagrad Feature Weights

One way of exploring how Adagrad handles our data sparsity is to look at the magnitude of the trained feature weigths. In the plot below, we sort the feature weights by magnitude and display the top 20.

In [None]:
top_count = 20
top_indices = np.argsort(np.abs(adagrad_weights))[-top_count:][::-1]

plt.figure(figsize=(9, 4))
plt.bar([str(x) for x in top_indices], np.abs(adagrad_weights[top_indices]))
plt.show()

## Part 4: Adaptive Moment Estimation (Adam)

[Adaptive Moment Estimation (Adam)](https://arxiv.org/pdf/1412.6980.pdf) is another method that computes adaptive learning rates for each parameter. In addition to storing an exponentially decaying average of past squared gradients as in Adagrad, Adam also keeps an exponentially decaying average of past gradients, similar to momentum. Adam has become a popular choice in machine learning as one of the more robust and effecitve optimization algorithms, especially in deep learning.

One of the main ideas behind Adam is to uses exponential weighted moving averages (also known as leaky averaging) to obtain an estimate of both the momentum and also the second moment of the gradient. More formally, it uses the state variables:

\\( \mathbf{v}_t = \beta_1 \mathbf{v}_{t-1} + (1 - \beta_1) \mathbf{g}_t\\)\
\\( \mathbf{s}_t = \beta_2 \mathbf{s}_{t-1} + (1 - \beta_2) \mathbf{g}_t^2.\\)

Where *g* is defined as in Adagrad and *β*<sub>1</sub> and *β*<sub>2</sub> are chosen nonnegative weighting parameters. As *v*<sub>*t*</sub> abd *s*<sub>*t*</sub> are initialized to zero, they are biased toward zero during the initial time steps, especially when the decay rates *β*<sub>1</sub> and *β*<sub>2</sub> are small. To counteract this, we normalize the state variables as:

\\( \hat{\mathbf{v}}_t = \frac{\mathbf{v}_t}{1 - \beta_1^t}\\)\
\\( \hat{\mathbf{s}}_t = \frac{\mathbf{s}_t}{1 - \beta_2^t}.\\)

Our update rule at every step then becomes:

\\( \mathbf{w}_t = \mathbf{w}_{t-1} - \frac{\alpha \hat{\mathbf{v}}_t}{\sqrt{\hat{\mathbf{s}}_t} + \epsilon}\\)


See [here](http://d2l.ai/chapter_optimization/adam.html) for more details and background.


### Exercise 4(a)

Let's start by writing the update for Adam. Fill out the below function to return new weights and parameter update weights. 

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

epsilon = 10e-6
alpha = 1
beta1 = 0.9
beta2 = 0.999

def update_weights_adam(prev_w, prev_v, prev_s, gradient, t):
    """Calculates the weights and parameter update weights specified by the Adagrad update step.
    Args:
        prev_w (DenseVector): The previous model weights
        prev_v (DenseVector): The previous parameter update weights
        prev_s (DenseVector): The previous parameter update weights
        gradient (DenseVector): Gradient for this iteration
        t (int): iteration number (zero-indexed)

    Returns:
        (np.ndarray, np.ndarray): A tuple of (new_weights, new_v, new_s).
    """
#     v = <FILL_IN>
#     s = <FILL_IN>
#     v_hat = <FILL_IN>
#     s_hat = <FILL_IN>
#     w = <FILL_IN>
#     return w, v, s

    # YOUR CODE HERE
    raise NotImplementedError()


test_gradient = Vectors.dense([1, 3, 2])
test_w = Vectors.dense([10, 8, 9])
test_v = Vectors.dense([2, 5, 3])
test_s = Vectors.dense([1, 4, 9])

w, v, s = update_weights_adam(test_w, test_v, test_s, test_gradient, 2)

In [None]:
assert_true(np.allclose(w, np.array([9.6162, 7.5155, 8.8047]), atol=1e-2), 'incorrect value for w')
assert_true(np.allclose(v.array, np.array([1.9, 4.8, 2.9]), atol=1e-2), 'incorrect value for v')
assert_true(np.allclose(s.array, np.array([1.0, 4.005, 8.995]), atol=1e-2), 'incorrect value for s')


### Exercise 4(b)

As with Adagrad, we can train using Adam by reusing most of what we wrote in previous sections. Once again, the main difference is that we will now use the function we implemented in the previous exercise to update the weights during each iteration. Complete the function below.

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
def linreg_adam(train_data, minibatch_fraction, num_iters):
    """Calculates the weights and error for a linear regression model trained with Adam.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        train_data (RDD of (features, vector)): The labeled data for use in training the model.
        minibatch_fraction (float): fraction of the dataset to use for each minibatch
        num_iters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The number of features in the training data
    d = len(train_data.first().features)
    w = np.zeros(d)
    v = np.zeros(d)
    s = np.zeros(d)
    
    # We will compute and store the training error after each iteration
    error_train = np.zeros(num_iters)

#     for i in range(num_iters):
#         weights_broadcast = <FILL_IN> # Broadcast the current weights vector
#         # Get a minibatch using the sample method. Don't use replacement and use the current iteration as the seed
#         train_batch = <FILL_IN>
        
#         # Get the gradient, loss and batch size
#         gradient, loss, batch_size = train_batch.aggregate(<FILL_IN>)
#         batch_gradient = <FILL_IN>
        
#         # Calculate the root mean squared error for this iteration 
#         error_train[i] = <FILL_IN>

#         # Update the weights
#         w, v, s = update_weights_adam(<FILL_IN>)
    
#     return w, error_train
  
    # YOUR CODE HERE
    raise NotImplementedError()

# create a toy dataset with n = 10, d = 3, and then run 5 iterations of minibatch SGD
# note: the resulting model will not be useful; the goal here is to verify that
# linreg_adam is working properly
example_n = 20
example_d = 3

truncate = udf(lambda v: Vectors.dense(list(v)[:example_d]), VectorUDT())
example_data = train_pca_df.limit(example_n).select(truncate("features").alias("features"), "label")

example_data.show(2, truncate=False)
example_num_iters = 5
example_weights, example_error_train = linreg_adam(example_data.rdd, 0.5, example_num_iters)
print (example_weights)
print (example_error_train)

In [None]:
assert_true(np.allclose(example_weights, [-3.5072271, 3.97331401, -2.83269863], atol=1e-2), 'incorrect value for example_weights')
assert_true(np.allclose(example_error_train, [14.550405, 30.85335126, 10.40982057, 8.19053844, 11.06324425], atol=1e-2),
            'incorrect value for example_error_train')


### Exercise 3(c)

Let's train using our original training dataset without feature reduction one more time (this will once again take a while).

* Run gradient descent with Adam with a minibatch fraction of 0.2 against the entire training set for 100 iterations
* Calculate the test RMSE at after training is complete

In [None]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

num_iters = 100
minibatch_fraction = 0.2

# adam_weights, adam_error_train = linreg_adam(<FILL_IN>)
# _, adam_test_loss, _ = test_df.rdd.aggregate(<FILL_IN>)
# adam_test_rmse = <FILL_IN>

# YOUR CODE HERE
raise NotImplementedError()

print(adam_weights)
print(adam_error_train)
print(adam_test_rmse)

In [None]:
assert_equal(len(adam_weights), 281, 'incorrect length for adagrad_weights')
assert_equal(len(adam_error_train), 100, 'incorrect length for adam_error_train')
assert_true(np.allclose(adam_test_rmse, 14.24153151766628, atol=1e-2), 'incorrect value for adam_test_rmse')


### Visualization 5: Adam Train Error

Let's plot once the train error once again. It's natural to see some noisiness at the beginning, but training error should eventually decrease.

In [None]:
fig, ax = prepare_plot(np.arange(0, num_iters, 10), np.arange(17, 97, 10))
ax.set_ylim(17, 97)
plt.plot(range(num_iters), adam_error_train)
ax.set_xticklabels(map(str, range(0, num_iters, 10)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
display(fig)

### Visualization 6: Adam Feature Weights

As with Adagrad, we plot the top 20 feature weights sorted by magnitude. This feature set should have significant overlap with Adagrad.

In [None]:
top_count = 20
top_indices = np.argsort(np.abs(adam_weights))[-top_count:][::-1]

plt.figure(figsize=(8, 4))
plt.bar([str(x) for x in top_indices], np.abs(adam_weights[top_indices]))
plt.show()

### Manually Graded Question (10 points)
(Enter your answer into the cell below)

If you had to recommend one of these approaches for use on a large dataset, which do you believe would be the best choice and why?

Your answer here

### Extra Credit (also manually graded, up to 20 points)

#### Part 1 (5 points):
(Enter your answer into the cell below)

Explain a situation where Adagrad performs better than Adam (or vice-versa).

Your answer here

#### Part 2 (15 points):
(Enter your answer into the cell below)

Build a reproducible example demonstrating a scenario where one approach performs better than the other. You may use example data from [Kaggle](https://www.kaggle.com/) or the [UCI Machine Learning repository](https://archive.ics.uci.edu/ml/index.php) or from somewhere else entirely.

In [None]:
# Your answer here

Congratulations, you're now hopefully more familiar with some of more widely deployed optimization algorithms :) Although these techniques may seem slightly overkill when performing linear regression on the datsets we've examined, they still represent the state of the art when working with more non-trivial optimization problems, especially in deep learning.