In [1]:
import numpy as np
import pandas as pd
import collections # For frequency counting
import findspark
findspark.init("/home/ubuntu/spark2")

import pyspark
from pyspark.sql import DataFrameNaFunctions
from pyspark.sql.functions import lit # Create columns of *literal* value
from pyspark.sql.functions import col # Returns a Column based on the 
                                      # given column name
from pyspark.ml.feature import StringIndexer #label encoding
from pyspark.ml import Pipeline

# Start a SparkContext explicitly under the application name "helloworld".
# NOTE(review): `sc` is not referenced again in this notebook; the SparkSession
# built in the next cell presumably attaches to this context — confirm it is
# still needed.
sc = pyspark.SparkContext(appName="helloworld")

In [2]:
from pyspark.sql import SparkSession

# Obtain the SparkSession used for all DataFrame work below; getOrCreate()
# hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [3]:
# Directory holding the Allstate CSV files; change this one constant to point
# the notebook at a different copy of the data.
DATA_DIR = "/home/ubuntu/Allstate-train"

# header="true" takes the column names from the first line of each file.
# No schema is requested, so the columns arrive as strings and the numeric
# ones are cast explicitly later in the notebook.
trainDF = spark.read.csv(DATA_DIR + "/train.csv", header="true")
testDF = spark.read.csv(DATA_DIR + "/test.csv", header="true")

# Combine train and test

In [4]:
## Tag every row with its dataset of origin, and give test a placeholder
## `loss` column (constant 0) so both frames expose the same columns
trainDF = trainDF.withColumn('mark', lit('train'))
testDF = (testDF.withColumn('loss', lit(0))
                .withColumn('mark', lit('test')))
# union() pairs columns by position, so put test's columns in train's order
testDF = testDF.select(trainDF.columns)

## Append Test data to Train data
# union() is the Spark 2.x replacement for the deprecated unionAll();
# like unionAll() it keeps duplicate rows
df = trainDF.union(testDF)

In [5]:
# Register the combined frame as a temporary view for SQL access.
# NOTE(review): the view is called "train" although `df` holds train and test
# rows together, and no later cell in this notebook queries it — confirm it is
# still needed.
df.createOrReplaceTempView("train")

In [6]:
# Let's define function
def to_anytype(df, colnames, typename):
    """Cast each named column of a Spark DataFrame to ``typename``.

    Args:
        df: DataFrame to convert. It is not modified; a new frame is returned.
        colnames: iterable with the names of the columns to cast.
        typename: target type handed to ``Column.cast`` (e.g. ``"float"``).

    Returns:
        A DataFrame in which the listed columns are cast, every other column
        is untouched, and all columns keep their original position.
    """
    for colname in colnames:
        # withColumn() with an existing name replaces that column in place, so
        # no scratch column is required. Going through a scratch column named
        # "tmp" would overwrite (and then lose) any real column called "tmp"
        # and would move every cast column to the end of the frame.
        df = df.withColumn(colname, df[colname].cast(typename))
    return df

In [7]:
# The fourteen continuous features cont1..cont14 plus the target column are
# the ones cast to float
floatCols = ['cont%d' % i for i in range(1, 15)] + ['loss']
df = to_anytype(df, floatCols, "float")

In [None]:
# Print a single row of the combined frame after the float casts
df.show(1)

# Categorical variable treatment

In [8]:
# Names of the 116 categorical columns: cat1, cat2, ..., cat116
catVars = ["cat%d" % n for n in range(1, 117)]
 
# make use of pipeline to index all categorical variables
def indexer(df, col):
    """Fit a StringIndexer for one column of ``df``.

    Args:
        df: DataFrame the indexer is fitted on.
        col: name of the input column; the output column is ``col + '_indexed'``.

    Returns:
        The fitted indexer model produced by ``StringIndexer.fit``.
    """
    return StringIndexer(inputCol=col, outputCol=col + '_indexed').fit(df)
 
