In [1]:
#### This cell is to make spark work on a windows laptop
import os
import sys

# Path for spark source folder
os.environ['SPARK_HOME']="C:\spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python")
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python\lib\py4j-0.10.3-src.zip")
#os.environ['SPARK_EXECUTOR_MEMORY']="5G"

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

# Initialize SparkContext
sc = SparkContext()
words = sc.parallelize(["scala","java","hadoop","spark","akka"])
print (words.count())
print(words.countByValue())

Successfully imported Spark Modules
5
defaultdict(<class 'int'>, {'akka': 1, 'hadoop': 1, 'scala': 1, 'spark': 1, 'java': 1})


In [2]:
import os
import sys
import re
from IPython.display import display
from pyspark import SparkContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import types
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql import SparkSession
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import pandas as pd
import numpy as np
import pyspark.sql.functions as func
import matplotlib.patches as mpatches
import time as time
from matplotlib.patches import Rectangle
import datetime
import ast
from operator import add
import math
from itertools import combinations
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.regression import *
from pyspark.ml.feature import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)

# First step: cleaning the data

In [3]:
input_path = "train.csv"
raw_data = sc.textFile(input_path)
print("number of rows before cleaning:", raw_data.count())

# extract the header
header = raw_data.first()

# replace invalid data with NULL and remove header
cleaned_data = raw_data.filter(lambda row: row != header)

print("number of rows after cleaning:", raw_data.count())
print("Number of partitions: " + str(raw_data.getNumPartitions()))

sqlContext = SQLContext(sc)

number of rows before cleaning: 188319
number of rows after cleaning: 188319
Number of partitions: 2


We now need to put our data into a dataframe, so that it's going to be easier to plot graphs and get a better insight into our data.

In [4]:
header

'id;cat1;cat2;cat3;cat4;cat5;cat6;cat7;cat8;cat9;cat10;cat11;cat12;cat13;cat14;cat15;cat16;cat17;cat18;cat19;cat20;cat21;cat22;cat23;cat24;cat25;cat26;cat27;cat28;cat29;cat30;cat31;cat32;cat33;cat34;cat35;cat36;cat37;cat38;cat39;cat40;cat41;cat42;cat43;cat44;cat45;cat46;cat47;cat48;cat49;cat50;cat51;cat52;cat53;cat54;cat55;cat56;cat57;cat58;cat59;cat60;cat61;cat62;cat63;cat64;cat65;cat66;cat67;cat68;cat69;cat70;cat71;cat72;cat73;cat74;cat75;cat76;cat77;cat78;cat79;cat80;cat81;cat82;cat83;cat84;cat85;cat86;cat87;cat88;cat89;cat90;cat91;cat92;cat93;cat94;cat95;cat96;cat97;cat98;cat99;cat100;cat101;cat102;cat103;cat104;cat105;cat106;cat107;cat108;cat109;cat110;cat111;cat112;cat113;cat114;cat115;cat116;cont1;cont2;cont3;cont4;cont5;cont6;cont7;cont8;cont9;cont10;cont11;cont12;cont13;cont14;loss'

Our taget is the loss. The id isn't actually useful and will be removed later. "cat" means categorical feature and "cont" means a continuous feature. we will use that to create the data schema. Note that we'll convert strings in Ints since it's easier to manipulate them later. (In particular for partitioning).

In [5]:
names = header.split(";")

In [6]:
cats = names[1:117]
conts = names[117:-1]

In [7]:
def create_StructField(string):
    hint = string[:3]
    if hint == "cat":
        datatype = types.IntegerType()
    elif hint == "con":
        datatype = types.FloatType()
    elif hint == "id":
        datatype = types.IntegerType()
    elif hint == "los":
        datatype = types.FloatType()
    else:
        raise ValueError("Can\'t read this string:" + hint )

    return types.StructField(string, datatype, False)

In [8]:
structField_list = [create_StructField(string) for string in names]

In [9]:
data_schema = types.StructType(structField_list)

In [10]:
def tryeval(val,column_number):
    if column_number == 0:
        return int(val)
    elif 1 <= column_number <= 116:
        return val
    elif 117 <= column_number <= 131:
        return float(val)
    else:
        raise Exception("There is a big problem")

def to_tuple(string, character = ";"):
    list_of_strings = string.split(character)
    return tuple(tryeval(string, n) for n, string in enumerate(list_of_strings))

