In [1]:
# import urllib
# ACCESS_KEY = "..."
# SECRET_KEY = "..."
# ENCODED_SECRET_KEY = urllib.quote(SECRET_KEY, "")
# AWS_BUCKET_NAME = "ruidatabricks"
# MOUNT_NAME = "my-data"
# dbutils.fs.mount("s3n://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

In [2]:
# display(dbutils.fs.ls("/mnt/my-data"))

In [3]:
# Load the training CSV from the mounted bucket as an RDD of raw text lines.
data = sc.textFile("/mnt/my-data/train.csv")
# data = sc.textFile("/users/aurora/desktop/train.csv")
# cache() is lazy: request persistence *before* the first action so that the
# count() below both reports the number of lines and populates the cache,
# instead of reading the file once here and again for the next action.
data.cache()
data.count()

In [4]:
# Request in-memory persistence of the raw-line RDD. cache() only flags the
# RDD; the data is actually stored when a later action evaluates it.
data.cache()
from csv import reader
def mysplit(input):
  """Parse CSV text and return the fields of its first record.

  `input` is an iterable of CSV lines (callers in this notebook pass a
  one-element list holding a single line). Every record is parsed with
  csv.reader, and the first one is returned as a list of strings.
  Raises IndexError when `input` yields no records.
  """
  parsed_records = []
  for fields in reader(input):
    parsed_records.append(fields)
  return parsed_records[0]

In [5]:
# Parse every raw line into a list of CSV fields. mysplit expects an iterable
# of lines, so each line is wrapped in a one-element list.
parts = data.map(lambda l: mysplit([l]))
# cache() is lazy, so it must come before the action: this way count()
# materialises the parsed records once and later cells reuse them, rather
# than parsing the whole file here and again on the next action.
parts.cache()
parts.count()

In [6]:
from pyspark.sql.types import *
# Bring the first parsed record to the driver and report how many fields it has.
show = list(parts.first())
# Function-call form of print: same output on Python 2, valid on Python 3,
# and consistent with the print(...) calls used at the end of the notebook.
print(len(show))

In [7]:
# The first parsed record is treated as the header: it is later passed to
# createDataFrame as the list of column names.
header = parts.take(1)[0]
# Keep every record that differs from the header record.
rows = parts.filter(lambda line: line != header)
# Number of fields in the first remaining record.
len(rows.first())

In [8]:
from pyspark.sql import SQLContext, Row
# Build a DataFrame from the data records, using the header record as the
# column names. SQLContext.getOrCreate() is documented to take the
# SparkContext, so pass `sc` rather than an existing SQLContext.
schemadata = SQLContext.getOrCreate(sc).createDataFrame(rows, header)
# schemadata = sqlContext.createDataFrame(rows, header)
# cache() is lazy: request it first so the count() action fills the cache
# instead of computing the frame once uncached and again later.
schemadata.cache()
schemadata.count()

In [9]:
from pyspark.sql.functions import *
# The literals below are 1-based column positions; subtracting one turns them
# into the 0-based indices used when enumerating schemadata.columns.

# Columns cast to float.
numerics = [position - 1 for position in (213, 298, 299, 300, 318, 319, 320, 334, 335, 336, 369)]

# Columns left as strings (later fed to StringIndexer).
characters = [position - 1 for position in (2, 6, 9, 10, 11, 12, 13, 44, 45, 197, 201, 203, 208, 214, 215, 217, 218, 222, 226, 229, 230, 232, 236, 237, 239, 273, 282, 304, 324, 341, 351, 352, 353, 403, 465, 466, 492, 839, 1933)]

# Columns holding timestamp strings.
times = [position - 1 for position in (74, 76, 157, 158, 159, 160, 167, 168, 169, 170, 177, 178, 179, 180, 205)]

# Every other index of the 1934 columns is cast to integer.
# range() (not the Python-2-only xrange) keeps this cell runnable on both
# Python 2 and Python 3; sorted() makes the resulting list order deterministic.
integers = sorted(set(range(1934)).difference(numerics, characters, times))

In [10]:
# deal with time data
from datetime import datetime
import calendar
# Upper-cased month abbreviation -> month number ('JAN' -> 1 ... 'DEC' -> 12).
# calendar.month_abbr has an empty string at index 0, so '' maps to 0 as well.
months = dict((abbr.upper(), number) for number, abbr in enumerate(calendar.month_abbr))

def map_to_datetime(input):
  """Convert a string laid out as DDMONYY:HH... into a datetime.

  Characters 0-1 are the day, 2-4 the upper-case month abbreviation,
  5-6 a two-digit year (prefixed with '20'), and 8-9 the hour. Anything
  after the hour is ignored. A falsy input (None or '') yields the
  placeholder datetime(1000, 1, 1).
  """
  if not input:
    return datetime(1000, 1, 1)
  year = int('20' + input[5:7])
  month = months[input[2:5]]
  day = int(input[:2])
  hour = int(input[8:10])
  return datetime(year, month, day, hour)
