#### Logistic Regression on Toy Data Set with Gradient Descent

In [2]:
# imports
import numpy as np
import pandas as pd
#pyspark dependencies
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql import SQLContext, functions as f
from pyspark.sql.types import *

In [3]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "toy"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [247]:
#loading the data
#setting schema and reading in pre-processed data to pyspark dataframe
intFeatures = ['intFeature1','intFeature2','intFeature3','intFeature4']
catFeatures = ['catFeature5','catFeature6']
outcomeField = [StructField("click", IntegerType(), True)]
quantFields = [StructField(f, DoubleType(), True) for f in intFeatures]
qualFields = [StructField(f, StringType(), True) for f in catFeatures]
schema = StructType(outcomeField + quantFields + qualFields)

toyDf = spark.read \
    .schema(schema) \
    .format("csv") \
    .option("header", "true") \
    .load("toySample/*.csv")

In [248]:
toyDf.show(20)

+-----+-------------------+--------------------+--------------------+--------------------+-----------+-----------+
|click|        intFeature1|         intFeature2|         intFeature3|         intFeature4|catFeature5|catFeature6|
+-----+-------------------+--------------------+--------------------+--------------------+-----------+-----------+
|    0|0.38044728201922134|  1.0461280017194063|  0.8317161330745142|  0.3659735546106383|   25c83c98|   6f6d9be8|
|    1|0.38044728201922134| -1.2072044937875812| -1.0969510532590658|  -0.808690496702659|   25c83c98|   7e0ccccf|
|    0|0.38044728201922134| -1.2072044937875812|  1.7753909926499136|  0.3659735546106383|   25c83c98|   fbad5c96|
|    0|-1.3933721049834424| -1.2072044937875812|  1.9415838693847296|   1.681697323928138|   25c83c98|   7e0ccccf|
|    1|-1.3933721049834424| -1.2072044937875812|  1.1861284542225656|   1.681697323928138|   384874ce|   7e0ccccf|
|    1|-0.5064624114821106| -1.2072044937875812|  0.7709669997151446|0.021438776

In [249]:
def OneHotEncoder(dataframe,columns):
    '''takes a dataframe and corresponding list of columns
    to one-hot encode'''
    for c in columns:
        # collect unique levels in category
        levels = dataframe.select(c).distinct().rdd.flatMap(lambda x: x).collect()
        #generate dummy variables and associated values
        dummy_vals = [f.when(f.col(c) == level, 1).otherwise(0).alias("encoded_" + level) for level in levels]
        #update dataframe with new dummy columns (indicator features)
        
        dataframe = dataframe.select('*',*dummy_vals)
    #drop unencoded categorical columns from dataframe    
    dataframe = dataframe.drop(*columns)
    return dataframe 

In [250]:
#encode all categorical columns
categories = [c for c in toyDf.columns if 'cat' in c]
toy_df_encoded = OneHotEncoder(toyDf,categories)
print('there are now ' + str(len(toy_df_encoded.columns)) + ' columns')

there are now 42 columns


In [251]:
#notice sparsity from dummy variables
toy_df_encoded.show(1)

+-----+-------------------+------------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+-------------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+-----------------+----------------+----------------+----------------+----------------+----------------+
|click|        intFeature1|       intFeature2|       intFeature3|       intFeature4|encoded_bf9f7f48|encoded_5a3e1872|encoded_65be028e|encoded_2c6b8ded|encoded_89ff5705|encoded_3a136cf2|encoded_43b19349|encoded_afcf7897|encoded_f3474129|encoded_Rare_Bucket|encoded_b2241560|encoded_

In [253]:
#convert dataframe to RDD
toyRDD = toy_df_encoded.rdd.map(lambda x: (x[0],x[1:])).cache()

In [254]:
#setting coefficient of the "bias" as the mean click rate
meanClick = toyRDD.map(lambda x: (x[0])).mean()
feature_cols = len(toyRDD.take(1)[0][1])
coefs = np.array([meanClick] + [0.0]*(feature_cols))