cleaned_data_splitted = cleaned_data.map(lambda x: to_tuple(x))

In [11]:
cleaned_data_splitted.count()

188318

Now, we'll try to convert the categorical values in integers as it's way easier to work with later on.

In [12]:
def to_tuples(list_):
    return tuple((string,) for string in list_)

def fusion(x, y):
    return tuple(tuple(set(xi + yi)) for xi, yi in zip(x,y))

list_of_dictionaries = []
a = cleaned_data_splitted.map(lambda x: to_tuples(x[1:117])).reduce(fusion)

In [13]:
sorted_tuples = tuple(tuple(sorted(tup)) for tup in a)

Now we have all the categories in order. We'll put them in a list of dictionaries as it makes it simpler to modify the RDD after that.

In [14]:
for tup in sorted_tuples:
    my_dict = dict()
    for idx, cat in enumerate(tup):
        my_dict[cat] = idx
    list_of_dictionaries.append(my_dict)

In [15]:
list_of_dictionaries[0]

{'A': 0, 'B': 1}

In [16]:
bListOfDictionaries = sc.broadcast(list_of_dictionaries)

In [17]:
def replace(row):
    strings = row[1:117]
    my_dicts = bListOfDictionaries.value
    tuple_of_ints = ()
    for dict_, string in zip(my_dicts, strings):
        try:
            tuple_of_ints += (dict_[string],)
        except KeyError:
            tuple_of_ints += (0,)
    return (row[0],) + tuple_of_ints + row[117:]

In [18]:
final_rdd = cleaned_data_splitted.map(replace)

we now have integers instead of "A" or "C" or any other string.

In [19]:
print(final_rdd.first())

(1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 1, 1, 3, 3, 1, 3, 2, 1, 3, 1, 0, 0, 0, 0, 0, 3, 1, 2, 4, 0, 2, 15, 1, 6, 0, 0, 8, 4, 6, 9, 6, 45, 28, 2, 19, 55, 0, 14, 269, 0.7263, 0.245921, 0.187583, 0.789639, 0.310061, 0.718367, 0.33506, 0.3026, 0.67135, 0.8351, 0.569745, 0.594646, 0.822493, 0.714843, 2213.18)


In [20]:
to_delete = [2,3,4,5,6,7,8,96]    # cela correspond à cat2, cat3, cat4 ...
features_to_keep = list(range(116))
for idx in to_delete:
    features_to_keep.remove(idx - 1) # Car to_delete commence à 1

In [20]:
df = sqlContext.createDataFrame(final_rdd.map(lambda x: (float(x[-1]), Vectors.dense(x[1:-1]))), ["label", "features"])

In [50]:
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(df)

In [51]:
(train, test) = df.randomSplit([0.7, 0.3])

In [52]:
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxDepth=20, maxBins=20)

In [53]:
pipeline = Pipeline(stages=[indexer, dt])

In [54]:
model = pipeline.fit(train)

In [55]:
predictionsTrain = model.transform(train)
predictionsTest = model.transform(test)

In [68]:
evaluator = RegressionEvaluator(metricName="mae")
rmseTrain = evaluator.evaluate(predictionsTrain)
print("Mean Squared Error (MSE) on test data = %g" % rmseTrain)

rmseTest = evaluator.evaluate(predictionsTest)
print("Mean Squared Error (MSE) on test data = %g" % rmseTest)

Mean Squared Error (MSE) on test data = 631.863
Mean Squared Error (MSE) on test data = 1570.83


In [25]:
# Defining the transformations
slicer = VectorSlicer(inputCol="features", outputCol="featuresSliced", indices = features_to_keep)
indexer = VectorIndexer(inputCol="featuresSliced", outputCol="indexedFeatures", maxCategories=20).fit(slicer.transform(df))
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxBins=20, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance")

# defining the pipeline
pipeline = Pipeline(stages=[slicer, indexer, dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5]) \
    .build()
    
myRegressor = RegressionEvaluator(metricName="mae")
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myRegressor,
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

Cross-validation results (we see that the depth of 10 is better):

In [26]:
cvModel.avgMetrics

[1433.5199130594435]

Best model:

In [27]:
cvModel.bestModel

PipelineModel_4377bc1d5dd4a225144b