# One fitted indexer per categorical column. The loop variable is deliberately
# not called `col`: that name is imported from pyspark.sql.functions at the top
# of the notebook, and under Python 2 a list-comprehension variable leaks into
# the enclosing scope and would overwrite the import.
indexers = [indexer(df, cat_name) for cat_name in catVars]

In [9]:
# Names of the 14 continuous columns: cont1, cont2, ..., cont14
numVars = ["cont%d" % n for n in range(1, 15)]

In [67]:
# The stages are the already-fitted indexers; fit() wraps them into a single
# pipeline model, and transform() then appends every *_indexed column at once
pipeline = Pipeline(stages = indexers)
pipeline_model = pipeline.fit(df)
df_indexed = pipeline_model.transform(df)

# Quick check that the transformed frame can be evaluated
df_indexed.select('id').show(3)

+---+
| id|
+---+
|  1|
|  2|
|  5|
+---+
only showing top 3 rows



# Data format translation

In [11]:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

In [68]:
# Name of the indexed counterpart of every categorical column
catVarsIndexed = ['{}_indexed'.format(name) for name in catVars]
catVarsIndexed

['cat1_indexed',
 'cat2_indexed',
 'cat3_indexed',
 'cat4_indexed',
 'cat5_indexed',
 'cat6_indexed',
 'cat7_indexed',
 'cat8_indexed',
 'cat9_indexed',
 'cat10_indexed',
 'cat11_indexed',
 'cat12_indexed',
 'cat13_indexed',
 'cat14_indexed',
 'cat15_indexed',
 'cat16_indexed',
 'cat17_indexed',
 'cat18_indexed',
 'cat19_indexed',
 'cat20_indexed',
 'cat21_indexed',
 'cat22_indexed',
 'cat23_indexed',
 'cat24_indexed',
 'cat25_indexed',
 'cat26_indexed',
 'cat27_indexed',
 'cat28_indexed',
 'cat29_indexed',
 'cat30_indexed',
 'cat31_indexed',
 'cat32_indexed',
 'cat33_indexed',
 'cat34_indexed',
 'cat35_indexed',
 'cat36_indexed',
 'cat37_indexed',
 'cat38_indexed',
 'cat39_indexed',
 'cat40_indexed',
 'cat41_indexed',
 'cat42_indexed',
 'cat43_indexed',
 'cat44_indexed',
 'cat45_indexed',
 'cat46_indexed',
 'cat47_indexed',
 'cat48_indexed',
 'cat49_indexed',
 'cat50_indexed',
 'cat51_indexed',
 'cat52_indexed',
 'cat53_indexed',
 'cat54_indexed',
 'cat55_indexed',
 'cat56_indexed',
 

In [43]:
# Hand-picked subset of 24 indexed categorical columns.
# NOTE(review): this cell carries execution count 43, older than the cells
# around it (68 and 69). Run top to bottom it replaces the full list derived
# from catVars with this subset, whereas the featuresCol output shown further
# down starts cat1_indexed, cat2_indexed, ... — confirm which feature set is
# intended and delete or move this cell accordingly.
catVarsIndexed = ['cat18_indexed', 'cat67_indexed', 'cat17_indexed','cat51_indexed','cat61_indexed','cat21_indexed','cat14_indexed','cat33_indexed','cat59_indexed','cat56_indexed','cat22_indexed','cat48_indexed','cat63_indexed','cat60_indexed','cat55_indexed','cat46_indexed','cat68_indexed','cat58_indexed','cat35_indexed','cat34_indexed','cat69_indexed','cat70_indexed','cat47_indexed','cat64_indexed']

['cont1',
 'cont2',
 'cont3',
 'cont4',
 'cont5',
 'cont6',
 'cont7',
 'cont8',
 'cont9',
 'cont10',
 'cont11',
 'cont12',
 'cont13',
 'cont14',
 'cat18_indexed',
 'cat67_indexed',
 'cat17_indexed',
 'cat51_indexed',
 'cat61_indexed',
 'cat21_indexed',
 'cat14_indexed',
 'cat33_indexed',
 'cat59_indexed',
 'cat56_indexed',
 'cat22_indexed',
 'cat48_indexed',
 'cat63_indexed',
 'cat60_indexed',
 'cat55_indexed',
 'cat46_indexed',
 'cat68_indexed',
 'cat58_indexed',
 'cat35_indexed',
 'cat34_indexed',
 'cat69_indexed',
 'cat70_indexed',
 'cat47_indexed',
 'cat64_indexed']

In [69]:
# Model inputs: the continuous columns first, then the indexed categorical
# columns. The order matters because the features are later packed into a
# vector by position.
featuresCol = numVars + catVarsIndexed
featuresCol

['cont1',
 'cont2',
 'cont3',
 'cont4',
 'cont5',
 'cont6',
 'cont7',
 'cont8',
 'cont9',
 'cont10',
 'cont11',
 'cont12',
 'cont13',
 'cont14',
 'cat1_indexed',
 'cat2_indexed',
 'cat3_indexed',
 'cat4_indexed',
 'cat5_indexed',
 'cat6_indexed',
 'cat7_indexed',
 'cat8_indexed',
 'cat9_indexed',
 'cat10_indexed',
 'cat11_indexed',
 'cat12_indexed',
 'cat13_indexed',
 'cat14_indexed',
 'cat15_indexed',
 'cat16_indexed',
 'cat17_indexed',
 'cat18_indexed',
 'cat19_indexed',
 'cat20_indexed',
 'cat21_indexed',
 'cat22_indexed',
 'cat23_indexed',
 'cat24_indexed',
 'cat25_indexed',
 'cat26_indexed',
 'cat27_indexed',
 'cat28_indexed',
 'cat29_indexed',
 'cat30_indexed',
 'cat31_indexed',
 'cat32_indexed',
 'cat33_indexed',
 'cat34_indexed',
 'cat35_indexed',
 'cat36_indexed',
 'cat37_indexed',
 'cat38_indexed',
 'cat39_indexed',
 'cat40_indexed',
 'cat41_indexed',
 'cat42_indexed',
 'cat43_indexed',
 'cat44_indexed',
 'cat45_indexed',
 'cat46_indexed',
 'cat47_indexed',
 'cat48_indexed',


In [70]:
# Non-feature columns carried alongside the features: the row id, the
# train/test marker, and the target (`loss`). Despite the name this is a list
# of three columns, not just the label.
labelCol = ['id', 'mark','loss']
labelCol

['id', 'mark', 'loss']

In [71]:
# Row factory with four named fields; calling row(a, b, c, d) builds a Row
# with fields id, mark, label, features in that order.
row = Row('id','mark','label','features') 
row

<Row(id, mark, label, features)>

In [72]:
# Keep only the id/mark/loss columns followed by the model inputs; the
# positional indexing in the next step relies on exactly this column order
selected_columns = labelCol + featuresCol
df_indexed = df_indexed.select(selected_columns)
df_indexed.show(1)

+---+-----+-------+------+--------+--------+--------+--------+--------+-------+------+-------+------+--------+--------+--------+--------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+----

In [73]:
# Column positions after the select above:
#   0 - id, 1 - mark, 2 - loss (becomes `label`), 3 onwards - feature values
def _to_labeled_row(r):
    """Pack one record into (id, mark, label, features) with a DenseVector of features."""
    feature_vector = DenseVector(r[3:])
    return row(r[0], r[1], r[2], feature_vector)

lf = df_indexed.rdd.map(_to_labeled_row).toDF()
lf.show()

+---+-----+------------------+--------------------+
| id| mark|             label|            features|
+---+-----+------------------+--------------------+
|  1|train| 2213.179931640625|[0.72630000114440...|
|  2|train|1283.5999755859375|[0.33051401376724...|
|  5|train| 3005.090087890625|[0.26184099912643...|
| 10|train| 939.8499755859375|[0.32159399986267...|
| 11|train|  2763.85009765625|[0.27320399880409...|
| 13|train|   5142.8701171875|[0.54667001962661...|
| 14|train| 1132.219970703125|[0.47144699096679...|
| 20|train|           3585.75|[0.82659101486206...|
| 23|train|  10280.2001953125|[0.33051401376724...|
| 24|train|     6184.58984375|[0.72630000114440...|
| 25|train|  6396.85009765625|[0.49606299400329...|
| 33|train|  5965.72998046875|[0.52069801092147...|
| 34|train| 1193.050048828125|[0.32159399986267...|
| 41|train|  1071.77001953125|[0.35135799646377...|
| 47|train| 585.1799926757812|[0.89433300495147...|
| 48|train| 1395.449951171875|[0.47289198637008...|
| 49|train| 

In [74]:
# index label
# convert numeric label to categorical, which is required by
# decisionTree and randomForest
# NOTE(review): `label` is the continuous loss, so 'index' is merely a category
# id assigned to each distinct loss value (e.g. 2213.18 -> 22259.0 in the
# output below), not an amount. Any regressor trained with labelCol='index'
# therefore fits category ids rather than losses — 'label' looks like the
# intended regression target; confirm.
lf = StringIndexer(inputCol = 'label', outputCol='index').fit(lf).transform(lf)
 
lf.show(3)

+---+-----+------------------+--------------------+--------+
| id| mark|             label|            features|   index|
+---+-----+------------------+--------------------+--------+
|  1|train| 2213.179931640625|[0.72630000114440...| 22259.0|
|  2|train|1283.5999755859375|[0.33051401376724...|132074.0|
|  5|train| 3005.090087890625|[0.26184099912643...|106882.0|
+---+-----+------------------+--------------------+--------+
only showing top 3 rows



## Split back into train and test

In [75]:
# `mark` was attached to every row before the two datasets were appended, so
# filtering on it recovers the original partition
is_train = lf['mark'] == 'train'
is_test = lf['mark'] == 'test'
train = lf.where(is_train)
test = lf.where(is_test)

In [None]:
# Print the first five training rows
train.show(5)

# Modeling

In [60]:
from pyspark.ml.regression import LinearRegression

# Train on the real loss ('label'). 'index' is only a StringIndexer category id
# for each distinct loss value and is not a meaningful regression target.
# regParam is the overall regularisation strength. elasticNetParam is left at
# its default of 0.0, which makes the penalty L2 (ridge); pass
# elasticNetParam=1.0 to get an L1 (lasso) penalty instead.
lr = LinearRegression(maxIter = 100, regParam = 100, labelCol='label').fit(train)

KeyboardInterrupt: 

In [81]:
from pyspark.ml.regression import GBTRegressor

# Train on the real loss ('label'). Fitting on 'index' would make the model
# predict StringIndexer category ids, which would then be exported as `loss`.
# subsamplingRate=0.8: each boosting iteration learns from 80% of the rows.
gbt = GBTRegressor(maxIter = 100, labelCol = 'label', subsamplingRate= 0.8, featuresCol= 'features').fit(train)

In [76]:
from pyspark.ml.regression import RandomForestRegressor

# Train on the real loss ('label'); 'index' is a StringIndexer category id per
# distinct loss value, not an amount a regressor should learn.
rf = RandomForestRegressor(featuresCol="features", labelCol = 'label').fit(train)

In [82]:
# Score the test rows with the fitted GBT model; the result carries the model
# output in a `prediction` column (used by the submission cell below)
pred_test = gbt.transform(test)

In [83]:
# Two-column frame for the submission: the row id, and the model output
# renamed to `loss`
submission = pred_test.select(
    pred_test['id'].alias('id'),
    pred_test['prediction'].alias('loss')
)

In [None]:
# Print the first five submission rows before writing them out
submission.show(5)

In [84]:
# header=True writes the `id,loss` column names as the first line.
# coalesce(1) collapses the frame to one partition so Spark emits a single
# part file. mode='overwrite' replaces a previous run's output, so this cell
# can be re-executed instead of failing because the path already exists.
# Note that Spark creates a *directory* named submission.csv holding the part
# file, not a single flat file.
submission.coalesce(1).write.csv('submission.csv', header=True, mode='overwrite')