In [20]:
# This version contains feature hashing only, i.e. no OHE, 
# which means no baseline validation based on OHE for now.

In [21]:
from test_helper import Test
import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
import os.path
from collections import defaultdict
import hashlib
from math import log
from math import exp #  exp(-t) = e^-t

In [22]:
# Build the relative path data/cs190/dac_sample.txt piece by piece.
baseDir = 'data'
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

# NOTE: when the file is missing nothing is printed and rawData stays undefined,
# so every later cell that uses rawData will fail with a NameError.
if os.path.isfile(fileName):
    # Read the file with a minimum of 2 partitions and turn tabs into commas so the
    # same parsing works with either ',' or '\t' separated data.
    rawData = sc.textFile(fileName, 2).map(lambda line: line.replace('\t', ','))
    print(rawData.take(1))

[u'0,1,1,5,0,1382,4,15,2,181,1,2,,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,,3a171ecb,c5c50484,e8b83407,9727dd16']


In [23]:
# Split weights for train / validation / test; the fixed seed makes the split repeatable.
weights = [.8, .1, .1]
seed = 42
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed)

# Mark each split for caching; they are reused by the counts below and by later cells.
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()

# Count the splits in train, validation, test order.
nTrain, nVal, nTest = (split.count()
                       for split in (rawTrainData, rawValidationData, rawTestData))
# Show the three sizes followed by their total.
print('{0} {1} {2} {3}'.format(nTrain, nVal, nTest, nTrain + nVal + nTest))
print(rawData.take(1))

79911 10075 10014 100000
[u'0,1,1,5,0,1382,4,15,2,181,1,2,,2,68fd1e64,80e26c9b,fb936136,7b4723c4,25c83c98,7e0ccccf,de7995b8,1f89b562,a73ee510,a8cd5504,b2cb9c98,37c9c164,2824a5f6,1adce6ef,8ba8b39a,891b62e7,e5ba7672,f54016b9,21ddcdc9,b1252a9d,07b5194c,,3a171ecb,c5c50484,e8b83407,9727dd16']


In [24]:
def parseHashPoint(point, numBuckets):
    """Turn one comma-separated record into a LabeledPoint of hashed features.

    Args:
        point (str): A comma separated string whose first field is the label and whose
            remaining fields are the raw feature values.
        numBuckets (int): The number of buckets to hash to; also the length of the
            resulting SparseVector.

    Returns:
        LabeledPoint: The record's label together with a SparseVector built from the
            {bucket: count} dict returned by hashFunction.
    """
    fields = point.split(',')
    label, rawValues = fields[0], fields[1:]

    # Pair every raw value (still a string, possibly empty) with its column position:
    # [(0, value0), (1, value1), ...].  These are the (featureID, value) tuples that
    # hashFunction expects.
    indexedFeats = list(enumerate(rawValues))

    # hashFunction returns a dict of int to float: each key is a bucket that at least one
    # (featureID, value) tuple hashed to, and each value is the number of tuples that
    # landed in that bucket.
    # Open questions left by the original author:
    #   Q: How does this work?
    #   Q: What is the count for?
    #   Q: The same term in different observations will have different indexes.
    #      Does this seem arbitrary?
    #   Q: How does the overall algorithm work? Describe/explain it.
    bucketCounts = hashFunction(numBuckets, indexedFeats, printMapping=False)
    return LabeledPoint(label, SparseVector(numBuckets, bucketCounts))
""""""

''

In [25]:
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Each (featureID, value) tuple is turned into the string value + str(featureID), the
    string is hashed with MD5, and the digest modulo numBuckets is the bucket index.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing
        works.  Identical (featureID, value) tuples are counted once, because the
        intermediate mapping is keyed by the feature string.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented
            as (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will
            be printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that
            the features have been hashed to.  The value for a given key will contain the
            count of the distinct feature strings that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        # hashlib works on bytes.  Text is encoded explicitly as UTF-8 so that non-ASCII
        # values no longer raise; ASCII text produces the same bytes (and therefore the
        # same bucket) as the implicit conversion did before.
        if isinstance(featureString, bytes):
            hashInput = featureString
        else:
            hashInput = featureString.encode('utf-8')
        # The outer int() keeps the bucket a plain int rather than a Python 2 long.
        mapping[featureString] = int(int(hashlib.md5(hashInput).hexdigest(), 16) % numBuckets)
    if printMapping:
        print(mapping)
    # Count how many feature strings landed in each bucket.
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

# Reminder of the sample values:
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [26]:
# Size of the hashed feature space (2 ** 15 == 32768 buckets).
numBucketsCTR = 2 ** 15

# Hash each raw split into LabeledPoints and mark the result for caching.
hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR)).cache()
hashValidationData = (rawValidationData
                      .map(lambda point: parseHashPoint(point, numBucketsCTR))
                      .cache())
hashTestData = rawTestData.map(lambda point: parseHashPoint(point, numBucketsCTR)).cache()

# Show one hashed training point as a sanity check.
print(hashTrainData.take(1))

