# W261 Final Project
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Spring 201`__ 


Alla Hale, Armand Kok, Daniel Olmstead, Adam Yang

The analysis below is a Click Through Rate prediction on the Criteo advertising data made public as part of a [Kaggle competition](https://www.kaggle.com/c/criteo-display-ad-challenge) in 2014.


# Notebook Set-Up

In [210]:
# Imports 
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import time

from pyspark.sql import SQLContext
from pyspark.sql import types
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler, VectorIndexer, Normalizer
from pyspark.ml.linalg import VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

%matplotlib inline
plt.style.use('ggplot')

In [211]:
%reload_ext autoreload
%autoreload 2

In [212]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [213]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw5_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

## Get the Data

In [5]:
!mkdir data

In [6]:
# A different way to download the data... do we need this?
# !wget -q -O data/dac.tar.gz https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz

In [19]:
# Download the data to cluster
!curl https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz --output data/dac.tar.gz 

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 4364M  100 4364M    0     0  19.7M      0  0:03:40  0:03:40 --:--:-- 21.1M


In [8]:
# Extract the files on the cluster
!tar -xvzf data/dac.tar.gz --directory data

tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'
readme.txt
tar: Ignoring unknown extended header keyword 'LIBARCHIVE.creationtime'
tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'
test.txt
tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'
train.txt


In [9]:
# Check if the extracted files are there
!ls data/

dac.tar.gz  readme.txt	test.txt  train.txt


In [26]:
# Move the files to the bucket
!gsutil cp train.txt gs://w261-final-hoky/data/
!gsutil cp readme.txt gs://w261-final-hoky/data/
!gsutil cp test.txt gs://w261-final-hoky/data/    

Copying file://train.txt [Content-Type=text/plain]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

| [1 files][ 10.4 GiB/ 10.4 GiB]   46.6 MiB/s                                   
Operation completed over 1 objects/10.4 GiB.                                     
Copying file://readme.txt [Content-Type=text/plain]...
/ [1 files][  1.9 KiB/  1.9 KiB]          

# 1. Question Formulation

When it comes to making money on the internet, few things drive revenue like display advertising. Getting the right product in front of the right people benefits brand and consumer alike, but doing so is no easy task. Criteo works with over 4,000 e-commerce companies around the world and applies machine learning to an endless stream of user and advertisement data in an attempt to show the right ads to any given user. As an extension of this goal, CriteoLabs shared a week's worth of data as a machine learning challenge: develop an algorithm that accurately predicts the click-through rate. Here, the click-through rate is the probability that a given user on a given webpage clicks on a given ad. The data provided by CriteoLabs makes this concrete:

In [None]:
train_RDD.take(1)

Criteo Labs provided a week's worth of data in which each row is similar to the one shown above. Each row contains tab-delimited values. The first value is the label: 1 means the user clicked on the displayed advertisement and 0 means the user did not. The label is followed by 13 integer columns, most of which are count features, and 26 categorical columns. For anonymization purposes, the values of the categorical features have been hashed onto 32 bits. We are not told what each of the 39 features represents because Criteo keeps its feature selection secret. However, it is implied that together the 39 features describe a user, the webpage the user is on, and the ad the user is shown. With these 39 features, our goal is to build a machine learning model that predicts the probability that the ad will be clicked by the user on that webpage (the click-through rate). Throughout the development of our model, we will highlight the following course concepts that were relevant to this task: Insert Course Concepts Here.
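To make the row layout concrete, here is a minimal sketch of a parser for one record. It assumes the layout described above (1 label, 13 integer fields, 26 categorical fields, tab-delimited) and additionally assumes that a missing value appears as an empty field. The function name `parse_criteo_row` and the example record are hypothetical and are not taken from the dataset.

```python
def parse_criteo_row(line):
    """Split one tab-delimited record into (label, integer features, categorical features)."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 40:
        raise ValueError(f"expected 40 fields, got {len(fields)}")
    label = int(fields[0])
    # Assumption: an empty string marks a missing value; keep it as None
    ints = [int(v) if v != "" else None for v in fields[1:14]]
    cats = [v if v != "" else None for v in fields[14:40]]
    return label, ints, cats


# Made-up record for illustration only
example = "\t".join(
    ["1"] + ["3", "", "7"] + ["0"] * 10 + ["68fd1e64", ""] + ["a73ee510"] * 24
)
label, ints, cats = parse_criteo_row(example)
print(label, ints[:3], cats[:3])
```

Keeping missing values as `None` at parse time leaves the choice of imputation (for example a sentinel value or a separate category) to a later step.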

# 2. Algorithm Explanation

## Logistic Regression

In this work, we will be implementing a logistic regression model to predict the click-through-rate based on the data provided by Criteo Labs.

The probability, $p$, of belonging to the positive class is given by equation 1, where $\mathbf{w}$ is the vector of weights, $\mathbf{x}$ is the feature vector of an observation, and $b$ is the bias term.

\begin{equation}\tag{1}
p=\frac{1}{1+\exp \left(-\left(\mathbf{w}^{T} \mathbf{x}+b\right)\right)}
\end{equation}
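As a small illustration of equation 1, the sketch below evaluates the logistic function in plain Python, assuming the usual form $p = 1/(1+\exp(-(\mathbf{w}^{T}\mathbf{x}+b)))$. The helper names `sigmoid` and `predict_proba` and the example weights are made up for illustration.

```python
import math


def sigmoid(z):
    """Logistic function 1 / (1 + exp(-z)), written to avoid overflow for large |z|."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)


def predict_proba(w, b, x):
    """Probability of the positive class for one observation x."""
    z = sum(wj * xj for wj, xj in zip(w, x)) + b
    return sigmoid(z)


# Hypothetical weights and features: w.x + b = 1.0 - 1.0 + 0.1 = 0.1
p = predict_proba([0.5, -0.25], 0.1, [2.0, 4.0])
print(p)
```

Branching on the sign of `z` keeps the argument of `math.exp` non-positive, so very large scores do not raise an `OverflowError`.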

To estimate the weights for a logistic regression, we use maximum likelihood estimation and maximize the log likelihood, $l(W)$, written here for labels $y_{i} \in\{-1,+1\}$,

\begin{equation}\tag{2}
\begin{aligned} 
l(W) &=\ln \prod_{i} P_{i} \\ 
     &=\ln \prod_{i}\left(\frac{1}{1+\exp \left(-\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)}\right)^{\frac{1+y_{i}}{2}}\left(1-\frac{1}{1+\exp \left(-\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)}\right)^{\frac{1-y_{i}}{2}}, 
\end{aligned}
\end{equation}

which is equivalent to minimizing the log loss function, $I(W)=-l(W)$, in equation 3, where $y_{i}$ is the label of observation $i$.

\begin{equation}\tag{3}
I(W)=\sum_{i} \log \left(1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)\right)
\end{equation}

Since this is a convex function, we can use gradient descent to find the vector, $\mathbf{w}$. The gradient is shown in vector notation in equation 4.

\begin{equation}\tag{4}
\nabla \mathbf{w}=-\sum_{i} y_{i}\left(1-\frac{1}{1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)}\right) \cdot \mathbf{x}_{i}
\end{equation}

The vector of weights is initialized to $\mathbf{0}$ and iteratively updated until convergence according to equation 5, where $\eta$ is the learning rate.
\begin{equation}\tag{5}
\mathbf{w}=\mathbf{w}-\eta \cdot \nabla \mathbf{w}
\end{equation}
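The loop described by equations 3 to 5 can be sketched in plain Python as follows. This is a toy, single-machine illustration under stated assumptions: labels are in $\{-1,+1\}$, the bias is updated with the matching derivative, and the data set, learning rate, and step count are made up. It is not the distributed implementation used later.

```python
import math


def dot(w, x):
    return sum(wj * xj for wj, xj in zip(w, x))


def log_loss(w, b, X, y):
    """Log loss in the style of equation 3, for labels in {-1, +1}."""
    return sum(math.log1p(math.exp(-yi * (dot(w, xi) + b))) for xi, yi in zip(X, y))


def gradient(w, b, X, y):
    """Gradient in the style of equation 4, plus the matching derivative for the bias."""
    grad_w = [0.0] * len(w)
    grad_b = 0.0
    for xi, yi in zip(X, y):
        margin = yi * (dot(w, xi) + b)
        coeff = -yi * (1.0 - 1.0 / (1.0 + math.exp(-margin)))
        for j, xij in enumerate(xi):
            grad_w[j] += coeff * xij
        grad_b += coeff
    return grad_w, grad_b


def fit(X, y, eta=0.1, steps=200):
    """Start from zero weights and apply the update of equation 5 for a fixed number of steps."""
    w = [0.0] * len(X[0])
    b = 0.0
    history = []
    for _ in range(steps):
        history.append(log_loss(w, b, X, y))
        grad_w, grad_b = gradient(w, b, X, y)
        w = [wj - eta * gj for wj, gj in zip(w, grad_w)]
        b -= eta * grad_b
    return w, b, history


# Made-up one-feature data set: small x is negative, large x is positive
X = [[0.0], [1.0], [2.0], [3.0]]
y = [-1, -1, 1, 1]
w, b, history = fit(X, y)
print(history[0], history[-1])
```

With zero initial weights every margin is 0, so the starting loss is $4 \ln 2$; the recorded `history` lets us check that the loss goes down from there.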

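The same quantities can be written component-wise for labels $Y^{l} \in\{0,1\}$, where $l$ indexes observations, $i$ indexes features, and $w_{0}$ plays the role of the bias $b$. Equation 6 is the log likelihood $l(W)$, which is the negative of the loss in equation 3; equations 7 and 8 give its partial derivative with respect to $w_{i}$ and the corresponding update. Because the log likelihood is maximized, the update in equation 8 adds the gradient term, whereas equation 5 subtracts the gradient of the loss.

\begin{equation}\tag{6}
l(W)=\sum_{l}\left[Y^{l}\left(w_{0}+\sum_{i=1}^{n} w_{i} X_{i}^{l}\right)-\ln \left(1+\exp \left(w_{0}+\sum_{i=1}^{n} w_{i} X_{i}^{l}\right)\right)\right]
\end{equation}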

\begin{equation}\tag{7}
\frac{\partial l(W)}{\partial w_{i}}=\sum_{l} X_{i}^{l}\left(Y^{l}-\hat{P}\left(Y^{l}=1 | X^{l}, W\right)\right)
\end{equation}

\begin{equation}\tag{8}
w_{i} \leftarrow w_{i}+\eta \sum_{l} X_{i}^{l}\left(Y^{l}-\hat{P}\left(Y^{l}=1 | X^{l}, W\right)\right)
\end{equation}
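The two labelings can be checked against each other numerically. The sketch below assumes the mapping $y = 2Y - 1$ between $\{0,1\}$ and $\{-1,+1\}$ labels and compares the log loss (sum over observations of $\log(1+\exp(-y z))$) with the log likelihood (sum of $Y z - \ln(1+\exp(z))$), where $z = w_{0} + \sum_{i} w_{i} X_{i}$. All data and weights are made up.

```python
import math


def score(w0, w, x):
    return w0 + sum(wi * xi for wi, xi in zip(w, x))


def loss_pm1(w0, w, X, y_pm1):
    """Log loss with labels in {-1, +1}."""
    return sum(math.log1p(math.exp(-y * score(w0, w, x))) for x, y in zip(X, y_pm1))


def log_likelihood_01(w0, w, X, y01):
    """Log likelihood with labels in {0, 1}."""
    total = 0.0
    for x, y in zip(X, y01):
        z = score(w0, w, x)
        total += y * z - math.log1p(math.exp(z))
    return total


def gradient_01(w0, w, X, y01):
    """Partial derivatives of the log likelihood with respect to each w_i."""
    grads = [0.0] * len(w)
    for x, y in zip(X, y01):
        p = 1.0 / (1.0 + math.exp(-score(w0, w, x)))
        for i, xi in enumerate(x):
            grads[i] += xi * (y - p)
    return grads


# Made-up data and weights
X = [[1.0, 2.0], [0.5, -1.0], [-1.5, 0.3]]
y01 = [1, 0, 1]
y_pm1 = [2 * y - 1 for y in y01]
w0, w = 0.1, [0.4, -0.2]

ll = log_likelihood_01(w0, w, X, y01)
loss = loss_pm1(w0, w, X, y_pm1)
grads = gradient_01(w0, w, X, y01)
print(ll, loss, grads)
```

The printed log likelihood and loss differ only in sign, which is why one formulation ascends and the other descends.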

To avoid overfitting, we will likely regularize the objective. To do this, we add a penalty term: a regularization coefficient $\lambda$ multiplied either by the L1 norm of the weights (lasso) or by the squared L2 norm (ridge).

The objective function and gradient for the L1 (lasso) penalty are shown below. Because $|w_{j}|$ is not differentiable at zero, equation 10 is a subgradient, with $\text{sign}$ applied element-wise.

Objective function

\begin{equation}\tag{9}
I(W)= \sum_{i} \log \left(1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)\right)+\lambda\|\mathbf{w}\|_{1}
\end{equation}

Gradient

\begin{equation}\tag{10}
\nabla \mathbf{w}=-\sum_{i}y_{i}\left(1-\frac{1}{1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)}\right) \cdot \mathbf{x}_{i}+\lambda \text{sign}(\mathbf{w})
\end{equation}

And again for the L2 (ridge) penalty below.

Objective function

\begin{equation}\tag{11}
I(W)= \sum_{i} \log \left(1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)\right)+\lambda\|\mathbf{w}\|_{2}^{2}
\end{equation}

Gradient

\begin{equation}\tag{12}
\nabla \mathbf{w}=-\sum_{i}y_{i}\left(1-\frac{1}{1+\exp \left(-y_{i}\left(\mathbf{w}^{T} \mathbf{x}_{i}+b\right)\right)}\right) \cdot \mathbf{x}_{i}+2 \lambda \mathbf{w}
\end{equation}

In all cases, the update to $\mathbf{w}$ remains the same as shown in equation 5.
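The penalty terms only change what is added to the data gradient before the update. A minimal sketch, assuming the L1 penalty is $\lambda\|\mathbf{w}\|_{1}$ (subgradient $\lambda\,\text{sign}(\mathbf{w})$) and the L2 penalty is written as $\lambda\|\mathbf{w}\|_{2}^{2}$ (gradient $2\lambda\mathbf{w}$). The function names and numbers are hypothetical.

```python
def sign(v):
    """Return -1, 0, or +1."""
    return (v > 0) - (v < 0)


def l1_subgradient(w, lam):
    """Subgradient of lam * ||w||_1, element-wise."""
    return [lam * sign(wj) for wj in w]


def l2_gradient(w, lam):
    """Gradient of lam * ||w||_2^2."""
    return [2.0 * lam * wj for wj in w]


def regularized_step(w, data_grad, penalty_grad, eta):
    """One update w <- w - eta * (data gradient + penalty gradient)."""
    return [wj - eta * (g + p) for wj, g, p in zip(w, data_grad, penalty_grad)]


# Hypothetical weights and data gradient
w = [0.5, -2.0, 0.0]
data_grad = [0.0, 0.0, 0.0]
w_l1 = regularized_step(w, data_grad, l1_subgradient(w, 0.1), eta=1.0)
w_l2 = regularized_step(w, data_grad, l2_gradient(w, 0.1), eta=1.0)
print(w_l1, w_l2)
```

With a zero data gradient, the L1 step moves every nonzero weight toward zero by the same amount, while the L2 step shrinks each weight in proportion to its size.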

## FFM

In this work, we will be implementing a field-aware factorization machine to predict the click-through-rate based on the data provided by CriteoLabs.

In [171]:
# Toy Example
!mkdir Data

In [214]:
# Create toy example based on movie ratings
# Each row is a single rating by a single user for a single movie
toy= {'rating': [5,3,1,4,1,1,1,5,1,4,1,5,4], \
      'user': [1,1,1,2,2,3,3,3,4,4,5,5,5], \
      'movie': ['StarWars1','Inception','TheNotebook',\
                'StarWars1','TheNotebook',\
                'StarWars1','Inception','TheNotebook',\
                'StarWars1','TheNotebook',\
                'Inception','Godfather','TheNotebook']}
toy_df = pd.DataFrame(data=toy)
toy_df

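| | rating | user | movie |
|---|---|---|---|
| 0 | 5 | 1 | StarWars1 |
| 1 | 3 | 1 | Inception |
| 2 | 1 | 1 | TheNotebook |
| 3 | 4 | 2 | StarWars1 |
| 4 | 1 | 2 | TheNotebook |
| 5 | 1 | 3 | StarWars1 |
| 6 | 1 | 3 | Inception |
| 7 | 5 | 3 | TheNotebook |
| 8 | 1 | 4 | StarWars1 |
| 9 | 4 | 4 | TheNotebook |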


In [215]:
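def convert_to_ffm(df):
    '''Convert a two-field dataframe to the libFFM text format.
    args:
        df - dataframe whose columns are (label, user, movie);
             user is field 1 and movie is field 2
    returns:
        datastring - string with one line per record in the format
            label field:feature:value field:feature:value
    note:
        libFFM expects integer feature indices. The user column is passed
        through as-is, so string user IDs must be mapped to integers first.
    '''
    # map each movie title to an integer feature index
    movieDict = {'StarWars1': '1', 'Inception': '2', 'Godfather': '3', 'TheNotebook': '4'}

    lines = []
    # itertuples yields plain tuples, which avoids positional indexing on a labeled Series
    for label, user, movie in df.itertuples(index=False, name=None):
        lines.append(f"{label} 1:{user}:1 2:{movieDict[movie]}:1\n")
    return ''.join(lines)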
        
toy_libFFM = convert_to_ffm(toy_df)
print(toy_libFFM)

5 1:1:1 2:1:1
3 1:1:1 2:2:1
1 1:1:1 2:4:1
4 1:2:1 2:1:1
1 1:2:1 2:4:1
1 1:3:1 2:1:1
1 1:3:1 2:2:1
5 1:3:1 2:4:1
1 1:4:1 2:1:1
4 1:4:1 2:4:1
1 1:5:1 2:2:1
5 1:5:1 2:3:1
4 1:5:1 2:4:1



We would like to predict the labels (movie ratings) for each of the users for each of the movies. 
We can do this with a factorization machine (FM) or field-aware factorization machine (FFM).

The above example is a regression. Ours is a classification problem.
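To show how an FFM would turn the toy records into a click probability, here is a minimal sketch of the pairwise interaction score, $\phi(\mathbf{x}) = \sum_{j_1 < j_2} \langle \mathbf{w}_{j_1, f_2}, \mathbf{w}_{j_2, f_1} \rangle x_{j_1} x_{j_2}$, passed through a logistic function for classification. It leaves out any linear and bias terms, and the latent vectors below are made-up numbers rather than learned values. The names `ffm_score` and `latent` are hypothetical.

```python
import math
from itertools import combinations


def ffm_score(active, latent):
    """Sum of field-aware pairwise interactions.

    active: list of (field, feature, value) for the nonzero features of one record
    latent: dict mapping (feature, other_field) to a latent vector (list of floats)
    """
    total = 0.0
    for (f1, j1, x1), (f2, j2, x2) in combinations(active, 2):
        v1 = latent[(j1, f2)]  # vector of feature j1 used against field f2
        v2 = latent[(j2, f1)]  # vector of feature j2 used against field f1
        total += sum(a * b for a, b in zip(v1, v2)) * x1 * x2
    return total


# One toy record: user "Alla" shown an ad for "TheNotebook"
active = [("user", "Alla", 1.0), ("movie", "TheNotebook", 1.0)]
# Made-up latent vectors with k = 2
latent = {
    ("Alla", "movie"): [0.5, 1.0],
    ("TheNotebook", "user"): [2.0, -0.5],
}

phi = ffm_score(active, latent)
click_probability = 1.0 / (1.0 + math.exp(-phi))
print(phi, click_probability)
```

Each feature keeps one latent vector per opposing field, which is what distinguishes an FFM from a plain FM where each feature has a single vector.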

In [216]:
toy2= {'click': [0,0,1,0,1,1,1,0,1,0,1,0,0], \
      'userID': ['Alla','Alla','Alla',\
                 'Daniel','Daniel',\
                 'Armand','Armand','Armand',
                 'Adam', 'Adam',\
                 'Alice', 'Alice', 'Alice'], \
      'movie_ad': ['StarWars1','Inception','TheNotebook',\
                'StarWars1','TheNotebook',\
                'StarWars1','Inception','TheNotebook',\
                'StarWars1','TheNotebook',\
                'Inception','Godfather','TheNotebook']}
toy2_df = pd.DataFrame(data=toy2)
toy2_df

| | click | userID | movie_ad |
|---|---|---|---|
| 0 | 0 | Alla | StarWars1 |
| 1 | 0 | Alla | Inception |
| 2 | 1 | Alla | TheNotebook |
| 3 | 0 | Daniel | StarWars1 |
| 4 | 1 | Daniel | TheNotebook |
| 5 | 1 | Armand | StarWars1 |
| 6 | 1 | Armand | Inception |
| 7 | 0 | Armand | TheNotebook |
| 8 | 1 | Adam | StarWars1 |
| 9 | 0 | Adam | TheNotebook |


In [217]:
toy2_libFFM = convert_to_ffm(toy2_df)
print(toy2_libFFM)

0 1:Alla:1 2:1:1
0 1:Alla:1 2:2:1
1 1:Alla:1 2:4:1
0 1:Daniel:1 2:1:1
1 1:Daniel:1 2:4:1
1 1:Armand:1 2:1:1
1 1:Armand:1 2:2:1
0 1:Armand:1 2:4:1
1 1:Adam:1 2:1:1
0 1:Adam:1 2:4:1
1 1:Alice:1 2:2:1
0 1:Alice:1 2:3:1
0 1:Alice:1 2:4:1
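The output above keeps the user names as feature identifiers, whereas libFFM expects integer indices. One way to handle this is to build the index mapping while writing the lines. The sketch below is a hypothetical helper (`to_libffm` is not part of the notebook) that assumes 1-based field numbers, as in the toy output, and a single feature index space shared across fields.

```python
def to_libffm(rows, label_key, field_keys):
    """Encode dict records as libFFM-style lines with integer feature indices.

    Returns the encoded lines and the (field, raw value) -> index mapping.
    """
    field_ids = {name: i + 1 for i, name in enumerate(field_keys)}
    feature_ids = {}
    lines = []
    for row in rows:
        parts = [str(row[label_key])]
        for field in field_keys:
            key = (field, row[field])
            # assign the next unused index the first time a value is seen
            if key not in feature_ids:
                feature_ids[key] = len(feature_ids) + 1
            parts.append(f"{field_ids[field]}:{feature_ids[key]}:1")
        lines.append(" ".join(parts))
    return lines, feature_ids


# Three records in the shape of the toy click data
rows = [
    {"click": 0, "userID": "Alla", "movie_ad": "StarWars1"},
    {"click": 0, "userID": "Alla", "movie_ad": "Inception"},
    {"click": 1, "userID": "Daniel", "movie_ad": "TheNotebook"},
]
lines, feature_ids = to_libffm(rows, "click", ["userID", "movie_ad"])
print("\n".join(lines))
```

Returning the mapping alongside the lines makes it possible to encode held-out data with the same indices.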



In [204]:
#!/usr/bin/python
#
# Created by Albert Au Yeung (2010)
#
# An implementation of matrix factorization
#
try:
    import numpy
except ImportError:
    print("This implementation requires the numpy module.")
    exit(0)

###############################################################################

"""
@INPUT:
    R     : a matrix to be factorized, dimension N x M
    P     : an initial matrix of dimension N x K
    Q     : an initial matrix of dimension M x K
    K     : the number of latent features
    steps : the maximum number of steps to perform the optimisation
    alpha : the learning rate
    beta  : the regularization parameter
@OUTPUT:
    the final matrices P and Q
"""
def matrix_factorization(R, P, Q, K, steps=5000, alpha=0.0002, beta=0.02):
    Q = Q.T
    for step in range(steps):
        for i in range(len(R)):
            for j in range(len(R[i])):
                if R[i][j] > 0:
                    eij = R[i][j] - numpy.dot(P[i,:],Q[:,j])
                    for k in range(K):
                        P[i][k] = P[i][k] + alpha * (2 * eij * Q[k][j] - beta * P[i][k])
                        Q[k][j] = Q[k][j] + alpha * (2 * eij * P[i][k] - beta * Q[k][j])
        eR = numpy.dot(P,Q)
        e = 0
        for i in range(len(R)):
            for j in range(len(R[i])):
                if R[i][j] > 0:
                    e = e + pow(R[i][j] - numpy.dot(P[i,:],Q[:,j]), 2)
                    for k in range(K):
                        e = e + (beta/2) * ( pow(P[i][k],2) + pow(Q[k][j],2) )
        if e < 0.001:
            break
    return P, Q.T

###############################################################################

if __name__ == "__main__":
    R = [
         [5,3,0,1],
         [4,0,0,1],
         [1,1,0,5],
         [1,0,0,4],
         [0,1,5,4],
        ]

    R = numpy.array(R)

    N = len(R)
    M = len(R[0])
    K = 2

    P = numpy.random.rand(N,K)
    Q = numpy.random.rand(M,K)

    nP, nQ = matrix_factorization(R, P, Q, K)

# 3. EDA and Discussion of Challenges

In [33]:
# Define the schema prior to loading the data:
# one integer label, 13 integer features (I1-I13), and 26 categorical features (C1-C26)
schema = StructType(
    [StructField("label", IntegerType(), True)]
    + [StructField(f"I{i}", IntegerType(), True) for i in range(1, 14)]
    + [StructField(f"C{i}", StringType(), True) for i in range(1, 27)]
)


In [48]:
# Load toy data into dataframe
toy_df = spark.read.parquet("toyData/*.parquet")

In [51]:
# Show toy_data
pd.DataFrame(toy_df.take(30), columns=toy_df.columns).transpose()


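| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| label | 0 | 1 | 0 | 0 | 1 | 0 | 0 | 1 | 0 | 0 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| I1 | 1 | 0 | -999 | 0 | 1 | -999 | -999 | 1 | -999 | -999 | ... | -999 | 0 | 0 | -999 | -999 | -999 | -999 | 5 | 0 | -999 |
| I2 | 17 | 23 | 0 | 3 | 0 | -1 | 2 | 4 | -1 | 0 | ... | 1 | 0 | 68 | 0 | 1 | 1 | -1 | 297 | 0 | 31 |
| I3 | 3 | 47 | 1 | 13 | 5 | -999 | 1 | 19 | -999 | 7 | ... | 3 | 2 | 16 | 16 | 4 | 4 | 17 | 5 | 4 | 1 |
| I4 | 6 | 6 | 2 | 1 | 3 | -999 | 3 | 17 | -999 | 18 | ... | 3 | 3 | 3 | 8 | 4 | 2 | 6 | 3 | -999 | -999 |
| I5 | 5 | 13044 | 20 | 2940 | 171 | 2975 | 3031 | 36 | 8740 | 3424 | ... | 109391 | 1344 | 3735 | 0 | 1837 | 66163 | 31979 | 8 | 1573 | 28527 |
| I6 | 6 | 2546 | -999 | 83 | 34 | 1 | 76 | 7 | 12 | 22 | ... | -999 | 5 | 64 | -999 | 9 | -999 | 94 | 1 | 42 | -999 |
| I7 | 1 | 0 | 0 | 1 | 3 | 14 | 1 | 6 | 120 | 1 | ... | 0 | 30 | 1 | 0 | 2 | 0 | 1 | 34 | 14 | 0 |
| I8 | 5 | 7 | 2 | 16 | 34 | 1 | 3 | 22 | 0 | 21 | ... | 3 | 7 | 3 | 7 | 4 | 2 | 7 | 3 | 8 | 2 |
| I9 | 6 | 851 | 2 | 83 | 67 | 8 | 3 | 92 | 64 | 21 | ... | 39 | 361 | 5 | 8 | 5 | 11 | 74 | 147 | 112 | 8 |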


## Transform the data

In [119]:
def makeStages(df):
    
    cat_feats = ["C1", "C5", "C6", "C8", "C9", "C10", "C14", "C17", "C19", "C20", "C22", "C23", "C25"]
    num_feats = [i[0] for i in df.dtypes if i[1].startswith("int")]
    num_feats.remove("label")

    # Pipeline step 1: one hot encoding for the categorical variables
    stages = []
    for c in cat_feats:
        # cast each record in categorical column c to an index
        stridx = StringIndexer(inputCol=c, outputCol = c + "idx").setHandleInvalid("keep")
        # one hot encode the indexed categorical column
        encoder = OneHotEncoderEstimator(inputCols=[stridx.getOutputCol()], outputCols=[c + "classVec"]).setDropLast(False)
        stages += [stridx, encoder]

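    # Pipeline step 2: rescale the numerical features
    # NOTE: Normalizer scales each row vector to unit norm, so a one-element vector
    # collapses to +/-1 (or 0); the clean version below uses StandardScaler instead.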
    for n in num_feats:
        num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"classVec")
        num_normalizer = Normalizer(inputCol=num_assembler.getOutputCol(), outputCol=n+"normalized")
        stages += [num_assembler, num_normalizer]

    # Pipeline step 3: index the label column
    label_stridx = StringIndexer(inputCol="label", outputCol="label_transformed")
    stages += [label_stridx]

    # Pipeline step 4: put all features into one column as type of vector
    assembler_inputs = [c + "classVec" for c in cat_feats] + [n+"normalized" for n in num_feats]
    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
    stages += [assembler]
    
    return stages

In [120]:
def runThroughPipeline(df, stages):
    # fit the pipeline to do the series of fit/transform defined in stages
    start = time.time()
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    transformed_df = pipelineModel.transform(df)
    print(f"... completed job in {time.time() - start} seconds")
    
    return transformed_df

In [121]:
# Make the stages
stages = makeStages(toy_df)

In [122]:
# Check the stages
stages

[StringIndexer_4ecabe8ad59480da0e0c,
 OneHotEncoderEstimator_42738ea23e4b65159e0d,
 StringIndexer_46a1b1e614e339e62f9a,
 OneHotEncoderEstimator_4feba99d88b83ec2dce4,
 StringIndexer_4cc4ae772499e6c3ebe9,
 OneHotEncoderEstimator_4192907e907c3a4099cf,
 StringIndexer_4fefa741ca775e45958e,
 OneHotEncoderEstimator_4d9ca136d6147cf61803,
 StringIndexer_4972b397d4f8b38db25e,
 OneHotEncoderEstimator_4223b6c478941271deae,
 StringIndexer_4aab8b9fbffc390ce393,
 OneHotEncoderEstimator_43e5a74aaf7e489a5733,
 StringIndexer_4d9184d5400f0a53a6f2,
 OneHotEncoderEstimator_40efb0f3d4e8a283fb00,
 StringIndexer_48f58be77755e5d30cc4,
 OneHotEncoderEstimator_4ad5a272fa5bbdb0d45e,
 StringIndexer_486ea3e9ef6edf984aae,
 OneHotEncoderEstimator_48d99418c07f8f6ffcef,
 StringIndexer_43b492ff09d666a3e91e,
 OneHotEncoderEstimator_46d58c8915cf0b56ff95,
 StringIndexer_42fb91b4035800c8d5a4,
 OneHotEncoderEstimator_4bd7a5de8feed944ed11,
 StringIndexer_4227835fc94a72b39d69,
 OneHotEncoderEstimator_4506bab06ed4dd0c69d2,
 Str

In [123]:
# Transform the df
# Transformed_df has the new 'features' column, which is a column of sparse vectors
transformed_df = runThroughPipeline(toy_df, stages)

... completed job in 5.466933012008667 seconds


In [124]:
# Look at one example as a spark df
transformed_df.take(1)

[Row(label=0, I1=1, I2=17, I3=3, I4=6, I5=5, I6=6, I7=1, I8=5, I9=6, I10=1, I11=1, I12=-999, I13=6, C1='be589b51', C2='04e09220', C3='71947b86', C4='bf9e41b6', C5='25c83c98', C6='fe6b92e5', C7='83bc92bc', C8='0b153874', C9='a73ee510', C10='ce214b9f', C11='314e05ae', C12='8529d3b4', C13='fc80e4fc', C14='b28479f6', C15='b21f08fe', C16='33a1f420', C17='d4bb7bd8', C18='e161d23a', C19='C19_no_value', C20='C20_no_value', C21='f22e0924', C22='C22_no_value', C23='32c7478e', C24='ded4aac9', C25='C25_no_value', C26='C26_no_value', C1idx=2.0, C1classVec=SparseVector(10, {2: 1.0}), C5idx=0.0, C5classVec=SparseVector(4, {0: 1.0}), C6idx=1.0, C6classVec=SparseVector(6, {1: 1.0}), C8idx=0.0, C8classVec=SparseVector(7, {0: 1.0}), C9idx=0.0, C9classVec=SparseVector(3, {0: 1.0}), C10idx=15.0, C10classVec=SparseVector(22, {15: 1.0}), C14idx=1.0, C14classVec=SparseVector(6, {1: 1.0}), C17idx=1.0, C17classVec=SparseVector(9, {1: 1.0}), C19idx=0.0, C19classVec=SparseVector(9, {0: 1.0}), C20idx=0.0, C20class

In [125]:
# Look at 5 examples as a pandas df
pd.options.display.max_rows=100
pd.DataFrame(transformed_df.take(5), columns=transformed_df.columns).transpose()

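| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| label | 0 | 1 | 0 | 0 | 1 |
| I1 | 1 | 0 | -999 | 0 | 1 |
| I2 | 17 | 23 | 0 | 3 | 0 |
| I3 | 3 | 47 | 1 | 13 | 5 |
| I4 | 6 | 6 | 2 | 1 | 3 |
| I5 | 5 | 13044 | 20 | 2940 | 171 |
| I6 | 6 | 2546 | -999 | 83 | 34 |
| I7 | 1 | 0 | 0 | 1 | 3 |
| I8 | 5 | 7 | 2 | 16 | 34 |
| I9 | 6 | 851 | 2 | 83 | 67 |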


In [126]:
pd.options.display.max_rows=50

In [127]:
# Look closely at one of the sparse vector of features
pd.DataFrame(transformed_df.take(5), columns=transformed_df.columns)['features'][0]

SparseVector(113, {2: 1.0, 10: 1.0, 15: 1.0, 20: 1.0, 27: 1.0, 45: 1.0, 53: 1.0, 59: 1.0, 67: 1.0, 76: 1.0, 81: 1.0, 85: 1.0, 90: 1.0, 100: 1.0, 101: 1.0, 102: 1.0, 103: 1.0, 104: 1.0, 105: 1.0, 106: 1.0, 107: 1.0, 108: 1.0, 109: 1.0, 110: 1.0, 111: -1.0, 112: 1.0})
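The entries at indices 100 to 112 in the vector above are all 1.0 or -1.0, which is what unit-norm scaling does to a one-element vector: the magnitude is divided out and only the sign survives. The sketch below reproduces that effect in plain Python and contrasts it with dividing a whole column by its sample standard deviation, which is an assumption about the kind of scaling intended for these features. The helper names are hypothetical.

```python
import math


def unit_norm(vec):
    """Scale one row vector to unit L2 norm."""
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm if norm else 0.0 for v in vec]


def scale_by_std(column):
    """Divide every value of a column by the column's sample standard deviation."""
    n = len(column)
    mean = sum(column) / n
    std = math.sqrt(sum((v - mean) ** 2 for v in column) / (n - 1))
    return [v / std for v in column]


# Single-element row vectors lose their magnitude under unit-norm scaling
print(unit_norm([5.0]), unit_norm([13044.0]), unit_norm([-999.0]))

# The first five I5 values from the table above keep their relative sizes
# when the column is scaled by its standard deviation
i5 = [5.0, 13044.0, 20.0, 2940.0, 171.0]
scaled = scale_by_std(i5)
print(scaled)
```

Row-wise normalization is useful when the direction of a multi-feature vector matters; for a single numeric column, a column-wise scaler keeps the information in the magnitudes.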

In [105]:
# clean up dataframe and select only needed columns
cols = toy_df.columns
selectedCols = ["features", "label_transformed"] + cols
transformed_df = transformed_df.select(selectedCols)
pd.DataFrame(transformed_df.take(5), columns=transformed_df.columns).transpose()

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| features | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| label_transformed | 0 | 1 | 0 | 0 | 1 |
| label | 0 | 1 | 0 | 0 | 1 |
| I1 | 1 | 0 | -999 | 0 | 1 |
| I2 | 17 | 23 | 0 | 3 | 0 |
| I3 | 3 | 47 | 1 | 13 | 5 |
| I4 | 6 | 6 | 2 | 1 | 3 |
| I5 | 5 | 13044 | 20 | 2940 | 171 |
| I6 | 6 | 2546 | -999 | 83 | 34 |
| I7 | 1 | 0 | 0 | 1 | 3 |


Now we have a dataframe with the columns we need: the labels and the features.

### Clean version of Transformation Pipeline

In [137]:
# train/test split
def trainTestSplit(df, split=0.8):
    '''Splits df into train and test sets according to the split ratio.
    Args: 
        df - Spark dataframe with all data
        split - approximate fraction of the data to put in the train set (default 0.8)
    Output:
        train_df - Spark dataframe with the train split
        test_df - Spark dataframe with the test split (the remaining fraction)
    '''
    start = time.time()
    train_df, test_df = df.randomSplit([split, 1-split], seed=261)
    print("Train set count: ", train_df.count())
    print("Test set count:", test_df.count())
    print(f"... completed job in {time.time() - start} seconds")
    return train_df, test_df

In [165]:
def makeStages(df):
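    '''Builds the list of pipeline stages that turn the raw columns into a single
    "features" vector column and an indexed "label_transformed" column.
    Args:
        df - Spark dataframe with the label, integer, and categorical columns
    Returns:
        stages - list of pyspark.ml stages to pass to Pipeline
    '''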
    cat_feats = ["C1", "C5", "C6", "C8", "C9", "C10", "C14", "C17", "C19", "C20", "C22", "C23", "C25"]
    num_feats = [i[0] for i in df.dtypes if i[1].startswith("int")]
    num_feats.remove("label")

    # Pipeline step 1: one hot encoding for the categorical variables
    stages = []
    for c in cat_feats:
        # cast each record in categorical column c to an index
        stridx = StringIndexer(inputCol=c, outputCol = c + "idx").setHandleInvalid("keep")
        # one hot encode the indexed categorical column
        encoder = OneHotEncoderEstimator(inputCols=[stridx.getOutputCol()], outputCols=[c + "classVec"]).setDropLast(False)
        stages += [stridx, encoder]

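    # Pipeline step 2: scale the numerical features to unit standard deviation
    # (StandardScaler defaults: withStd=True, withMean=False, so values are not centered)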
    for n in num_feats:
        num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"classVec")
        num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"scaled")
        stages += [num_assembler, num_scaler]

    # Pipeline step 3: index the label column
    label_stridx = StringIndexer(inputCol="label", outputCol="label_transformed")
    stages += [label_stridx]

    # Pipeline step 4: put all features into one column as type of vector
    assembler_inputs = [c + "classVec" for c in cat_feats] + [n+"scaled" for n in num_feats]
    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
    stages += [assembler]
    
    return stages

def runThroughPipeline(df, stages):
    
    # fit the pipeline to do the series of fit/transform defined in stages
    start = time.time()
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    transformed_df = pipelineModel.transform(df)
    
    # clean up dataframe and select only needed columns
    #TODO, do we need to include raw data?
    cols = df.columns
    selectedCols = ["features", "label_transformed"] + cols
#     selectedCols = ["features", "label_transformed"]
    transformed_df = transformed_df.select(selectedCols)
    
    print(f"... completed data transformation in {time.time() - start} seconds")
    
    return transformed_df

In [166]:
# Split the data
train_toy_df, test_toy_df = trainTestSplit(toy_df)

Train set count:  18
Test set count: 7
... completed job in 0.5415830612182617 seconds


In [167]:
# Make the stages
stages = makeStages(train_toy_df)

# Transform the df
# Transformed_df has the new 'features' column, which is a column of sparse vectors with one-hot encoded 
# vectors for categorical variables and scaled integer features.
transformed_df = runThroughPipeline(train_toy_df, stages)


... completed data transformation in 9.591092109680176 seconds


In [168]:
pd.DataFrame(transformed_df.take(5), columns=transformed_df.columns).transpose()

| | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| features | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ... |
| label_transformed | 0 | 0 | 0 | 0 | 0 |
| label | 0 | 0 | 0 | 0 | 0 |
| I1 | -999 | -999 | -999 | -999 | 0 |
| I2 | -1 | 0 | 0 | 2 | 0 |
| I3 | -999 | 1 | 7 | 1 | 3 |
| I4 | -999 | 2 | 18 | 3 | 0 |
| I5 | 2975 | 20 | 3424 | 3031 | 21621 |
| I6 | 1 | -999 | 22 | 76 | -999 |
| I7 | 14 | 0 | 1 | 1 | -999 |


In [155]:
def fitLogReg(df):
    start = time.time()
    model = LogisticRegression(featuresCol = 'features', labelCol = 'label_transformed', maxIter=10)
    fitted_model = model.fit(df)
    print(f"... completed job in {time.time() - start} seconds")
    return fitted_model

In [156]:
def evaluateModel(model, true_labels):
    '''Evaluates the model for accuracy, etc.
    '''
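The stub above does not yet say which metrics it will compute. As a sketch of what the evaluation could look like, the plain-Python function below computes log loss and accuracy from predicted click probabilities and 0/1 labels. The function name, the 0.5 threshold, and the clipping constant are assumptions for illustration, and the example probabilities are made up.

```python
import math


def evaluate_predictions(probs, labels, threshold=0.5, eps=1e-15):
    """Return mean log loss and accuracy for predicted probabilities and 0/1 labels."""
    n = len(labels)
    # clip probabilities away from 0 and 1 so the logarithms stay finite
    clipped = [min(max(p, eps), 1.0 - eps) for p in probs]
    log_loss = -sum(
        y * math.log(p) + (1 - y) * math.log(1.0 - p) for p, y in zip(clipped, labels)
    ) / n
    correct = sum((p >= threshold) == bool(y) for p, y in zip(probs, labels))
    return {"log_loss": log_loss, "accuracy": correct / n}


# Made-up predictions for four impressions
metrics = evaluate_predictions([0.9, 0.2, 0.6, 0.4], [1, 0, 0, 1])
print(metrics)
```

Log loss rewards well-calibrated probabilities, which matters for click-through rate prediction because clicks are rare and accuracy alone can look good for a model that always predicts "no click".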

In [170]:
start = time.time()
model = LogisticRegression(featuresCol = 'features', labelCol = 'label_transformed', maxIter=10)
fitted_model = model.fit(transformed_df)
print(f"... completed job in {time.time() - start} seconds")


Py4JJavaError: An error occurred while calling o30077.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 648.0 failed 1 times, most recent failure: Lost task 1.0 in stage 648.0 (TID 2267, localhost, executor driver): java.lang.IllegalAccessError: tried to access field org.apache.spark.sql.execution.BufferedRowIterator.partitionIndex from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0.wholestagecodegen_init_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0.wholestagecodegen_init_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.init(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:518)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:488)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:278)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access field org.apache.spark.sql.execution.BufferedRowIterator.partitionIndex from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0.wholestagecodegen_init_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$project_NestedClass_0.wholestagecodegen_init_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.init(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [28]:
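# Print the training accuracy from the fitted model's summary.
# Requires the fit in the previous cell to have completed successfully.
summary = fitted_model.summary
accuracy = summary.accuracy
print(accuracy)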
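
For reference, accuracy is the fraction of examples whose predicted label matches the true label. The block below is a minimal sketch in plain Python (no Spark) of that calculation. The helper name `accuracy_from_probabilities`, the 0.5 decision threshold, and the toy labels and click probabilities are all assumptions made for illustration; none of them come from the Criteo data or from the model above.

```python
def accuracy_from_probabilities(labels, probabilities, threshold=0.5):
    """Fraction of examples whose thresholded prediction equals the label.

    Hypothetical helper for illustration; not part of any library.
    """
    if len(labels) != len(probabilities):
        raise ValueError("labels and probabilities must have the same length")
    if not labels:
        raise ValueError("need at least one example")
    # Predict a click (1) when the probability reaches the threshold.
    predictions = [1 if p >= threshold else 0 for p in probabilities]
    correct = sum(1 for y, y_hat in zip(labels, predictions) if y == y_hat)
    return correct / len(labels)


# Toy data, made up for illustration (1 = click, 0 = no click).
toy_labels = [0, 0, 0, 1, 0, 1, 0, 0]
toy_probs = [0.1, 0.4, 0.2, 0.7, 0.6, 0.3, 0.05, 0.2]

model_accuracy = accuracy_from_probabilities(toy_labels, toy_probs)

# Baseline that never predicts a click: probability 0.0 for every example.
baseline_accuracy = accuracy_from_probabilities(toy_labels, [0.0] * len(toy_labels))

print(model_accuracy, baseline_accuracy)
```

On this toy data the always-"no click" baseline matches the thresholded predictions in accuracy, which suggests why accuracy alone can be a weak metric when clicks are the minority class; a measure such as log loss or AUC may be worth reporting alongside it.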

# 4. Algorithm Implementation

# 5. Application of Course Concepts