# Final Project - Pre-Processing

In [None]:
import re
import ast
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import types, Row, Column
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [None]:
# Create (or reuse) the notebook's Spark session and expose its SparkContext.
from pyspark.sql import SparkSession

app_name = "finalProject"
master = "local[*]"
spark = (
    SparkSession.builder
    .appName(app_name)
    .master(master)
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# read in raw text set and write to parquet
#train = spark.read.option('header', 'false').csv('gs:/notebooks/train.txt', sep='\t')
#train.write.format('parquet').save('data/train.parquet')

In [None]:
# Presumably the modulus intended for hash-binning categorical values.
# NOTE(review): nothing visible in this notebook reads this constant; the
# hashing helpers hard-code their own moduli -- confirm whether they should
# use it instead.
MODULO_NUMBER = 100000

In [None]:
# read in parquet
train = spark.read.parquet('data/train.parquet')

In [None]:
# Give every column a positional name: the first column becomes 'label',
# columns 1-13 become n0..n12 and the remaining columns become c0, c1, ...
train = train.withColumnRenamed('_c0', 'label')

# The final names depend only on position, so the leading underscore of the
# raw '_cN' names does not need to be stripped in a separate pass.
for prefix, group in (('n', train.columns[1:14]), ('c', train.columns[14:])):
    for i, c in enumerate(group):
        train = train.withColumnRenamed(c, prefix + str(i))

train.columns

In [None]:
# Cast the first 14 columns (the label plus the numeric features) to float,
# leaving every other column untouched and keeping the column order.
floatCols = set(train.columns[:14])
train = train.select([
    train[c].cast('float').alias(c) if c in floatCols else train[c]
    for c in train.columns
])
train.printSchema()

In [None]:
# Draw a 0.1% random sample of the rows, without replacement.
s = train.sample(withReplacement=False, fraction=0.001)
#s = train
s.count()

In [None]:
s.head()

# Train/Test Split

In [None]:
# 90/10 train/test split of the sample (seed 666); both parts are cached.
trainSample, testSample = [
    part.cache() for part in s.randomSplit([9.0, 1.0], 666)
]

In [None]:
trainSample.count(), testSample.count()

In [None]:
# 90/10 train/test split of the full data set (seed 666); both parts are cached.
train, test = [
    part.cache() for part in train.randomSplit([9.0, 1.0], 666)
]

In [None]:
%%time
train.count(), test.count()

# Normalize numerical data

In [None]:
def normalizeNumeric(trainDf, testDf):
    """Min-max scale the numeric feature columns of both DataFrames.

    The numeric features are taken by position (``trainDf.columns[1:14]``).
    Minimum and maximum are computed on ``trainDf`` only and applied to both
    frames, so no statistic is derived from the test data.  After scaling,
    nulls in the numeric columns are filled with 0, which maps a missing value
    to the training minimum.

    Columns that carry no usable range in the training data (all null, or a
    single constant value) are set to 0.0 in both frames instead of dividing
    by zero.

    Args:
        trainDf: Spark DataFrame used to compute the scaling bounds.
        testDf: Spark DataFrame scaled with the bounds from ``trainDf``; it
            must contain the same numeric column names.

    Returns:
        tuple: ``(trainDf, testDf)``, both scaled, null-filled and cached.
    """
    numericCols = trainDf.columns[1:14]
    if not numericCols:
        return trainDf.cache(), testDf.cache()

    # One aggregation pass for exactly the statistics we need; describe()
    # would also compute count/mean/stddev and hand everything back as strings.
    aggExprs = [F.min(c).alias('min_' + c) for c in numericCols]
    aggExprs += [F.max(c).alias('max_' + c) for c in numericCols]
    bounds = trainDf.agg(*aggExprs).first()

    for c in numericCols:
        lo, hi = bounds['min_' + c], bounds['max_' + c]
        if lo is None or hi is None or float(hi) == float(lo):
            # No spread to scale by: emit a constant rather than divide by zero.
            trainDf = trainDf.withColumn(c, F.lit(0.0))
            testDf = testDf.withColumn(c, F.lit(0.0))
            continue
        lo = float(lo)
        span = float(hi) - lo
        trainDf = trainDf.withColumn(c, (F.col(c) - lo) / span)
        testDf = testDf.withColumn(c, (F.col(c) - lo) / span)

    # TODO: imputing nulls with 0 (the training minimum) is a placeholder
    # strategy inherited from the original code; revisit.
    trainDf = trainDf.na.fill(0, subset=trainDf.columns[1:14])
    testDf = testDf.na.fill(0, subset=testDf.columns[1:14])

    return trainDf.cache(), testDf.cache()

## on sample

In [None]:
# on sample
trainSample, testSample = normalizeNumeric(trainSample, testSample)

In [None]:
trainSample.head()

In [None]:
testSample.head()

## on full

In [None]:
#%%time
train, test = normalizeNumeric(train, test)

In [None]:
train.head()

In [None]:
test.head()

# Categorical manipulation

In [None]:
def createFeatureVector2(trainDf, testDf):
    """Replace rare categorical values with the literal "infreq".

    Categorical columns are taken by position (``trainDf.columns[14:]``).  For
    each of them, every value that occurs at most 10 times in ``trainDf`` is
    replaced by ``"infreq"`` in both frames.  All other values are converted
    with ``str()``, so a null becomes the string ``"None"``.

    Args:
        trainDf: Spark DataFrame used to count value frequencies.
        testDf: Spark DataFrame transformed with the rare-value sets learned
            from ``trainDf``.

    Returns:
        tuple: ``(trainDf, testDf)``, both transformed and cached.
    """

    def findInfrequentValues(c, n=10):
        # c is the name of the column that we are operating on
        counts = trainDf.groupBy(c).count()
        rare = counts.filter(counts['count'] <= n)
        # collect_set over an empty frame yields a single row holding [].
        return rare.agg(F.collect_set(c)).collect()[0][0]

    def makeReplaceUdf(infreqValues):
        # Build a fresh UDF per column with the rare values bound explicitly.
        # A single shared UDF that closes over a variable reassigned in the
        # loop is serialised once on first use, so every later column would be
        # filtered with the first column's set.
        infreq = frozenset(infreqValues)

        def replaceInfrequentValues(row_value):
            if row_value in infreq:
                return "infreq"
            return str(row_value)

        return F.udf(replaceInfrequentValues, types.StringType())

    # NOTE: the rare-value set is collected to the driver for every column.
    for c in trainDf.columns[14:]:
        replace_infreq_udf = makeReplaceUdf(findInfrequentValues(c))
        trainDf = trainDf.withColumn(c, replace_infreq_udf(trainDf[c]))
        testDf = testDf.withColumn(c, replace_infreq_udf(testDf[c]))
    return trainDf.cache(), testDf.cache()

In [None]:
def createFeatureVector(trainDf, testDf):
    """Bucket rare categories, index and one-hot encode them, assemble features.

    Columns are addressed by position: ``columns[1:14]`` are the numeric
    features and ``columns[14:]`` the categorical features of ``trainDf``.

    Steps, all fitted on ``trainDf`` and applied to both frames:
      1. Values occurring at most 10 times in a categorical column are
         replaced by ``"infreq"``; other values (including nulls) pass through.
      2. Each categorical column ``c`` is string-indexed into ``c_idx``
         (``handleInvalid='keep'``).
      3. The index columns are one-hot encoded into ``c_OHE``.
      4. The numeric columns and the ``*_OHE`` columns are assembled into a
         single ``features`` vector column.

    Args:
        trainDf: Spark DataFrame used to fit every transformation.
        testDf: Spark DataFrame transformed with the models fitted on
            ``trainDf``.

    Returns:
        tuple: ``(unique_values, trainDf, testDf)``.  ``unique_values`` is a
        cached one-row DataFrame holding the distinct-value count of the LAST
        categorical column after rare-value replacement (``None`` when there
        are no categorical columns); the two frames are cached.
    """
    numericCols = trainDf.columns[1:14]
    catCols = trainDf.columns[14:]

    def findInfrequentValues(c, n=10):
        # c is the name of the column that we are operating on
        counts = trainDf.groupBy(c).count()
        rare = counts.filter(counts['count'] <= n)
        # collect_set over an empty frame yields a single row holding [].
        return rare.agg(F.collect_set(c)).collect()[0][0]

    def makeReplaceUdf(infreqValues):
        # Build a fresh UDF per column with the rare values bound explicitly.
        # A single shared UDF that closes over a variable reassigned in the
        # loop is serialised once on first use, so every later column would be
        # filtered with the first column's set.
        infreq = frozenset(infreqValues)

        def replaceInfrequentValues(row_value):
            return "infreq" if row_value in infreq else row_value

        return F.udf(replaceInfrequentValues, types.StringType())

    # Replace rare values in both train and test.
    # NOTE: the rare-value set is collected to the driver for every column.
    for c in catCols:
        replace_infreq_udf = makeReplaceUdf(findInfrequentValues(c))
        trainDf = trainDf.withColumn(c, replace_infreq_udf(trainDf[c]))
        testDf = testDf.withColumn(c, replace_infreq_udf(testDf[c]))

    # TODO: hash-binning of very high-cardinality columns was disabled in the
    # original code and is still not applied here.
    unique_values = None
    if catCols:
        unique_values = trainDf.agg(F.countDistinct(trainDf[catCols[-1]]))

    # Explicit name lists instead of positional slices / substring matching,
    # so the result does not depend on the frame having exactly 40 columns.
    idxCols = [c + '_idx' for c in catCols]
    oheCols = [c + '_OHE' for c in catCols]

    # Index the categorical values into category ids.
    for c, idxCol in zip(catCols, idxCols):
        indexer = StringIndexer(inputCol=c, outputCol=idxCol, handleInvalid='keep')
        f = indexer.fit(trainDf)
        trainDf = f.transform(trainDf)
        testDf = f.transform(testDf)

    # One-hot encode the categorical indices.
    if idxCols:
        encoder = OneHotEncoderEstimator(inputCols=idxCols, outputCols=oheCols)
        e = encoder.fit(trainDf)
        trainDf = e.transform(trainDf)
        testDf = e.transform(testDf)

    # Assemble all features into a single vector column.
    v = VectorAssembler(inputCols=numericCols + oheCols, outputCol="features")
    trainDf = v.transform(trainDf)
    testDf = v.transform(testDf)

    if unique_values is not None:
        unique_values = unique_values.cache()
    return unique_values, trainDf.cache(), testDf.cache()

## on sample

In [None]:
# on sample
uv, trainSample, testSample = createFeatureVector(trainSample, testSample)

In [None]:
trainSample.columns[-1], testSample.columns[-1]

In [None]:
trainSample.head()

In [None]:
trainSample.select('features').head()

In [None]:
testSample.select('features').head()

In [None]:
# Persist the transformed sample frames as parquet.
# NOTE(review): these are the same target paths that a later cell uses for the
# full train/test frames, and no save mode is set here -- confirm the intended
# run order / whether the sample should get its own path.
# NOTE(review): GCS URIs are normally written gs://<bucket>/...; confirm this
# single-slash form resolves as intended.
trainSample.write.format('parquet').save('gs:/notebooks/data/fullTrainMod100k.parquet')
testSample.write.format('parquet').save('gs:/notebooks/data/fullTestMod100k.parquet')

## on full

In [None]:
%%time
# on full
out1, out2 = createFeatureVector2(trainSample, testSample)

In [None]:
out1

In [None]:
# createFeatureVector returns three values (distinct-count frame, train, test);
# unpacking only two raises "ValueError: too many values to unpack".
uvFull, train, test = createFeatureVector(train, test)

In [None]:
train.select('features').head()

In [None]:
train.columns

In [None]:
test.select('features').head()

In [None]:
train.write.format('parquet').save('gs:/notebooks/data/fullTrainMod100k.parquet')
test.write.format('parquet').save('gs:/notebooks/data/fullTestMod100k.parquet')

# pipeline implementation

In [None]:
def hashCategoricals(trainDf, testDf, modulo=10000):
    """Bin hexadecimal categorical values into ``modulo`` buckets.

    Every string-typed column among ``trainDf.columns[14:]`` is rewritten in
    both frames: the value is parsed as a hexadecimal integer and replaced by
    the decimal string of ``value % modulo``.

    Values that cannot be hashed are handled explicitly:
      * null becomes the string ``"None"``;
      * strings that are not valid hexadecimal (for example the ``"infreq"``
        marker written by ``createFeatureVector``) are passed through.

    Non-string columns in that slice (index, one-hot or feature-vector
    columns added by earlier steps) are skipped, since they hold no hex
    strings.

    Args:
        trainDf: Spark DataFrame whose schema selects the columns to hash.
        testDf: Spark DataFrame hashed on the same column names.
        modulo: Number of hash buckets; defaults to 10000.

    Returns:
        tuple: ``(trainDf, testDf)``, both hashed and cached.
    """

    def hashValues(row):
        if row is None:
            return str(row)
        try:
            # integer value of the hex label, reduced to `modulo` buckets
            return str(int(row, 16) % modulo)
        except ValueError:
            # not a hex label; keep the value as its own category
            return row

    # create the udf object from the helper function
    udf_object = F.udf(hashValues, types.StringType())

    # hash all hex strings in both train and test
    stringCols = [name for name, dtype in trainDf.dtypes[14:] if dtype == 'string']
    for c in stringCols:
        trainDf = trainDf.withColumn(c, udf_object(trainDf[c]))
        testDf = testDf.withColumn(c, udf_object(testDf[c]))

    return trainDf.cache(), testDf.cache()

In [None]:
trainSample, testSample = hashCategoricals(trainSample, testSample)

In [None]:
trainSample.head()

In [None]:
testSample.head()

In [None]:
# pipeline implementation
def createFeatureVector2(trainDf, testDf):
    """Index and one-hot encode the categorical columns with one ML Pipeline.

    For every column ``c`` in ``trainDf.columns[14:]`` the pipeline contains a
    ``StringIndexer`` (``c`` -> ``c_idx``, ``handleInvalid='keep'``) followed
    by a ``OneHotEncoderEstimator`` (``c_idx`` -> ``c_OHE``,
    ``dropLast=False``).  The pipeline is fitted on ``trainDf`` and applied to
    both frames.

    Args:
        trainDf: Spark DataFrame used to fit the pipeline.
        testDf: Spark DataFrame transformed with the fitted pipeline.

    Returns:
        tuple: ``(trainDf, testDf)`` with the added ``*_idx`` and ``*_OHE``
        columns, both cached.
    """
    # generate stages for pipeline
    stages = []

    for c in trainDf.columns[14:]:
        strIdxCol = c + '_idx'
        # Derive the name from c directly; str.strip('_idx') removes any of
        # the characters '_', 'i', 'd', 'x' from both ends, not the suffix.
        oheCol = c + '_OHE'
        indexer = StringIndexer(inputCol=c, outputCol=strIdxCol, handleInvalid='keep')
        # inputCols / outputCols are list parameters; a bare string is rejected.
        OHE = OneHotEncoderEstimator(inputCols=[strIdxCol], outputCols=[oheCol], dropLast=False)
        stages += [indexer, OHE]

    # TODO: no VectorAssembler stage is added, so unlike createFeatureVector
    # this function does not produce a 'features' column.

    pipe = Pipeline(stages=stages)
    model = pipe.fit(trainDf)
    trainDf = model.transform(trainDf)
    testDf = model.transform(testDf)

    return trainDf.cache(), testDf.cache()

In [None]:
trainSample.head()

In [None]:
trainer, tester = createFeatureVector2(trainSample, testSample)

In [None]:
trainer.columns[-1], tester.columns[-1]

In [None]:
trainer.head()

In [None]:
tester.head()

In [None]:
trainSample.write.format('parquet').save('data/trainSample.parquet')