# W261 Final Project

#### *Anusha Munjuluri, Arvindh Ganesan, Kim Vignola, Christina Papadimitriou*

### Notebook Set-up

In [166]:
# imports
import re
import time
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [167]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [168]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "final_project"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

## 1. Question Formulation

## 2. Algorithm Explanation

### Data Loading and Pre-Processing

In [169]:
# take a look at the data
!head -n 1 data/train.txt

0	1	1	5	0	1382	4	15	2	181	1	2		2	68fd1e64	80e26c9b	fb936136	7b4723c4	25c83c98	7e0ccccf	de7995b8	1f89b562	a73ee510	a8cd5504	b2cb9c98	37c9c164	2824a5f6	1adce6ef	8ba8b39a	891b62e7	e5ba7672	f54016b9	21ddcdc9	b1252a9d	07b5194c		3a171ecb	c5c50484	e8b83407	9727dd16


In [170]:
# load the data
fullTrainRDD = sc.textFile('data/train.txt')
testRDD = sc.textFile('data/test.txt')

FIELDS = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13',
          'C1','C2','C3','C4','C5','C6','C7','C8','C9','C10','C11','C12','C13','C14',
          'C15','C16','C17','C18','C19','C20','C21','C22','C23','C24','C25','C26','Label']

In [None]:
# number of rows in train/test data
print(f"Number of records in train data: {fullTrainRDD.count()} ...")
print(f"Number of records in test data: {testRDD.count()} ...")

In [None]:
# Generate 80/20 (pseudo)random train/test split 
trainRDD, heldOutRDD = fullTrainRDD.randomSplit([0.8,0.2], seed = 1)
print(f"... held out {heldOutRDD.count()} records for evaluation and assigned {trainRDD.count()} for training.")

In [171]:
toyRDD, trainRDD2 = trainRDD.randomSplit([0.001,0.999], seed = 2)

In [172]:
toyRDD.take(5)

