In [1]:
!hdfs dfs -put ../data/* /user/bosche

2018-11-30 20:20:46,732 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# List the contents of /user/bosche on HDFS.
!hdfs dfs -ls /user/bosche

2018-11-30 20:20:49,740 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 ryanbusby supergroup        302 2018-11-30 20:20 /user/bosche/toyTest.csv
-rw-r--r--   1 ryanbusby supergroup        343 2018-11-30 20:20 /user/bosche/toyTrain.csv


In [3]:
import pyspark as ps
from tqdm import tqdm
from pyspark.sql.functions import when
from datetime import datetime

def myMungeNoLabel(path, spark):
    """Turn an unlabeled CSV into one feature row per input row.

    The first CSV column is used as the row id; every remaining column is
    treated as a feature (there is no trailing label column).

    Parameters
    ----------
    path : location of the CSV, read with a header row and inferred schema.
    spark : session object whose ``read.csv`` is used to load ``path``.

    Returns
    -------
    DataFrame with columns ``Id``, ``Outliers`` (sum of the per-column 0/1
    flags produced by ``multi_nameOuts``), ``Obs`` and ``Outs`` (the pair
    returned by ``counts``).
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)

    def id_with_counts(row):
        # (id, (obs, outs)) -- the pair is stored as one struct column
        return (row[0], counts(row, label=False))

    def id_with_flag_total(row):
        # after flagging, every non-id column holds a 0/1 value
        return (row[0], sum(row[1:]))

    def flatten(row):
        # unpack the (obs, outs) struct into two plain columns
        obs_outs = row[2]
        return (row[0], row[1], obs_outs[0], obs_outs[1])

    print('counting observations'.upper())
    num_obs = raw.rdd.map(id_with_counts).toDF(['ii', 'Counts'])

    print('labeling outliers'.upper())
    flag_columns = multi_nameOuts(raw, 1.5, label=False)
    flagged = flag_columns(raw)

    print('summing outliers'.upper())
    totals = flagged.rdd.map(id_with_flag_total).toDF(['Id', 'Outliers'])

    print('joining'.upper())
    joined = totals.join(num_obs, totals.Id == num_obs.ii, 'left')
    joined = joined.select('Id', 'Outliers', 'Counts')

    return joined.rdd.map(flatten).toDF(['Id', 'Outliers', 'Obs', 'Outs'])

def myMunge(path, spark):
    """Turn a labeled CSV into a class-balanced training frame.

    The first CSV column is used as the row id and the last column as the
    label; the columns in between are treated as features.

    Parameters
    ----------
    path : location of the CSV, read with a header row and inferred schema.
    spark : session object whose ``read.csv`` is used to load ``path``.

    Returns
    -------
    DataFrame with columns ``Outliers`` (sum of the per-column 0/1 flags
    produced by ``multi_nameOuts``), ``Obs`` and ``Outs`` (the pair returned
    by ``counts``) and ``Response``, after ``balance_classes`` was applied.
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)

    def id_counts_label(row):
        # (id, (obs, outs), label) -- the pair is stored as one struct column
        return (row[0], counts(row), row[-1])

    def id_with_flag_total(row):
        # skip the id in front and the label at the end
        return (row[0], sum(row[1:-1]))

    def flatten(row):
        # unpack the (obs, outs) struct into two plain columns
        obs_outs = row[1]
        return (row[0], obs_outs[0], obs_outs[1], row[2])

    print('counting observations'.upper())
    num_obs = raw.rdd.map(id_counts_label).toDF(['ii', 'Counts', 'Response'])

    print('labeling outliers'.upper())
    flag_columns = multi_nameOuts(raw, 1.5)
    flagged = flag_columns(raw)

    print('summing outliers'.upper())
    totals = flagged.rdd.map(id_with_flag_total).toDF(['Id', 'Outliers'])

    print('joining'.upper())
    joined = totals.join(num_obs, totals.Id == num_obs.ii, 'left')
    joined = joined.select('Outliers', 'Counts', 'Response')

    features = joined.rdd.map(flatten)\
        .toDF(['Outliers', 'Obs', 'Outs', 'Response'])

    print('balancing classes'.upper())
    return balance_classes(features)

def counts(x, label=True, thresh=.25):
    """Count present values and large-magnitude values in one row.

    Parameters
    ----------
    x : sequence (e.g. a Row); ``x[0]`` is skipped as the row id.
    label : when True the last element is skipped as well (it is the label).
    thresh : a value counts as an "out" when it is strictly greater than
        ``thresh`` or strictly less than ``-thresh``.

    Returns
    -------
    (obs, outs) : number of non-missing feature values, and how many of
        those lie outside ``[-thresh, thresh]``.
    """
    end = -1 if label else None
    obs, outs = 0, 0
    for xx in x[1:end]:
        # Test for None explicitly: a plain truthiness check would also
        # discard 0 / 0.0, which is a real measurement, not a missing one.
        if xx is None:
            continue
        obs += 1
        if xx > thresh or xx < -thresh:
            outs += 1
    return obs, outs

def nameOuts(df, col_name, iqrx, rel_err=.01):
    """Build a 0/1 column expression flagging IQR outliers of ``col_name``.

    A value is flagged when it lies below ``q1 - iqrx * iqr`` or above
    ``q3 + iqrx * iqr``, where q1/q3 are the approximate 25% / 75% quantiles
    of the column and ``iqr = q3 - q1``.

    Parameters
    ----------
    df : DataFrame the quantiles are computed on.
    col_name : name of the numeric column to examine.
    iqrx : multiplier applied to the inter-quartile range.
    rel_err : relative error passed to ``approxQuantile``. The previous
        hard-coded 0.5 allowed the returned "quartiles" to sit almost
        anywhere in the distribution, which makes the fences meaningless;
        a small value keeps them close to the true quartiles.

    Returns
    -------
    Column expression evaluating to 1 for outliers and 0 otherwise.
    """
    quants = df.approxQuantile([col_name], [.25, .75], rel_err)
    bounds = quants[0] if quants else []
    if len(bounds) < 2:
        # No quantiles came back (a column with no non-null values), so
        # there are no fences to compare against: flag nothing instead of
        # failing with IndexError.
        from pyspark.sql.functions import lit
        return lit(0)
    q1, q3 = bounds[0], bounds[1]
    iqr = q3 - q1
    lb = q1 - iqrx * iqr
    ub = q3 + iqrx * iqr
    return when((df[col_name] < lb) | (df[col_name] > ub), 1).otherwise(0)

def multi_nameOuts(df, iqrx, label=True):
    """Return a function that replaces feature columns with outlier flags.

    The feature columns are ``df.columns`` without the first column and,
    when ``label`` is True, without the last one. For each of them the
    returned function overwrites the column of the frame it is given with
    the expression built by ``nameOuts(df, column, iqrx)``; quantiles are
    therefore always computed on ``df``, not on the frame passed in.
    """
    stop = -1 if label else None

    def inner(dataframe):
        result = dataframe
        for feature in tqdm(df.columns[1:stop]):
            flag = nameOuts(df, feature, iqrx)
            result = result.withColumn(feature, flag)
        return result

    return inner

def balance_classes(df):
    """Oversample the minority ``Response`` class (0 or 1) of ``df``.

    Written specifically to address the class imbalance of the Bosch data.

    The frame is returned unchanged when the classes are already close
    (size difference below 25% of the smaller class) or when one class is
    empty, because there are then no rows to resample from. Otherwise
    minority rows are sampled with replacement and appended to ``df``.

    ``sample()`` does not return an exact fraction of the rows, so the
    fraction is grown by 50% per round until the minority class would reach
    at least 90% of the majority class size. The seed is fixed at 42, so the
    sample that passed the check is the same one that gets appended.

    Parameters
    ----------
    df : DataFrame with a ``Response`` column.

    Returns
    -------
    ``df`` itself, or ``df`` unioned with the resampled minority rows.
    """
    c0 = df.filter(df.Response == 0).count()
    c1 = df.filter(df.Response == 1).count()
    lrgrClss = max(c0, c1)
    smlrClss = min(c0, c1)
    if smlrClss == 0:
        # Sampling an empty class always yields 0 rows, so the growth loop
        # below could never reach its target and would spin forever.
        return df
    x = float(lrgrClss - smlrClss) / smlrClss
    if x < .25:
        return df
    f_label = 1 if c0 > c1 else 0
    # Filter once and reuse it for every sampling round.
    minority = df.filter(df.Response == f_label)
    while smlrClss + minority.sample(True, x, 42).count() < .9 * lrgrClss:
        x += x / 2
    return df.union(minority.sample(True, x, 42))

def save_munged(X, file_name):
    """Write ``X`` as CSV (with header) under a time-stamped name.

    The target is the notebook-level ``root`` template filled with
    ``<current time, ':' replaced by '_'>_<file_name>``. ``root`` must be
    defined before this function is called.

    Returns the path that was written.
    """
    stamp = str(datetime.now().time()).replace(':', '_')
    munged_path = root % (stamp + '_' + file_name)
    print('saving data >>> '.upper() + munged_path)
    X.write.csv(munged_path, header=True)
    return munged_path

In [4]:
# Connect to the standalone Spark master. getOrCreate() hands back the
# already-running context when this cell is executed again in the same
# kernel; calling the SparkContext constructor a second time raises instead.
sparkContext = ps.SparkContext.getOrCreate(
    ps.SparkConf().setMaster('spark://ryans-macbook:7077'))
spark = ps.sql.SparkSession(sparkContext)

In [5]:
# HDFS path template; '%s' is replaced by a file name under /user/bosche.
# `root` is also read as a global by save_munged and make_save_preds.
root = 'hdfs://ryans-macbook:9000/user/bosche/%s'
train_file_name = 'toyTrain.csv'
train_path = root % train_file_name
# Munge the labeled training CSV, then write the result back to HDFS.
# save_munged returns the path it wrote; the modelling cell reads it later.
X = myMunge(train_path, spark)
munged_train_path = save_munged(X, train_file_name)

COUNTING OBSERVATIONS


  0%|          | 0/3 [00:00<?, ?it/s]

LABELING OUTLIERS


100%|██████████| 3/3 [00:00<00:00,  4.36it/s]


SUMMING OUTLIERS
JOINING
BALANCING CLASSES
SAVING DATA >>> hdfs://ryans-macbook:9000/user/bosche/20_21_10.434825_toyTrain.csv


In [6]:
# Preview the munged training frame (show() prints at most 20 rows here).
X.show()

+--------+---+----+--------+
|Outliers|Obs|Outs|Response|
+--------+---+----+--------+
|       0|  3|   0|       0|
|       0|  3|   0|       0|
|       0|  1|   0|       1|
|       0|  2|   0|       0|
|       0|  2|   0|       0|
|       0|  1|   0|       0|
|       0|  2|   0|       0|
|       0|  1|   1|       0|
|       0|  1|   0|       0|
|       0|  2|   0|       1|
|       0|  2|   0|       0|
|       0|  3|   1|       0|
|       0|  2|   1|       0|
|       0|  1|   0|       1|
|       0|  1|   0|       0|
|       0|  1|   1|       0|
|       0|  1|   0|       1|
|       0|  3|   1|       0|
|       0|  1|   0|       1|
|       0|  2|   0|       1|
+--------+---+----+--------+
only showing top 20 rows



In [7]:
# Same steps for the unlabeled test CSV.
# NOTE: this rebinds `X`, so the training frame from the earlier cell is no
# longer reachable under that name after this cell has run.
test_file_name = 'toyTest.csv'
test_path = root % test_file_name
X = myMungeNoLabel(test_path, spark)
munged_test_path = save_munged(X, test_file_name)

  0%|          | 0/3 [00:00<?, ?it/s]

COUNTING OBSERVATIONS
LABELING OUTLIERS


100%|██████████| 3/3 [00:00<00:00,  9.03it/s]


SUMMING OUTLIERS
JOINING
SAVING DATA >>> hdfs://ryans-macbook:9000/user/bosche/20_21_16.546415_toyTest.csv


In [8]:
# Print up to 49 rows of the munged test frame (X now holds the test data).
X.show(49)

+---+--------+---+----+
| Id|Outliers|Obs|Outs|
+---+--------+---+----+
|  0|       0|  3|   0|
|  7|       0|  3|   0|
|  6|       0|  1|   0|
|  9|       0|  2|   0|
| 17|       0|  2|   0|
|  5|       0|  1|   0|
|  1|       0|  2|   0|
| 10|       0|  1|   1|
|  3|       0|  1|   0|
| 12|       0|  2|   0|
|  8|       0|  2|   0|
| 11|       0|  3|   1|
|  2|       0|  2|   1|
|  4|       0|  1|   0|
| 13|       0|  1|   0|
| 14|       0|  1|   1|
| 15|       0|  1|   0|
| 16|       0|  3|   1|
+---+--------+---+----+



In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def run(spark, root, train_path, test_path):
    """Train on ``train_path``, score the hold-out split, predict ``test_path``.

    Calls, in order: ``trainModel``, ``validate_save_model`` (prints the
    metric and saves the model) and ``make_save_preds`` (writes the
    predictions). ``root`` is accepted but not used inside this function.
    """
    fitted, holdout = trainModel(spark, train_path)
    validate_save_model(fitted, holdout)
    make_save_preds(fitted, test_path)

def load_data(path, persisted=True, test=False):
    """Load a feature frame from ``path``.

    Parameters
    ----------
    path : CSV location.
    persisted : True when ``path`` already holds munged features; the file
        is then read as-is (header row, inferred schema). False when it
        holds raw data that still has to be munged.
    test : only consulted when ``persisted`` is False. True selects the
        unlabeled munging routine, False the labeled one.

    Uses the notebook-level ``spark`` session.
    """
    if persisted:
        return spark.read.csv(path, header=True, inferSchema=True)
    if test:
        # myMunge takes no `labeled` argument (passing one is a TypeError);
        # unlabeled data is handled by its sibling myMungeNoLabel.
        return myMungeNoLabel(path, spark)
    return myMunge(path, spark)

def vectorize(df, test=False):
    """Append a ``features`` vector column built from Obs, Outs and Outliers.

    All existing columns of ``df`` are kept. ``test`` is accepted for
    backward compatibility but has no effect: the previous implementation
    only used it to build a column list that was never applied.
    """
    numericCols = ['Obs', 'Outs', 'Outliers']
    assembler = VectorAssembler(inputCols=numericCols,
                                outputCol='features')
    # VectorAssembler is a plain transformer, so it can be applied directly;
    # fitting a one-stage Pipeline around it produced the same frame.
    return assembler.transform(df)

def trainModel(spark, train_path):
    """Fit a logistic regression on 80% of the data at ``train_path``.

    The data is loaded with ``load_data`` and vectorised with ``vectorize``,
    then split 80/20 with seed 42. ``spark`` is accepted but not used
    directly here (``load_data`` relies on the notebook-level session).

    Returns
    -------
    (model, holdout) : the fitted model and the 20% split kept for
        validation.
    """
    assembled = vectorize(load_data(train_path))
    fit_part, holdout_part = assembled.randomSplit([.8, .2], 42)

    # utilize pyspark.ml.tuning here to gridsearch and tune the model
    lr_params = dict(featuresCol='features',
                     labelCol='Response',
                     maxIter=2,
                     regParam=.3,
                     elasticNetParam=.8)
    estimator = LogisticRegression(**lr_params)

    return estimator.fit(fit_part), holdout_part

def validate_save_model(model, X_test):
    """Score ``model`` on ``X_test``, print the metric and save the model.

    The metric is whatever ``BinaryClassificationEvaluator`` reports for the
    ``Response`` label. The model is saved to ``../models/<time>_LR``, where
    ``<time>`` is the current time of day with ':' replaced by '_'.
    """
    evaluator = BinaryClassificationEvaluator(labelCol='Response')
    holdout_preds = model.transform(X_test)
    metric_value = evaluator.evaluate(holdout_preds)
    print('The Model got a %s of %s'
          % (evaluator.getMetricName(), metric_value))
    stamp = str(datetime.now().time()).replace(':', '_')
    model.save('../models/%s_LR' % stamp)

def make_save_preds(model, test_path):
    """Predict on the data at ``test_path`` and write Id/prediction as CSV.

    The output location is the notebook-level ``root`` template filled with
    the current time of day (':' replaced by '_'), followed by
    ``_PREDS.csv``.
    """
    features = vectorize(load_data(test_path), test=True)
    preds = model.transform(features).select('Id', 'prediction')
    stamp = str(datetime.now().time()).replace(':', '_')
    template = '%s' % root
    preds.write.csv(template % stamp + '_PREDS.csv', header=True)

In [10]:
# Train, validate and predict using the munged files written by the
# earlier cells (paths returned by save_munged).
run(spark, root, munged_train_path, munged_test_path)

The Model got a areaUnderROC of 0.5


In [11]:
# List /user/bosche again; the munged and prediction outputs now appear
# next to the two uploaded CSVs.
!hdfs dfs -ls /user/bosche

2018-11-30 20:21:42,819 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
drwxr-xr-x   - ryanbusby supergroup          0 2018-11-30 20:21 /user/bosche/20_21_10.434825_toyTrain.csv
drwxr-xr-x   - ryanbusby supergroup          0 2018-11-30 20:21 /user/bosche/20_21_16.546415_toyTest.csv
drwxr-xr-x   - ryanbusby supergroup          0 2018-11-30 20:21 /user/bosche/20_21_28.625402_PREDS.csv
-rw-r--r--   1 ryanbusby supergroup        302 2018-11-30 20:20 /user/bosche/toyTest.csv
-rw-r--r--   1 ryanbusby supergroup        343 2018-11-30 20:20 /user/bosche/toyTrain.csv
