In [47]:
import findspark
findspark.init()

In [48]:
import numpy as np
import pandas as pd
import pyspark
import os
import urllib
import sys

from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *


In [49]:
# start Spark session
spark = pyspark.sql.SparkSession.builder.appName('Otto').getOrCreate()


In [50]:
# print runtime versions
print ('****************')
print ('Python version: {}'.format(sys.version))
print ('Spark version: {}'.format(spark.version))
print ('****************')

****************
Python version: 3.6.4 |Anaconda, Inc.| (default, Jan 16 2018, 10:22:32) [MSC v.1900 64 bit (AMD64)]
Spark version: 2.3.0
****************


In [51]:
# load train.csv into Spark dataframe
data = spark.createDataFrame(pd.read_csv('train.csv'))


In [52]:
# vectorize all numerical columns into a single feature column
feature_cols = data.columns[:-1]
assembler = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols, outputCol='features')
data = assembler.transform(data)
data.head()

Row(feat_1=1, feat_2=0, feat_3=0, feat_4=0, feat_5=0, feat_6=0, feat_7=0, feat_8=0, feat_9=0, feat_10=0, feat_11=1, feat_12=0, feat_13=0, feat_14=0, feat_15=0, feat_16=0, feat_17=2, feat_18=0, feat_19=0, feat_20=0, feat_21=0, feat_22=1, feat_23=0, feat_24=4, feat_25=1, feat_26=1, feat_27=0, feat_28=0, feat_29=2, feat_30=0, feat_31=0, feat_32=0, feat_33=0, feat_34=0, feat_35=1, feat_36=0, feat_37=0, feat_38=0, feat_39=0, feat_40=1, feat_41=0, feat_42=5, feat_43=0, feat_44=0, feat_45=0, feat_46=0, feat_47=0, feat_48=2, feat_49=0, feat_50=0, feat_51=0, feat_52=0, feat_53=0, feat_54=1, feat_55=0, feat_56=0, feat_57=2, feat_58=0, feat_59=0, feat_60=11, feat_61=0, feat_62=1, feat_63=1, feat_64=0, feat_65=1, feat_66=0, feat_67=7, feat_68=0, feat_69=0, feat_70=0, feat_71=1, feat_72=0, feat_73=0, feat_74=0, feat_75=0, feat_76=0, feat_77=0, feat_78=0, feat_79=2, feat_80=1, feat_81=0, feat_82=0, feat_83=0, feat_84=0, feat_85=1, feat_86=0, feat_87=0, feat_88=0, feat_89=0, feat_90=0, feat_91=0, fea

In [53]:
# convert text labels into indices
data = data.select(['features', 'target'])
label_indexer = pyspark.ml.feature.StringIndexer(inputCol='target', outputCol='label').fit(data)
data = label_indexer.transform(data)

In [54]:
# only select the features and label column
data = data.select(['features', 'label'])
print("Reading for machine learning")
data.show(10)

Reading for machine learning
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(93,[0,10,16,21,2...|  8.0|
|(93,[7,17,36,57,6...|  8.0|
|(93,[7,16,32,47,5...|  8.0|
|(93,[0,3,4,5,6,9,...|  8.0|
|(93,[16,23,40,49,...|  8.0|
|(93,[0,1,4,12,16,...|  8.0|
|(93,[0,7,9,21,24,...|  8.0|
|(93,[16,24,41,48,...|  8.0|
|(93,[7,11,12,16,1...|  8.0|
|(93,[6,16,19,23,2...|  8.0|
+--------------------+-----+
only showing top 10 rows



In [55]:
# change regularization rate and you will likely get a different accuracy.
reg = 0.01


In [56]:
# use Logistic Regression to train on the training set
train, test = data.randomSplit([0.90, 0.10])
lr = pyspark.ml.classification.LogisticRegression(regParam=reg)
model = lr.fit(data)

In [57]:
test.show(10)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(93,[0,1,2,3,4,5,...|  8.0|
|(93,[0,1,2,3,4,6,...|  8.0|
|(93,[0,1,2,4,5,6,...|  8.0|
|(93,[0,1,2,6,8,16...|  0.0|
|(93,[0,1,2,7,10,1...|  0.0|
|(93,[0,1,2,9,10,1...|  0.0|
|(93,[0,1,6,7,9,12...|  8.0|
|(93,[0,1,6,12,17,...|  8.0|
|(93,[0,1,12,13,23...|  0.0|
|(93,[0,2,3,4,6,9,...|  0.0|
+--------------------+-----+
only showing top 10 rows



In [61]:
testdata = spark.createDataFrame(pd.read_csv('test.csv'))
feature_cols1 = testdata.columns[:]
assembler2 = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols1, outputCol='features')


In [62]:
testdata = assembler2.transform(testdata)


In [63]:
testdata = testdata.select(['features'])
testdata.show(10)

+--------------------+
|            features|
+--------------------+
|(93,[9,13,14,15,2...|
|(93,[0,1,2,3,14,1...|
|(93,[1,2,3,10,11,...|
|(93,[3,14,15,24,2...|
|(93,[0,3,6,7,9,23...|
|(93,[8,23,24,31,4...|
|(93,[9,13,15,17,1...|
|(93,[0,13,17,23,2...|
|(93,[4,13,23,24,3...|
|(93,[8,16,23,63,6...|
+--------------------+
only showing top 10 rows



In [64]:
# predict on the test set
prediction = model.transform(testdata)
print("Prediction")
prediction.show(10)

Prediction
+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(93,[9,13,14,15,2...|[4.23546802111468...|[0.26974766452314...|       3.0|
|(93,[0,1,2,3,14,1...|[0.44813591377910...|[0.01537548634700...|       1.0|
|(93,[1,2,3,10,11,...|[-1.2371349239515...|[1.31694338800390...|       1.0|
|(93,[3,14,15,24,2...|[5.00641281822240...|[0.69927517997399...|       0.0|
|(93,[0,3,6,7,9,23...|[-0.7249191292608...|[0.01665906578707...|       2.0|
|(93,[8,23,24,31,4...|[4.42472615054001...|[0.66174542620531...|       0.0|
|(93,[9,13,15,17,1...|[-0.4371875302354...|[0.00737054092038...|       2.0|
|(93,[0,13,17,23,2...|[3.33615392328478...|[0.71528484899770...|       0.0|
|(93,[4,13,23,24,3...|[2.58940292779638...|[0.51675998185098...|       0.0|
|(93,[8,16,23,63,6...|[1.41047028988930...|[0.24809920936525...|       2.0|
+

In [66]:
output = prediction.toPandas().to_csv("output.csv")