['1\t5\t2\t\t\t1382\t17\t78\t25\t76\t0\t9\t\t\t05db9164\t942f9a8d\t56472604\t53a5d493\t25c83c98\t\t49b74ebc\t6c41e35e\ta73ee510\te113fc4b\tc4adf918\t08531bcb\t85dbe138\t1adce6ef\tae97ecc3\t76b06ec3\te5ba7672\t1f868fdd\t9437f62f\ta458ea53\tff4c70b8\t\t32c7478e\tda89b7d5\t7a402766\tc7beb94e',
 '1\t2\t12\t8\t3\t937\t7\t36\t6\t73\t1\t10\t\t4\t05db9164\tbce95927\t02391f51\tb9c629a9\t25c83c98\t7e0ccccf\t9971a939\t6a698541\ta73ee510\t2124a520\t3ac87d37\t2397259a\tcde3ec68\t07d13a8f\tfec218c0\td37efe8c\te5ba7672\t04d863d5\t21ddcdc9\t5840adea\tb6119319\t\t423fab69\t45ab94c8\te8b83407\tb13f4ade',
 '0\t\t51\t74\t\t39039\t65\t1\t0\t5\t\t0\t\t\t05db9164\t0468d672\t1c74e7a5\ta98187d1\t25c83c98\tfe6b92e5\t306a1d05\tf504a6f4\ta73ee510\t3b08e48b\t788bd9f4\t6f0b856c\t71b17693\t07d13a8f\tb4512bcd\t6b56c939\t07c540c4\t0c4e94df\t21ddcdc9\t5840adea\tf7d23965\t\t93bad2c0\t61e3864e\tea9a246c\tfcd456fa',
 '0\t\t0\t2\t4\t3288\t101\t3\t23\t37\t\t1\t\t21\t05db9164\td833535f\tb00d1501\td16679b9\t4cf72387\t7e0ccccf

In [173]:
# helper functions
def parse(line):
    """
    Map line --> tuple of (features, label)
    """
    fields = np.array(line.split('\t'))
    features,label = fields[1:14], fields[0]
    return(features, label)

def edit_data_types(line):
    """
    Map tuple of (features, label) --> tuple of (formated features, label)
    
    * '' is replaced with 'null'
    * numerical fields are converted to integers
    """
    features, label = line[0], line[1]
    formated_features = []
    for i, value in enumerate(features):
        if value == '':
            formated_features.append(np.nan)
        else:
            if i < 13:
                formated_features.append(float(value)) 
            else:
                formated_features.append(value)
    return (formated_features, label)

In [174]:
#trainRDDCached = trainRDD.map(parse).map(edit_data_types).cache()
toyRDDCached1 = toyRDD.map(parse).map(edit_data_types).cache()

In [175]:
print(toyRDDCached1.take(1))

[([5.0, 2.0, nan, nan, 1382.0, 17.0, 78.0, 25.0, 76.0, 0.0, 9.0, nan, nan], '1')]


In [176]:
sample = np.array(toyRDDCached1.map(lambda x: np.append(x[0], [x[1]])).takeSample(False, 1000))
sample_df = pd.DataFrame(np.array(sample), columns = ['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13', 'Label'])

In [177]:
columns = (['I1','I2','I3','I4','I5','I6','I7','I8','I9','I10','I11','I12','I13', 'Label'])
#columns = ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13']
sample_numeric = sample_df.reindex(columns=columns)
sample_numeric[columns] = sample_numeric[columns].astype(np.float)

In [178]:
"""Get means and standard deviations. Ideally we should do this in the RDD vs. pandas"""

means = []
stdevs = []

for i in sample_numeric.columns[0:13]:
    mean = np.nanmean(sample_numeric[i])
    means.append(mean)
    std = np.nanstd(sample_numeric[i])
    stdevs.append(std)
        
print(means)
print(stdevs)


[3.6761061946902656, 129.768, 19.083969465648856, 7.478535353535354, 24031.09182643794, 113.30208333333333, 16.173821989528797, 12.457, 102.717277486911, 0.6495575221238938, 2.8293193717277485, 1.1415929203539823, 8.308080808080808]
[9.04753693892924, 460.60738615007034, 35.28575565553717, 8.70109965628185, 92782.65943349876, 343.9023789473689, 49.37351119685013, 12.67218019916068, 232.37267788885407, 0.7196513282846247, 5.737100628411079, 5.262771665595769, 11.208584163174447]


In [179]:
"""Get medians for each class. Ideally we should do this in the RDD vs. pandas"""

median1 = np.array(sample_numeric[sample_numeric['Label'] == 1.0].median().tolist())
print(median1)

median0 = np.array(sample_numeric[sample_numeric['Label'] == 0.0].median().tolist())
print(median1)

[2.0000e+00 4.0000e+00 6.0000e+00 4.0000e+00 1.1855e+03 1.4000e+01
 7.0000e+00 8.0000e+00 3.2000e+01 1.0000e+00 2.0000e+00 0.0000e+00
 3.0000e+00 1.0000e+00]
[2.0000e+00 4.0000e+00 6.0000e+00 4.0000e+00 1.1855e+03 1.4000e+01
 7.0000e+00 8.0000e+00 3.2000e+01 1.0000e+00 2.0000e+00 0.0000e+00
 3.0000e+00 1.0000e+00]


In [180]:
# helper functions
def parse(line):
    """
    Map line --> tuple of (features, label)
    """
    fields = np.array(line.split('\t'))
    features,label = fields[1:14], fields[0]
    return(features, label)


def update_nans(line):
    """
    Map tuple of (features, label) --> tuple of (formated features, label)
    
    * '' is replaced with 'null'
    * numerical fields are converted to integers
    """
    
    #median1 = np.array([2.0, 3.5, 4.0, 4.0, 1362.0, 13.5, 8.0, 7.0, 42.5, 1.0, 2.0, 0.0, 3.0, 1.0])
    #median0 = np.array([0.0, 2.0, 7.0, 5.0, 3539.0, 46.5, 2.0, 8.0, 38.5, 0.0, 1.0, 0.0, 5.0, 0.0])
        
    features, label = line[0], float(line[1])
    formated_features = []
    for i, value in enumerate(features):
        if value == '' and label == 1.0:
            formated_features.append(float(median1[i]))
        elif value == '' and label == 0.0:
            formated_features.append(float(median0[i]))
        else:
            if i < 13:
                formated_features.append(float(value)) 
            else:
                formated_features.append(value)
    return (formated_features, label)

In [181]:
toyRDDCached = toyRDD.map(parse).map(update_nans)

In [182]:
# part d - helper function to normalize the data (FILL IN THE MISSING CODE BELOW)
def normalize(dataRDD):
    
    featureMeans = np.array(means)
    featureStdev = np.array(stdevs)
    
    #sc.broadcast(featureMeans)
    #sc.broadcast(featureStdevs)
    
    ################ YOUR CODE HERE #############
        
    normedRDD = dataRDD.map(lambda x: ((x[0]-featureMeans)/featureStdev, x[1]))
    
    ################ FILL IN YOUR CODE HERE #############
    
    return normedRDD

In [184]:
normedRDD.take(3)

[(array([ 0.27080439, -0.26654119, -0.32657669, -0.41851167, -0.24265445,
         -0.38302342,  0.86688289,  0.89503667, -0.15117633, -0.84217242,
          1.05136664, -0.40703114, -0.54504306]), 1.0),
 (array([-0.12509599, -0.24048226, -0.24962123, -0.54366042, -0.24878794,
         -0.42130147,  0.26723367, -0.5235247 , -0.16401474,  0.60118154,
          1.22386501, -0.40703114, -0.43805518]), 1.0),
 (array([-0.38902958, -0.13885242,  1.02014377, -0.29336292,  0.27637699,
         -0.19928876, -0.23247401, -0.97149145, -0.45501867, -0.84217242,
         -0.50111868, -0.40703114, -0.3310673 ]), 0.0)]

## 3. EDA & Discussion of Challenges

In [None]:
# sample = np.array(trainRDDCached.map(lambda x: np.append(x[0], [x[1]])).takeSample(False, 1000))
# sample_df = pd.DataFrame(np.array(sample), columns = FIELDS)

In [None]:
sample_df.iloc[:,0:21].describe(include = "all")

In [None]:
sample_df.iloc[:,21:39].describe(include = "all")

In [None]:
# # Take a subset of the dataframe with only numeric features
# sample_numeric = sample_df[FIELDS[0:13]]
# columns = ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9', 'I10', 'I11', 'I12', 'I13']
# sample_numeric = sample_num.reindex(columns=columns)
# sample_numeric[columns] = sample_numeric[columns].astype(np.float)

In [None]:
# Take a look at histograms for each feature (RUN THIS CELL AS IS)
sample_numeric.hist(figsize=(23,15), bins=15)
#sample_numeric[FIELDS[:-1]].hist(figsize=(15,15), bins=15)
plt.show()

In [None]:
# part b -  plot boxplots of each feature vs. the outcome (RUN THIS CELL AS IS)

fig, ax_grid = plt.subplots(5, 3, figsize=(23,15))
y = sample_df['Label']
for idx, feature in enumerate(FIELDS[0:13]):
    x = sample_num[feature]
    sns.boxplot(x, y, ax=ax_grid[idx//3][idx%3], orient='h', linewidth=.5)
    ax_grid[idx//3][idx%3].invert_yaxis()
fig.suptitle("BoxPlots by Label", fontsize=15, y=0.9)
plt.show()

In [None]:
corr = sample_numeric[FIELDS[:13]].corr()
fig, ax = plt.subplots(figsize=(15, 13))
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
cmap = sns.diverging_palette(10, 240, as_cmap=True)
sns.heatmap(corr, mask=mask, cmap=cmap, center=0, linewidths=.5)
plt.title("Correlations between features")
plt.show()

## 4. Algorithm Implementation

In [None]:
# part e - define your baseline model here
BASELINE = np.array([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [None]:
# part d - write function to compute loss (FILL IN MISSING CODE BELOW)
def LRLoss(cachedRDD, W):
    """
    Compute mean squared error.
    Args:
        dataRDD - each record is a tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    """
    augmentedData = cachedRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    ################## YOUR CODE HERE ##################
    
    
    loss = augmentedData.map(lambda x: np.log(1.0 + np.exp(np.multiply(-x[1], (np.dot(W, x[0])))))).mean()

    #loss = augmentedData.map(lambda x: np.log(1.0 + np.exp(np.multiply(-x[1], (np.dot(W, x[0][1:]) + x[0][0]))))).sum()

    ################## (END) YOUR CODE ##################
    return loss

In [None]:
LRLoss(normedRDD, BASELINE)

In [None]:
W = np.array([meanQuality, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
learningRate = 0.1
#augmentedData = toyRDDnumeric.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
#grad = augmentedData.map(lambda x: np.dot(np.multiply(-x[1], (1.0 - 1.0/(1.0 + np.exp(np.multiply(-x[1], (np.dot(W, x[0][1:]) + x[0][0]))))), x[0]))).sum()
#new_model = W - (learningRate * grad)


grad = augmentedData.map(lambda x: (-x[1] * (1.0-(1.0/(1.0 + (np.exp(np.multiply(-x[1], (np.dot(W, x[0][1:]) + x[0][0])))))))) * x[0][1:]).sum()
print(grad)

new_model = W - (learningRate * grad)
print(new_model)
#grad = np.dot(loss_2, x[0]).sum()

In [None]:
# part b - function to perform a single GD step
def GDUpdate(dataRDD, W, learningRate):
    """
    Perform one OLS gradient descent step/update.
    Args:
        dataRDD - records are tuples of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    Returns:
        new_model - (array) updated coefficients, bias at index 0
    """
    # add a bias 'feature' of 1 at index 0
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
    
    ################## YOUR CODE HERE ################# 
    
    grad = augmentedData.map(lambda x: (-x[1] * (1.0-(1.0/(1.0 + (np.exp(np.multiply(-x[1], (np.dot(W, x[0]))))))))) * x[0]).mean()

    new_model = W - (learningRate * grad)

    ################## (END) YOUR CODE ################# 

    return new_model

In [None]:
GDUpdate(normedRDD, BASELINE, learningRate=learningRate)

In [None]:
# part c - take a look at a few Gradient Descent steps (RUN THIS CELL AS IS)
nSteps = 5
model = BASELINE
print(f"BASELINE:  Loss = {LRLoss(normedRDD,model)}")
for idx in range(nSteps):
    print("----------")
    print(f"STEP: {idx+1}")
    model = GDUpdate(normedRDD, model)
    loss = LRLoss(normedRDD, model)
    print(f"Loss: {loss}")
    print(f"Model: {[round(w,3) for w in model]}")

In [None]:
# linear_weights = nb.feature_log_prob_[1] - nb.feature_log_prob_[0]

# top_negative_features = np.argsort(linear_weights)[0:10]

# top_positive_features = np.flip(np.argsort(linear_weights)[-10:],0)

In [None]:
# part b - OLS gradient descent function
def GradientDescent(trainRDD, testRDD, wInit, nSteps = 20, 
                    learningRate = 0.1, verbose = False):
    """
    Perform nSteps iterations of OLS gradient descent and 
    track loss on a test and train set. Return lists of
    test/train loss and the models themselves.
    """
    # initialize lists to track model performance
    train_history, test_history, model_history = [], [], []
    
    # perform n updates & compute test and train loss after each
    model = wInit
    for idx in range(nSteps): 
        
        ############## YOUR CODE HERE #############
        
        model = GDUpdate(trainRDD, model)
        training_loss = OLSLoss(trainRDD, model)
        test_loss = OLSLoss(testRDD, model)
                
        ############## (END) YOUR CODE #############
        
        # keep track of test/train loss for plotting
        train_history.append(training_loss)
        test_history.append(test_loss)
        model_history.append(model)
        
        # console output if desired
        if verbose:
            print("----------")
            print(f"STEP: {idx+1}")
            print(f"training loss: {training_loss}")
            print(f"test loss: {test_loss}")
            print(f"Model: {[round(w,3) for w in model]}")
    return train_history, test_history, model_history

In [None]:
# plot error curves - RUN THIS CELL AS IS
def plotErrorCurves(trainLoss, testLoss, title = None):
    """
    Helper function for plotting.
    Args: trainLoss (list of MSE) , testLoss (list of MSE)
    """
    fig, ax = plt.subplots(1,1,figsize = (16,8))
    x = list(range(len(trainLoss)))[1:]
    ax.plot(x, trainLoss[1:], 'k--', label='Training Loss')
    ax.plot(x, testLoss[1:], 'r--', label='Test Loss')
    ax.legend(loc='upper right', fontsize='x-large')
    plt.xlabel('Number of Iterations')
    plt.ylabel('Mean Squared Error')
    if title:
        plt.title(title)
    plt.show()

In [None]:
# run 50 iterations (RUN THIS CELL AS IS)
wInit = BASELINE
trainRDD, testRDD = normedRDD.randomSplit([0.8,0.2], seed = 2018)
start = time.time()
MSEtrain, MSEtest, models = GradientDescent(trainRDD, testRDD, wInit, nSteps = 50)
print(f"\n... trained {len(models)} iterations in {time.time() - start} seconds")

In [None]:
# code from async

def logisticReg_GD_Spark(data,y,w=None,eta=0.05,iter_num=500,regPara=0.01, stopCriteria=0.0001,reg="Lasso"): 
    #eta learning rate 
    #regPara 
    dataRDD = sc.parallelize(np.append(y[:,None],data,axis=1)).cache() 
    if w is None: 
       w = np.random.normal(size=data.shape[1]+1) 
    for i in range(iter_num): 
       w_broadcast = sc.broadcast(w) 
       g = dataRDD.map(lambda x: −x[0]*{1−1/(1+np.exp(−x[0]) 
       *np.dot(w_broadcast.value,np.append(x[1:],1))))) \ 
             *np.append(x[1:],1)).reduce[lambda x,y:x+y)/data.shape[0] 
             # Gradient of logloss 
       if reg == "Ridge": 
          wreg = w*1 
          wreg[−1] = 0 #last value of weight vector is bias term; 
          ignore in regularization 
       elif reg == "Lasso": 
          wreg = w*1 
          wreg[−1] = 0 #last value of weight vector is bias term; 
          ignore in regularization 
          wreg = (wreg>0).astype(int)*2−1 
       else: 
          wreg = np.zeros(w.shape[0]) 
       wdelta = eta*(g+regPara*wreg) #gradient: hinge loss + regularized term 
       if sum(abs(wdelta))<=stopCriteria*sum(abs(w)): # converged as updates 
       to weight vector are small 
          break 
       w = w − wdelta 
    return w

In [None]:
# Generate 80/20 (pseudo)random train/test split 
trainRDD, heldOutRDD = fullTrainRDD.randomSplit([0.8,0.2], seed = 1)
print(f"... held out {heldOutRDD.count()} records for evaluation and assigned {trainRDD.count()} for training.")

In [None]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")

In [None]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")

## 5. Application of Course Concepts