Training error for the best model (we're overfitting)

In [28]:
RegressionEvaluator(metricName="mae").evaluate(cvModel.bestModel.transform(df))

1424.2979358651687

Now we can try with all the features:

In [83]:
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(df)
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxBins=20)

# defining the pipeline
pipeline = Pipeline(stages=[slicer, indexer, dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [10, 20]) \
    .build()
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="mae"),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

Is it better than last time?

In [84]:
cvModel.avgMetrics

[1359.6923954368876, 1588.2854298996053]

We see that those features bring absolutely no information. As shown in the other notebook "Allstate competition.ipynb".

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import MinMaxScaler
# Defining the transformations
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(df)

print(scaler.transform(df).show(1))
dt = LinearRegression(featuresCol="scaledFeatures")

# defining the pipeline
pipeline = Pipeline(stages=[scaler,dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxIter, [25]) \
    .build()
    
myEvaluator = RegressionEvaluator(metricName="mae")
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myEvaluator,
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

print(cvModel.avgMetrics)
print(myEvaluator.evaluate(cvModel.bestModel.transform(df)))
print(cvModel.bestModel.stages)


+-------+--------------------+--------------------+
|  label|            features|      scaledFeatures|
+-------+--------------------+--------------------+
|2213.18|[0.0,1.0,0.0,1.0,...|[0.0,1.0,0.0,1.0,...|
+-------+--------------------+--------------------+
only showing top 1 row

None
[1331.6055414944035]
1329.4492453430448
[MinMaxScaler_4aeb955f79fe5cfac729, LinearRegression_48a4bb18db05c6db6cc5]


In [21]:
df = sqlContext.createDataFrame(final_rdd.map(lambda x: (float(x[-1]), Vectors.dense(x[1:117]), Vectors.dense(x[117:-1]))), ["label", "cat_features", "cont_features"])

In [22]:
df.show(1)

+-------+--------------------+--------------------+
|  label|        cat_features|       cont_features|
+-------+--------------------+--------------------+
|2213.18|[0.0,1.0,0.0,1.0,...|[0.7263,0.245921,...|
+-------+--------------------+--------------------+
only showing top 1 row



Let's try different configurations for a random forest.

In [65]:
# Defining the transformations
indexer = VectorIndexer(inputCol="cat_features", outputCol="indexedFeatures", maxCategories=20).fit(df)
slicer_cat = VectorSlicer(inputCol="indexedFeatures", outputCol="sliced_cat_Features", indices=features_to_keep)
slicer_cont = VectorSlicer(inputCol="cont_features", outputCol="sliced_cont_Features", indices = list(range(1)))
assembler = VectorAssembler(inputCols=["sliced_cat_Features", "sliced_cont_Features"], outputCol="features")

rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxBins=280,\
                           maxMemoryInMB=500, subsamplingRate=0.9, cacheNodeIds=True, 
                           checkpointInterval=10, numTrees=5)

# defining the pipeline
pipeline = Pipeline(stages=[indexer, slicer_cat, slicer_cont, assembler, rf])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3]) \
    .build()
    
myRegressor = RegressionEvaluator(metricName="mae")
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myRegressor,
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

In [67]:
print(cvModel.avgMetrics)
print(myRegressor.evaluate(cvModel.bestModel.transform(df)))
print(cvModel.bestModel.stages)

[1506.7323171092319]
1505.6181538122055
[VectorIndexer_4557be16f2cf9e01e187, VectorSlicer_4ee8b70f28ee273d0226, VectorSlicer_4740827b3bfdca001282, VectorAssembler_4a18a2d0732f223883b7, RandomForestRegressionModel (uid=rfr_a9b44af90c4d) with 5 trees]


In [37]:
df.columns

['label', 'cat_features', 'cont_features']

In [26]:
df.rdd.first()

Row(label=2213.18, cat_features=DenseVector([0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 3.0, 1.0, 1.0, 3.0, 3.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 2.0, 4.0, 0.0, 2.0, 15.0, 1.0, 6.0, 0.0, 0.0, 8.0, 4.0, 6.0, 9.0, 6.0, 45.0, 28.0, 2.0, 19.0, 55.0, 0.0, 14.0, 269.0]), cont_features=DenseVector([0.7263, 0.2459, 0.1876, 0.7896, 0.3101, 0.7184, 0.3351, 0.3026, 0.6714, 0.8351, 0.5697, 0.5946, 0.8225, 0.7148]))

In [132]:
 def t(element, fitInfo = 0, args = []):
        new_tup = tuple(int(scalar*args[0]) for scalar in element)
        return Vectors.dense(new_tup)

In [237]:
class customTransformer:
    
    def __init__(self, inputCol, outputCol, *others):
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.args = list(others)
        self.fitInfo = 0
        
    # Store information taken from the dataframe
    def fit(self, df):
        idx = df.columns.index(self.inputCol)
        self.fitInfo = fitting(df.rdd.map(lambda x: x[idx]))
        return self
            
    # This is the function to implement
    def fitting(self, rdd):
        return 0
            
    # This transforms a dataframe into another dataframe
    def transform(self, df):
        names = df.columns
        idx = names.index(self.inputCol)
        bInfo = sc.broadcast([0, self.args])
        new_column = df.rdd.map(lambda x: t(x[idx], bInfo.value[0], bInfo.value[1]), preservesPartitioning = True)
        print(new_column.first())
        old_rdd = df.rdd.map(lambda x: list(x))
        new_rdd = old_rdd.zip(new_column).map(lambda x: x[0] + [x[1]])
        print(new_rdd.first())
        print(self.outputCol)
        print(names)
        new_names = names + [self.outputCol]
        print(new_names)
        return sqlContext.createDataFrame(new_rdd, new_names)
            
    # This is a function to implement, 
    # fitInfo is the information gotten from the fit function
    # args are the arguments given when creating the instance
    @staticmethod
    def transforming(element, fitInfo = 0, args = []):
        return element

In [238]:
print([5,8,6] + [Vectors.dense([5,3,7])])

[5, 8, 6, DenseVector([5.0, 3.0, 7.0])]


In [239]:
def apply(df, listOfTransformers):
    df1 = df
    for transformer in listOfTransformers:
        df1 = transformer.transform(df1)
    return df1

In [240]:
class vectorBucketizer(customTransformer):
    
    def __init__(self, inputCol, outputCol, *others):
        customTransformer.__init__(self, inputCol, outputCol, *others)
    
    @staticmethod
    def transforming(element, fitInfo = 0, args = []):
        new_tup = tuple(int(scalar*args) for scalar in element)
        return Vectors.dense(new_tup)

In [241]:
df1 = vectorBucketizer("cont_features", "bucked_features", 10).transform(df)

[7.0,2.0,1.0,7.0,3.0,7.0,3.0,3.0,6.0,8.0,5.0,5.0,8.0,7.0]
[2213.18, DenseVector([0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 3.0, 1.0, 1.0, 3.0, 3.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 2.0, 4.0, 0.0, 2.0, 15.0, 1.0, 6.0, 0.0, 0.0, 8.0, 4.0, 6.0, 9.0, 6.0, 45.0, 28.0, 2.0, 19.0, 55.0, 0.0, 14.0, 269.0]), DenseVector([0.7263, 0.2459, 0.1876, 0.7896, 0.3101, 0.7184, 0.3351, 0.3026, 0.6714, 0.8351, 0.5697, 0.5946, 0.8225, 0.7148]), DenseVector([7.0, 2.0, 1.0, 7.0, 3.0, 7.0, 3.0, 3.0, 6.0, 8.0, 5.0, 5.0, 8.0, 7.0])]
bucked_features
['label', 'cat_features', 'cont_features']
['label', 'cat_features', 'cont_features', 'bucked_feature

In [242]:
df1.rdd.first()

Row(label=2213.18, cat_features=DenseVector([0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 3.0, 1.0, 1.0, 3.0, 3.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 2.0, 4.0, 0.0, 2.0, 15.0, 1.0, 6.0, 0.0, 0.0, 8.0, 4.0, 6.0, 9.0, 6.0, 45.0, 28.0, 2.0, 19.0, 55.0, 0.0, 14.0, 269.0]), cont_features=DenseVector([0.7263, 0.2459, 0.1876, 0.7896, 0.3101, 0.7184, 0.3351, 0.3026, 0.6714, 0.8351, 0.5697, 0.5946, 0.8225, 0.7148]), bucked_features=DenseVector([7.0, 2.0, 1.0, 7.0, 3.0, 7.0, 3.0, 3.0, 6.0, 8.0, 5.0, 5.0, 8.0, 7.0]))