In [1]:
import hashlib
from collections import namedtuple, defaultdict
from math import log, exp

import numpy as np
import pandas as pd

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Raise the pandas row-display cap so large data samples are shown in full.
pd.set_option('display.max_rows', 5000)

# Common path prefix shared by every input file.
DIR = '/Data/testfile_'

In [2]:
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Each (featureID, value) pair is turned into the string ``value + str(featureID)``,
    hashed with MD5 and reduced modulo ``numBuckets``.  Identical pairs produce the
    same string and are therefore counted only once.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            distinct (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        # MD5 works on bytes.  Encode text explicitly so non-ASCII values hash
        # instead of raising; byte strings are passed through untouched, which
        # keeps the buckets of plain ASCII input exactly as before.
        if isinstance(featureString, bytes):
            digestInput = featureString
        else:
            digestInput = featureString.encode('utf-8')
        mapping[featureString] = int(int(hashlib.md5(digestInput).hexdigest(), 16) % numBuckets)
    if printMapping:
        # Parenthesised single-argument form is valid as both statement and function call.
        print(mapping)
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)


def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A comma separated string where the first value is the label and the rest are
            features.
        numBuckets (int): The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint whose label is the first comma-separated value and whose
            features are a SparseVector of size ``numBuckets`` built from the hashed
            remaining values.
    """
    splits = point.split(',')
    # Pair every feature value with its 0-based position; the position acts as the feature ID.
    fields = list(enumerate(splits[1:]))
    # SparseVector comes from pyspark.mllib.linalg (imported in the notebook's import cell).
    vec = SparseVector(numBuckets, hashFunction(numBuckets, fields))
    return LabeledPoint(splits[0], vec)


def getP(x, w, intercept):
    """Turn a linear score into a probability with the logistic (sigmoid) function.

    Note:
        The linear score is clamped to the range [-20, 20] before the sigmoid is
        applied, for numerical purposes.

    Args:
        x (SparseVector): Feature vector of the observation; must provide ``dot``.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    score = x.dot(w) + intercept

    # Clamp from above first, then from below.
    if score > 20:
        score = 20
    if score < -20:
        score = -20

    return 1.0 / (1 + exp(-score))