[LabeledPoint(0.0, (32768,[1305,2883,3807,4814,4866,4913,6952,7117,9985,10316,11512,11722,12365,13893,14735,15816,16198,17761,19274,21604,22256,22563,22785,24855,25202,25533,25721,26487,26656,27668,28211,29152,29402,29873,30039,31484,32493,32708],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]


In [27]:
def evaluateResults(model, data):
    """Compute the mean log loss of a model over a dataset.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model; its weights
            and intercept are used to score each observation.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data, i.e. the mean of the per-observation log losses.
    """
    def pointLogLoss(lp):
        # Predicted probability for this observation, scored against its true label.
        probability = getP(lp.features, model.weights, model.intercept)
        return computeLogLoss(probability, lp.label)

    return data.map(pointLogLoss).mean()

"""
logLossTrLR0 = evaluateResults(model0, OHETrainData)
print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossTrBase, logLossTrLR0))
"""
""""""

''

In [28]:
def getP(x, w, intercept):
    """Return the logistic (sigmoid) probability for one observation.

    Note:
        The raw prediction is clipped to the range [-20, 20] for numerical purposes
        before the sigmoid is applied.

    Args:
        x (SparseVector): The observation's feature vector.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    score = x.dot(w) + intercept

    # Clip the raw score so that exp() below stays well inside floating point range.
    if score > 20:
        score = 20
    elif score < -20:
        score = -20

    return 1 / (1 + exp(-score))

In [29]:
def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probability and label.

    Note:
        log(0) is undefined, so when p is exactly 0 a small value (epsilon) is added to it
        and when p is exactly 1 epsilon is subtracted from it.  Other values of p are used
        unchanged.

    Args:
        p (float): A probability between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1 (0.0 and 1.0 are accepted too).

    Returns:
        float: The log loss value: -log(p) when y is 1, -log(1 - p) when y is 0.

    Raises:
        ValueError: If y is neither 0 nor 1.  Previously such a label made the function
            fall through and return None, which only failed later when averaging.
    """
    if y != 0 and y != 1:
        raise ValueError('label must be 0 or 1, got {0!r}'.format(y))

    epsilon = 10e-12  # note: 10e-12 is 1e-11

    # Nudge the two boundary probabilities away from 0 and 1 so log() stays defined.
    if p == 0:
        p = p + epsilon
    elif p == 1:
        p = p - epsilon

    if y == 1:
        return -log(p)
    return -log(1 - p)
    
""""""

''

In [30]:
# Settings passed to every LogisticRegressionWithSGD.train call in the grid search.
numIters = 500
includeIntercept = True
regType = 'l2'

# Sentinels for the search: no model yet, and a log loss large enough that the first
# evaluated model replaces it.
bestModel, bestLogLoss = None, 1e10

In [31]:
# RW: Here we see a dependency on OHEValidationData
# Put this back in to compare against logLossValBase
# NOTE: everything between the triple quotes below is disabled code kept for reference.
# It is a bare string expression, so none of it runs and logLossValBase / logLossValLR0
# are NOT defined by this cell.
"""
logLossValBase = (OHEValidationData
                  .map(lambda lp: computeLogLoss(classOneFracTrain, lp.label))
                  .mean())


logLossValLR0 = evaluateResults(model0, OHEValidationData)
print ('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossValBase, logLossValLR0))
"""
# The only statement that executes in this cell: it prints an empty line.
print ''




In [32]:
# Candidate values for the grid search: every step size is tried with every
# regularization parameter.
stepSizes = (1, 10)
regParams = (1e-6, 1e-3)

for stepSize in stepSizes:
    for regParam in regParams:
        model = LogisticRegressionWithSGD.train(
            hashTrainData,
            iterations=numIters,
            step=stepSize,
            regParam=regParam,
            regType=regType,
            intercept=includeIntercept)
        # Score the candidate on the validation split.
        logLossVa = evaluateResults(model, hashValidationData)
        print('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
              .format(stepSize, regParam, logLossVa))
        # Strict '<' keeps the earlier candidate when two tie.
        if logLossVa < bestLogLoss:
            bestModel, bestLogLoss = model, logLossVa
"""
# RW: Put this back in order to compare with logLossValBase based on OHE
print ('Hashed Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossValBase, bestLogLoss))
"""
""""""

	stepSize = 1.0, regParam = 1e-06: logloss = 0.470
	stepSize = 1.0, regParam = 1e-03: logloss = 0.470
	stepSize = 10.0, regParam = 1e-06: logloss = 0.448
	stepSize = 10.0, regParam = 1e-03: logloss = 0.450


''

In [33]:
# Log loss for the best model from (5d)
# bestModel is the candidate with the lowest validation log loss found by the grid search
# above; here it is scored on the hashed test split.
logLossTest = evaluateResults(bestModel, hashTestData)

"""
# RW: Put this back in for logLossTestBaseline, which depends on OHE
# Log loss for the baseline model
logLossTestBaseline = (hashTestData.map(lambda lp: computeLogLoss(classOneFracTrain, lp.label)).mean())

print ('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossTestBaseline, logLossTest))
"""
""""""

''

In [34]:
# TEST Evaluate on the test set (5e)
"""
RW: Put this back in for logLossTestBaseline which depends on OHE
Test.assertTrue(np.allclose(logLossTestBaseline, 0.537438),
                'incorrect value for logLossTestBaseline')
"""
# Check that the test-set log loss matches the expected value (compared with np.allclose,
# i.e. within its default tolerances rather than exactly).
Test.assertTrue(np.allclose(logLossTest, 0.455616931), 'incorrect value for logLossTest') 

1 test passed.
