In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType

import numpy as np
import pandas as pd

## Amazon Configuration
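Here we set up the Amazon keys and mount the S3 bucket. The mount uses `dbutils`, which is only available on Databricks; in a plain Jupyter kernel this cell fails with a `NameError`. You should only need to run this cell once.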

In [2]:
import os

# Read the AWS credentials from the environment instead of hard-coding them in
# the notebook. A missing variable raises KeyError immediately, which is
# preferable to a confusing authentication failure further down.
# NOTE(review): any key pair that was previously committed in this notebook
# should be treated as leaked and rotated.
ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
# Escape "/" so the secret can be embedded in the credentials part of the mount URI.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "daveandhuey"
MOUNT_NAME = "daveandhuey"

MOUNT_POINT = "/mnt/%s" % MOUNT_NAME

# `dbutils` is not imported anywhere in this notebook; it has to be supplied by
# the runtime (a plain Jupyter kernel raises NameError here).
# Skip the mount when the mount point already exists so that re-running the
# notebook from the top does not fail on this cell.
if not any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), MOUNT_POINT)

NameError: name 'dbutils' is not defined

Use Boto to fetch the sample submission file from S3. Pandas does not read the mounted paths correctly (cause unknown), so we download the contents of the .csv into memory and then pass them to Pandas.

In [7]:
from boto.s3.connection import S3Connection
from boto.s3.connection import Key

# Open a connection to S3.
c = S3Connection(ACCESS_KEY, SECRET_KEY)

# Point a Key at the sample submission object inside our bucket.
bucket = c.get_bucket(AWS_BUCKET_NAME)
k = Key(bucket)
k.key = 'sample_submission.csv'

# Pick the StringIO implementation that matches the running interpreter.
import sys
if sys.version_info[0] < 3:
  from StringIO import StringIO
else:
  from io import StringIO

# Without an explicit encoding, get_contents_as_string() returns raw bytes,
# which io.StringIO rejects on Python 3 (TypeError). Asking boto to decode the
# payload gives a text string that StringIO accepts on both Python 2 and 3.
sub = pd.read_csv(StringIO(k.get_contents_as_string(encoding='utf-8')), sep=',')

## Training and Predicting
Here we load the numeric and date features for the training and test sets, train a GBT classifier, and then make predictions.

In [7]:
# Load the numeric and date feature tables for the training set from the mounted bucket.
train_num = spark.read.csv("/mnt/%s/train_numeric.csv" % MOUNT_NAME, header="true", inferSchema="true")
train_date = spark.read.csv("/mnt/%s/train_date.csv" % MOUNT_NAME, header="true", inferSchema="true")

# TODO(hkwik): Figure out when this is necessary. Code I've seen using XGBoost doesn't impute NaN.
train_num = train_num.na.fill(0.0)
train_date = train_date.na.fill(0.0)

# Join on the column *name* so the result carries a single Id column. Joining on
# the expression `train_num.Id == train_date.Id` keeps both Id columns, which
# makes any later reference to "Id" ambiguous and differs from how the test
# tables are joined.
train = train_num.join(train_date, "Id")

# Every column except the key and the label is used as a model feature.
ignore = ['Id', 'Response']
lista = [x for x in train.columns if x not in ignore]

# The assembler is reused later to build the test feature vectors.
assembler = VectorAssembler(inputCols=lista, outputCol='features')

train = assembler.transform(train).select('Response', 'features')

## Split the data into training and validation sets (30% held out for validation)
(trainingData, validData) = train.randomSplit([0.7, 0.3], seed=24)
trainingData.cache()
validData.cache()

In [8]:
# Configure the gradient-boosted tree classifier.
# maxIter is kept at 1 so that test runs finish quickly.
gbt = GBTClassifier(
    labelCol="Response",
    featuresCol="features",
    maxIter=1,
    seed=24,
)

# Wrap the classifier in a Pipeline; the classifier is its only stage.
pipeline = Pipeline(stages=[gbt])

# Fit the pipeline on the training split.
model = pipeline.fit(trainingData)

In [9]:
from sklearn.metrics import matthews_corrcoef

# Score the held-out validation split.
predictions = model.transform(validData)

# Collect the label and the prediction from the SAME rows in a single pass.
# Collecting them with two independent jobs and pairing them by position relies
# on both jobs returning rows in the same order, which Spark does not guarantee.
scored_rows = predictions.select("Response", "prediction").collect()
valid_responses = [row[0] for row in scored_rows]
predsGBT = [row[1] for row in scored_rows]
preds = np.asarray(predsGBT).astype(int)

print('MCC: %s ' % matthews_corrcoef(valid_responses, preds))

In [10]:
# Make predictions and submission frame
test_num = spark.read.csv("/mnt/%s/test_numeric.csv" % MOUNT_NAME, header="true", inferSchema="true")
test_date = spark.read.csv("/mnt/%s/test_date.csv" % MOUNT_NAME, header="true", inferSchema="true")

test_join = test_num.join(test_date, "Id")
# L3_S46_D4135 looks like a bunch of empty strings, so it has a StringType.
test_join = test_join.withColumn('L3_S46_D4135', test_join['L3_S46_D4135'].cast(DoubleType()))
test_join = test_join.na.fill(0.0)

# Keep Id next to the features. The join above does not preserve the row order
# of the input files, so predictions must be matched to the submission frame by
# Id rather than by position.
test = (assembler.transform(test_join).select("Id", "features"))

predictions_test = model.transform(test)

# Bring (Id, prediction) pairs to the driver as a Series indexed by Id.
pred_by_id = predictions_test.select("Id", "prediction").toPandas().set_index("Id")["prediction"]

# NOTE(review): assumes sample_submission.csv has an 'Id' column holding the
# same Ids as the test files - confirm.
predsGBT = sub["Id"].map(pred_by_id)
if predsGBT.isnull().any():
    raise ValueError("Some submission Ids have no prediction; check the test join.")

sub['Response'] = np.asarray(predsGBT).astype(int)

## Writing Submission 
Write the submission Pandas DataFrame to our S3 bucket as a CSV file.

In [12]:
# Render the submission frame as CSV text, leaving out the index column.
sub_str = sub.to_csv(index=False)

# Upload the CSV text to the bucket under the submission key.
k = Key(bucket)
k.key = 'bosch-submission.csv'
k.set_contents_from_string(sub_str)

In [13]:
# If we want to use GridSearchCV
from spark_sklearn import GridSearchCV