def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probability and label.

    Note:
        log(0) is undefined, so when p is exactly 0 a small value (epsilon) is added to it
        and when p is exactly 1 epsilon is subtracted from it.  Epsilon is written as
        ``10e-12``, i.e. 1e-11.

    Args:
        p (float): A probability between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

    Returns:
        float: The log loss value: ``-log(p)`` when y is 1, ``-log(1 - p)`` when y is 0.

    Raises:
        ValueError: If y is neither 0 nor 1.
    """
    epsilon = 10e-12
    if p == 0:
        p += epsilon
    elif p == 1:
        p -= epsilon
    if y == 1:
        return -log(p)
    elif y == 0:
        return -log(1 - p)
    else:
        # ValueError is still an Exception, so existing broad handlers keep working.
        raise ValueError('y not in {0,1}')

In [3]:
# Small record types so the three data sources (and the outcome) are addressed by name.
Files = namedtuple('filelist', ['categorical', 'date', 'numeric'])
Header = namedtuple('header', ['categorical', 'date', 'numeric'])
Data = namedtuple('data', ['categorical', 'date', 'numeric', 'outcome'])

# Locations of the gzipped CSV inputs, all sharing the DIR prefix.
train_files = Files(
    categorical=DIR + 'train_categorical.csv.gz',
    date=DIR + 'train_date.csv.gz',
    numeric=DIR + 'train_numeric.csv.gz',
)
test_files = Files(
    categorical=DIR + 'test_categorical.csv.gz',
    date=DIR + 'test_date.csv.gz',
    numeric=DIR + 'test_numeric.csv.gz',
)

In [4]:
# Read every input as an RDD of text lines and mark it for caching.
# The outcome slot stays empty at this stage.
train_raw = Data(
    categorical=sc.textFile(train_files.categorical).cache(),
    date=sc.textFile(train_files.date).cache(),
    numeric=sc.textFile(train_files.numeric).cache(),
    outcome=None,
)
test_raw = Data(
    categorical=sc.textFile(test_files.categorical).cache(),
    date=sc.textFile(test_files.date).cache(),
    numeric=sc.textFile(test_files.numeric).cache(),
    outcome=None,
)


def get_header(x):
    """Return the column names of the CSV file at path ``x`` as a list.

    Only the header is parsed (``nrows=0``), so no data rows are loaded.
    """
    return list(pd.read_csv(x, nrows=0).columns.values)


# Column names of each input file, read on the driver with pandas.
train_headers = Header(
    categorical=get_header(train_files.categorical),
    date=get_header(train_files.date),
    numeric=get_header(train_files.numeric),
)
test_headers = Header(
    categorical=get_header(test_files.categorical),
    date=get_header(test_files.date),
    numeric=get_header(test_files.numeric),
)

In [5]:
def remove_header(x):
    """Filter predicate: True for data lines, False for the CSV header line.

    A line counts as the header when its first comma-separated field is exactly 'Id'.
    """
    first_field = x.split(',')[0]
    return first_field != 'Id'


def get_outcome(kv):
    """Map an (id, fields) pair to (id, outcome).

    The outcome is the last entry of ``fields`` converted to int.
    """
    k, v = kv
    return (k, int(v[-1]))


def subtract_outcome(kv):
    """Map an (id, fields) pair to (id, fields without the last entry)."""
    k, v = kv
    return (k, v[:-1])


def id_split(x):
    """Split a CSV line into (integer id, list of the remaining field strings)."""
    parts = x.split(',')
    return (int(parts[0]), parts[1:])
def header_key(x, headers):
    """Explode one (id, fields) row into per-column records keyed by column name.

    Args:
        x (tuple): An (id, fields) pair.
        headers (list): Column names of the source file.  The first entry (the id
            column) is skipped, so ``headers[1:]`` lines up with ``fields``.

    Yields:
        tuple: ``(column_name, [(id, field_value)])`` for every field, in order.

    Raises:
        IndexError: If the row has more fields than there are non-id headers.
    """
    id_int = x[0]
    fields = x[1]
    # Slice once per row rather than once per field.
    columns = headers[1:]
    for i, f in enumerate(fields):
        yield (columns[i], [(id_int, f)])


def convert_numeric(x):
    """Convert the string value of an exploded numeric record to a number.

    Takes a ``(column, [(id, value_string)])`` record and returns the same
    structure with the value parsed as float; an empty string becomes 0.
    """
    column, entries = x
    first = entries[0]
    raw = first[1]
    if len(raw) > 0:
        value = float(raw)
    else:
        value = 0
    return (column, [(first[0], value)])

In [20]:
def _parse_rows(rdd):
    """Drop the header line of a raw text RDD and split each row into (id, fields)."""
    return rdd.filter(remove_header).map(id_split)


# Header-free (id, fields) rows per source.  For training data the outcome is
# additionally taken from the last field of each numeric row.
train_filtered = Data(
    categorical=_parse_rows(train_raw.categorical),
    date=_parse_rows(train_raw.date),
    numeric=_parse_rows(train_raw.numeric),
    outcome=_parse_rows(train_raw.numeric).map(get_outcome),
)
test_filtered = Data(
    categorical=_parse_rows(test_raw.categorical),
    date=_parse_rows(test_raw.date),
    numeric=_parse_rows(test_raw.numeric),
    outcome=None,  # unknown
)

In [21]:
# Explode every (id, fields) row into one (column_name, [(id, value)]) record per field.
# Numeric values are additionally parsed to numbers by convert_numeric.
# train_filtered.numeric still carries the outcome as its last field, so the
# exploded numeric training data also contains that column.
train_explode = Data(
    train_filtered.categorical.flatMap(lambda x: header_key(x, train_headers.categorical)).repartition(100),
    train_filtered.date.flatMap(lambda x: header_key(x, train_headers.date)).repartition(100),
    train_filtered.numeric.flatMap(lambda x: header_key(x, train_headers.numeric)).map(convert_numeric).repartition(100),
    train_filtered.outcome
)
# Test rows are keyed with the headers read from the test files themselves, so
# the column names always match the file the fields came from.
test_explode = Data(
    test_filtered.categorical.flatMap(lambda x: header_key(x, test_headers.categorical)).repartition(100),
    test_filtered.date.flatMap(lambda x: header_key(x, test_headers.date)).repartition(100),
    test_filtered.numeric.flatMap(lambda x: header_key(x, test_headers.numeric)).map(convert_numeric).repartition(100),
    None #unknown
)

In [22]:
# Row-oriented (id, fields) RDDs, each spread over 100 partitions.
# The outcome field is stripped from the numeric training rows and kept separately.
train_rdd = Data(
    categorical=train_filtered.categorical.repartition(100),
    date=train_filtered.date.repartition(100),
    numeric=train_filtered.numeric.map(subtract_outcome).repartition(100),
    outcome=train_filtered.outcome.repartition(100),
)
test_rdd = Data(
    categorical=test_filtered.categorical.repartition(100),
    date=test_filtered.date.repartition(100),
    numeric=test_filtered.numeric.repartition(100),
    outcome=None,  # unknown
)

In [23]:
# explore categorical columns
def groupby_col(x):
    """Turn an exploded record into a counting pair ``((column, value), 1)``.

    ``x`` is ``(column, [(id, value)])``; the id is dropped.
    """
    column, entries = x
    value = entries[0][1]
    return ((column, value), 1)
# Count how often each (column, value) pair occurs, skipping empty cells.
# The filter indexes into the record instead of using a tuple parameter,
# which is Python-2-only syntax.
col_counts = train_explode.categorical.filter(lambda kv: len(kv[1][0][1]) > 0).\
                          map(groupby_col).reduceByKey(lambda x, y: x + y).\
                          collect()
# Order the collected pairs by the value string.
col_counts.sort(key=lambda x: x[0][1])
# Total number of exploded categorical records, empty cells included.
total_count = train_explode.categorical.count()

In [24]:
# first ten ((column, value), count) entries, in the value-string order set by the sort above
col_counts[:10]

[(('L3_S32_F3854', u'T-2147481664'), 1),
 (('L3_S32_F3854', u'T-21474819'), 1),
 (('L3_S32_F3854', u'T-2147482432'), 17),
 (('L3_S32_F3854', u'T-2147482688'), 23),
 (('L3_S32_F3854', u'T-2147482816'), 33),
 (('L1_S24_F1524', u'T-2147483648'), 1),
 (('L3_S42_F4039', u'T-2147483648'), 1),
 (('L3_S42_F4031', u'T-2147483648'), 1),
 (('L3_S42_F4035', u'T-2147483648'), 1),
 (('L1_S24_F1543', u'T-2147483648'), 1)]

In [25]:
# number of records in the exploded categorical training RDD (from .count() above)
total_count

213997860

In [26]:
# explore numeric columns
def groupby_col(x):
    """Reduce an exploded numeric record to ``(column, value)``.

    ``x`` is ``(column, [(id, value)])``; the id is dropped.  This reuses the
    name of the categorical helper defined in an earlier cell and replaces it
    from here on.
    """
    column, entries = x
    value = entries[0][1]
    return (column, value)
# Per-column sum and record count of the exploded numeric training data.
# Empty cells were turned into 0 by convert_numeric, so they count towards both.
col_sums = train_explode.numeric.map(groupby_col).reduceByKey(lambda x, y: x + y).collect()
# Index into the record instead of using a tuple parameter (Python-2-only syntax).
col_totals = train_explode.numeric.map(lambda kv: (kv[0], 1)).reduceByKey(lambda x, y: x + y).collect()
# Sort both lists by column name so matching columns line up for zip().
col_sums.sort()
col_totals.sort()
# Mean per column; pairs whose names do not match are skipped.
col_means = [(x[0], x[1] / y[1]) for x, y in zip(col_sums, col_totals) if x[0] == y[0]]
col_means.sort()

In [27]:
# first ten (column, mean) pairs; col_means was sorted above, so order follows the column-name strings
col_means[:10]

[('L0_S0_F0', -0.0010608006080060795),
 ('L0_S0_F10', 0.0022263022630226296),
 ('L0_S0_F12', 0.000194271942719427),
 ('L0_S0_F14', 0.0012243322433224321),
 ('L0_S0_F16', -0.0008491784917849178),
 ('L0_S0_F18', -0.0005524055240552416),
 ('L0_S0_F2', -0.0014821548215482154),
 ('L0_S0_F20', 0.0024378643786437853),
 ('L0_S0_F22', 0.0024317243172431715),
 ('L0_S0_F4', -5.7000570005692755e-05)]

In [28]:
# look at one exploded numeric record whose column key is 'Response'
train_explode.numeric.filter(lambda x: x[0]=='Response').first()

('Response', [(125, 0.0)])

In [43]:
# Baseline model: predict the overall mean of the 'Response' column for every row.
response_records = train_explode.numeric.filter(lambda x: x[0] == 'Response')
sum_response = response_records.map(lambda x: x[1][0][1]).reduce(lambda x, y: x + y)
count_response = response_records.count()
baseline = sum_response * 1.0 / count_response

# Average log loss of that constant prediction over all training outcomes.
total_logloss = train_filtered.outcome.map(lambda x: computeLogLoss(baseline, x[1])).sum()
logloss = total_logloss / train_filtered.outcome.count()

# Reported baseline accuracy is one minus the mean response.
accuracy = 1 - baseline

In [44]:
# Parenthesised single-argument print produces the same output as the print
# statement and also parses as a function call.
print('Baseline log-loss: {}'.format(logloss))
print('Baseline accuracy: {}'.format(accuracy))

Baseline log-loss: 0.0350342985348
Baseline accuracy: 0.994319943199


In [58]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
def convertPointNumeric(x):
    """Build a LabeledPoint from a joined ``(id, (numeric_fields, response))`` record.

    Every non-empty field string is parsed as float; empty fields become 0.
    The id is not used.
    """
    _, joined = x
    raw_fields, response = joined
    features = [float(field) if len(field) > 0 else 0 for field in raw_fields]
    return LabeledPoint(response, features)
    
# Seed for the train/validation split so that re-running the cell can draw the
# same split (NOTE(review): this also depends on the upstream partitioning
# being stable between runs - confirm).
SPLIT_SEED = 42

# Join numeric features with the outcome by id and convert to LabeledPoints.
data = train_rdd.numeric.join(train_rdd.outcome).map(convertPointNumeric)
training, val = data.randomSplit([0.6, 0.4], seed=SPLIT_SEED)
train_params = {
    'iterations':300, 
    'step':10.0, 
    'miniBatchFraction':1.0, 
    'initialWeights':None, 
    'regParam':0.001, 
    'regType':'l2', 
    'intercept':True, 
    'validateData':False, 
    'convergenceTol':0.001
}
# train() is a classmethod, so no instance is needed.
model1 = LogisticRegressionWithSGD.train(training, **train_params)

# (predicted probability, true label) for every validation point.  Cached
# because three separate actions below read it.
predictions = val.map(lambda x: (getP(x.features, model1.weights, model1.intercept), x.label)).cache()
num_predictions = predictions.count()
model1_logloss = predictions.map(lambda x: computeLogLoss(x[0], x[1])).sum() / num_predictions
# A prediction counts as correct when thresholding the probability at 0.5 matches the label.
model1_accuracy = predictions.map(lambda x: int(x[0]>0.5)==int(x[1])).sum()*1.0 / num_predictions

In [59]:
# Parenthesised single-argument print produces the same output as the print
# statement and also parses as a function call.
print('Model1 log-loss: {}'.format(model1_logloss))
print('Model1 accuracy: {}'.format(model1_accuracy))

Model1 log-loss: 0.034410185753
Model1 accuracy: 0.994434931507