In [255]:
def LogLoss(RDD,W):
    """
    augments rdd and returns log loss
    - why we augment: add a vector 
    entry of 1 to correspond with the bias term 
    so that we can apply the model to the data point 
    using vector multiplication without the added 
    step of adding the bias.
    
    Args:
        dataframe - columns (target,features...)
        W       - (array) model coefficients with bias at index 0
    
    Reference
        def sigmoid(z):
            return 1 / (1 + np.exp(-z))
        z = np.dot(X, theta)
        h = sigmoid(z)
        def loss(h, y):
            return (-y * np.log(h) - (1 - y) * np.log(1 - h)).mean()
    """
    #helper function to compute sigmoid
    def sigmoid(z):
        return 1 / (1 + np.exp(-z))
    #generate augmented rdd of (features,target)
    
    augmentedData = RDD.map(lambda x: (np.append([1.0],x[1:]),x[0]))
    
    log_loss = augmentedData \
    .map(lambda x: (np.dot(x[0],W),x[1])) \
    .map(lambda x: (sigmoid(x[0]),x[1])) \
    .map(lambda x: (-x[1]*np.log(x[0]) - (1-x[1])*np.log(1-x[0]))) \
    .mean()
    
    return log_loss

In [256]:
LogLoss(toyRDD,coefs)

0.7644838394523965

In [257]:
N = sc.broadcast(toyRDD.count())

In [260]:
# function to perform a single GD step
def GDUpdate(RDD, W, learningRate = 0.1):
    """
    Perform one OLS gradient descent step/update.
    Args:
        dataRDD - records are tuples of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    Returns:
        new_model - (array) updated coefficients, bias at index 0
        
    Reference: gradient = np.dot(X.T, (h - y)) / num_observations
        - see above LogLoss function for definition of h and y
    """
    # add a bias 'feature' of 1 at index 0 and convert to array
    
    #generate augmented rdd of (features,target)
    augmentedData = RDD.map(lambda x: (np.append([1.0],x[1:]),x[0]))
    
    #helper function to compute sigmoid
    def sigmoid(z):
        return 1 / (1 + np.exp(-z))

    #calculate gradient
    getVals = augmentedData \
            .map(lambda x: (np.dot(x[0],coefs),x[0],x[1])) \
            .map(lambda x: (sigmoid(x[0]),x[1],x[2])) \
            .collect()
    
    features = []
    predictions = []
    labels = []
    
    for v in getVals:
        features.append(v[1])
        predictions.append(v[0])
        labels.append(v[2])
    
    f = np.transpose(features)
    l = np.array(labels)
    p = np.array(predictions)
    
    gradient = np.dot(f,(p-l))/N.value
    
    #apply learning rate to gradient and generate new coefficients
    update = np.multiply(gradient,learningRate)
    
    #original model is the bias + assigned coefficients; update the model with the adjusted coefficients
    new_model = W - update
    ################## (END) YOUR CODE ################# 
   
    return new_model

In [261]:
nSteps = 20
model = coefs
print(f"BASELINE:  Loss = {LogLoss(toyRDD,model)}")
for idx in range(nSteps):
    print("----------")
    print(f"STEP: {idx+1}")
    model = GDUpdate(toyRDD, model)
    loss = LogLoss(toyRDD, model)
    print(f"Loss: {loss}")
    print(f"Model: {[round(w,3) for w in model]}")

BASELINE:  Loss = 0.7644838394523965
----------
STEP: 1
Loss: 0.7496780805769259
Model: [0.248, 0.003, 0.004, -0.0, -0.002, 0.0, 0.0, -0.0, -0.0, -0.0, 0.0, -0.002, 0.0, -0.0, -0.0, -0.0, -0.0, -0.02, -0.0, -0.0, -0.0, -0.001, -0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0.004, -0.0, -0.0, -0.0, -0.0, -0.0, -0.006, -0.001, -0.003, 0.0, -0.001, -0.001, -0.006, -0.011]
----------
STEP: 2
Loss: 0.7355282612251448
Model: [0.219, 0.006, 0.008, -0.001, -0.004, 0.0, 0.0, -0.0, -0.0, -0.0, 0.0, -0.005, 0.0, -0.0, -0.0, -0.0, -0.0, -0.04, -0.0, -0.001, -0.0, -0.002, -0.0, -0.0, -0.001, -0.0, -0.0, -0.0, -0.007, -0.0, -0.0, -0.0, -0.0, -0.0, -0.013, -0.001, -0.007, 0.0, -0.002, -0.002, -0.012, -0.022]
----------
STEP: 3
Loss: 0.7220378343443428
Model: [0.19, 0.009, 0.012, -0.001, -0.007, 0.0, 0.0, -0.0, -0.0, -0.0, 0.0, -0.007, 0.0, -0.0, -0.0, -0.001, -0.0, -0.06, -0.0, -0.001, -0.0, -0.003, -0.001, -0.0, -0.001, -0.0, -0.001, -0.0, -0.011, -0.0, -0.0, -0.0, -0.0, -0.0, -0.019, -0.002, -0.01, 0.0, -0.0