# Wrap the parser as a Spark SQL UDF whose declared return type is DateType.
# NOTE(review): map_to_datetime also parses an hour, but the UDF is declared
# as a date - confirm whether TimestampType was intended.
map_date = udf(map_to_datetime, DateType())

In [11]:
# convert to integer, numeric, and datetime
def type_change(col_name, index):
  """Return the column expression that converts one column to its target type.

  `index` is the column's 0-based position, looked up in the `integers`,
  `numerics` and `times` index lists:
    - integers -> cast to 'integer'
    - numerics -> cast to 'float'
    - times    -> parsed by the map_date UDF
  Converted columns keep their original name via alias(); a column found in
  none of the lists is returned unchanged.
  """
  column = col(col_name)
  if index in integers:
    converted = column.cast('integer')
  elif index in numerics:
    converted = column.cast('float')
  elif index in times:
    converted = map_date(column)
  else:
    return column
  return converted.alias(col_name)

# One column expression per column of schemadata, in column order; each
# expression is chosen by type_change from the column's 0-based position.
exprs = [type_change(name, position) for position, name in enumerate(schemadata.columns)]

# Apply all conversions in a single select.
clean_df = schemadata.select(*exprs)

In [12]:
# cache() is lazy, so request it before the action: count() then materialises
# the converted frame once and the cells below reuse the cached result,
# instead of recomputing every cast and UDF call.
clean_df.cache()
clean_df.count()

In [13]:
# convert categorical to numerical
from pyspark.ml.feature import StringIndexer
# Starting from the cleaned frame, add one StringIndexer output column per
# string column. Each new column is named after its source with an 'i' suffix
# and is appended after the existing columns.
indexed = clean_df
for index in characters:
  category_col = indexed.columns[index]
  indexer = StringIndexer(inputCol=category_col, outputCol=category_col + 'i')
  fitted_indexer = indexer.fit(indexed)
  indexed = fitted_indexer.transform(indexed)

In [14]:
# Keep the first 1934 columns except the raw string and raw timestamp ones.
# The exclusions are merged into one set so each membership test is O(1), the
# column-name list is fetched once, and range() replaces the Python-2-only xrange.
# NOTE(review): the StringIndexer output columns are appended after index 1933,
# so they are not selected here - confirm that leaving them out is intended.
excluded_positions = set(characters) | set(times)
all_columns = indexed.columns
indexed2 = indexed.select(*[all_columns[i] for i in range(1934) if i not in excluded_positions])

In [15]:
from pyspark.mllib.regression import LabeledPoint
# Turn every row into a LabeledPoint: the `target` field is the label and the
# features are all fields except the first and the last (x[1:-1]).
# DataFrame.map() was removed in Spark 2.0; mapping over .rdd is what it did
# on 1.x and keeps this cell working on either version.
labeled = indexed2.rdd.map(lambda x: LabeledPoint(x.target, x[1:-1])).toDF()
# Index the label column into "indexed", the column the classifier trains on.
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(labeled)
labeled = si_model.transform(labeled)
# Only the indexed label and the feature vector are needed downstream.
labeled = labeled.select('indexed','features')

In [16]:
# Fixed seed so the split is reproducible; weights are train / validation / test.
seed = 42
weights = [.8, .1, .1]
train, val, test = labeled.randomSplit(weights, seed)
# Request caching for all three splits; each is reused by fit/transform below.
train.cache()
val.cache()
test.cache()

In [17]:
from pyspark.ml.classification import *
# Random forest of 300 trees, each at most 5 levels deep, trained on the
# indexed label and the feature vector; seeded for reproducibility.
rf = RandomForestClassifier(
  labelCol="indexed",
  featuresCol="features",
  numTrees=300,
  maxDepth=5,
  seed=42,
)
rf_model = rf.fit(train)

In [18]:
# Score each split with the fitted forest, in train / validation / test order.
train_pred, val_pred, test_pred = (
  rf_model.transform(split) for split in (train, val, test)
)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the "prediction" column against the indexed label using the
# evaluator's "precision" metric.
evaluator = MulticlassClassificationEvaluator(
  metricName="precision",
  labelCol="indexed",
  predictionCol="prediction",
)

accuracy_val = evaluator.evaluate(val_pred)
accuracy_test = evaluator.evaluate(test_pred)

# Report each score as an error rate (1 - score), validation first.
for template, accuracy in (("Validation Error = %g", accuracy_val),
                           ("Test Error = %g", accuracy_test)):
  print(template % (1.0 - accuracy))