In [1]:
#### This cell is to make spark work on a windows laptop
import os
import sys

# Path for spark source folder
os.environ['SPARK_HOME']="C:\spark-2.0.1-bin-hadoop2.7"

# Append pyspark  to Python Path
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python")
sys.path.append("C:\spark-2.0.1-bin-hadoop2.7\python\lib\py4j-0.10.3-src.zip")
#os.environ['SPARK_EXECUTOR_MEMORY']="5G"

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Successfully imported Spark Modules")

except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

# Initialize SparkContext
sc = SparkContext()
words = sc.parallelize(["scala","java","hadoop","spark","akka"])
print (words.count())
print(words.countByValue())

Successfully imported Spark Modules
5
defaultdict(<class 'int'>, {'spark': 1, 'hadoop': 1, 'scala': 1, 'akka': 1, 'java': 1})


In [98]:
import os
import sys
import re
from IPython.display import display
from pyspark import SparkContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import types
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql import SparkSession
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.mlab as mlab
import pandas as pd
import numpy as np
import pyspark.sql.functions as func
import matplotlib.patches as mpatches
import time as time
from matplotlib.patches import Rectangle
import datetime
import ast
from operator import add
import math
from itertools import combinations
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.regression import *
from pyspark.ml.feature import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import random

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)

# First step: cleaning the data

In [3]:
input_path = "train.csv"
raw_data = sc.textFile(input_path)
print("number of rows before cleaning:", raw_data.count())

# extract the header
header = raw_data.first()

# replace invalid data with NULL and remove header
cleaned_data = raw_data.filter(lambda row: row != header)

print("number of rows after cleaning:", raw_data.count())
print("Number of partitions: " + str(raw_data.getNumPartitions()))

sqlContext = SQLContext(sc)

number of rows before cleaning: 188319
number of rows after cleaning: 188319
Number of partitions: 2


We now need to put our data into a dataframe, so that it's going to be easier to plot graphs and get a better insight into our data.

In [4]:
header

'id;cat1;cat2;cat3;cat4;cat5;cat6;cat7;cat8;cat9;cat10;cat11;cat12;cat13;cat14;cat15;cat16;cat17;cat18;cat19;cat20;cat21;cat22;cat23;cat24;cat25;cat26;cat27;cat28;cat29;cat30;cat31;cat32;cat33;cat34;cat35;cat36;cat37;cat38;cat39;cat40;cat41;cat42;cat43;cat44;cat45;cat46;cat47;cat48;cat49;cat50;cat51;cat52;cat53;cat54;cat55;cat56;cat57;cat58;cat59;cat60;cat61;cat62;cat63;cat64;cat65;cat66;cat67;cat68;cat69;cat70;cat71;cat72;cat73;cat74;cat75;cat76;cat77;cat78;cat79;cat80;cat81;cat82;cat83;cat84;cat85;cat86;cat87;cat88;cat89;cat90;cat91;cat92;cat93;cat94;cat95;cat96;cat97;cat98;cat99;cat100;cat101;cat102;cat103;cat104;cat105;cat106;cat107;cat108;cat109;cat110;cat111;cat112;cat113;cat114;cat115;cat116;cont1;cont2;cont3;cont4;cont5;cont6;cont7;cont8;cont9;cont10;cont11;cont12;cont13;cont14;loss'

Our taget is the loss. The id isn't actually useful and will be removed later. "cat" means categorical feature and "cont" means a continuous feature. we will use that to create the data schema. Note that we'll convert strings in Ints since it's easier to manipulate them later. (In particular for partitioning).

In [5]:
names = header.split(";")

In [6]:
cats = names[1:117]
conts = names[117:-1]

In [7]:
def create_StructField(string):
    hint = string[:3]
    if hint == "cat":
        datatype = types.IntegerType()
    elif hint == "con":
        datatype = types.FloatType()
    elif hint == "id":
        datatype = types.IntegerType()
    elif hint == "los":
        datatype = types.FloatType()
    else:
        raise ValueError("Can\'t read this string:" + hint )

    return types.StructField(string, datatype, False)

In [8]:
structField_list = [create_StructField(string) for string in names]

In [9]:
data_schema = types.StructType(structField_list)

In [10]:
def tryeval(val,column_number):
    if column_number == 0:
        return int(val)
    elif 1 <= column_number <= 116:
        return val
    elif 117 <= column_number <= 131:
        return float(val)
    else:
        raise Exception("There is a big problem")

def to_tuple(string, character = ";"):
    list_of_strings = string.split(character)
    return tuple(tryeval(string, n) for n, string in enumerate(list_of_strings))

cleaned_data_splitted = cleaned_data.map(lambda x: to_tuple(x))

In [11]:
cleaned_data_splitted.count()

188318

Now, we'll try to convert the categorical values in integers as it's way easier to work with later on.

In [12]:
def to_tuples(list_):
    return tuple((string,) for string in list_)

def fusion(x, y):
    return tuple(tuple(set(xi + yi)) for xi, yi in zip(x,y))

list_of_dictionaries = []
a = cleaned_data_splitted.map(lambda x: to_tuples(x[1:117])).reduce(fusion)

In [13]:
sorted_tuples = tuple(tuple(sorted(tup)) for tup in a)

Now we have all the categories in order. We'll put them in a list of dictionaries as it makes it simpler to modify the RDD after that.

In [14]:
for tup in sorted_tuples:
    my_dict = dict()
    for idx, cat in enumerate(tup):
        my_dict[cat] = idx
    list_of_dictionaries.append(my_dict)

In [15]:
list_of_dictionaries[0]

{'A': 0, 'B': 1}

In [16]:
bListOfDictionaries = sc.broadcast(list_of_dictionaries)

In [17]:
def replace(row):
    strings = row[1:117]
    my_dicts = bListOfDictionaries.value
    tuple_of_ints = ()
    for dict_, string in zip(my_dicts, strings):
        try:
            tuple_of_ints += (dict_[string],)
        except KeyError:
            tuple_of_ints += (0,)
    return (row[0],) + tuple_of_ints + row[117:]

In [18]:
final_rdd = cleaned_data_splitted.map(replace)

we now have integers instead of "A" or "C" or any other string.

In [19]:
print(final_rdd.first())

(1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 1, 1, 3, 3, 1, 3, 2, 1, 3, 1, 0, 0, 0, 0, 0, 3, 1, 2, 4, 0, 2, 15, 1, 6, 0, 0, 8, 4, 6, 9, 6, 45, 28, 2, 19, 55, 0, 14, 269, 0.7263, 0.245921, 0.187583, 0.789639, 0.310061, 0.718367, 0.33506, 0.3026, 0.67135, 0.8351, 0.569745, 0.594646, 0.822493, 0.714843, 2213.18)


In [20]:
to_delete = [2,3,4,5,6,7,8,96]    # cela correspond à cat2, cat3, cat4 ...
features_to_keep = list(range(116))
for idx in to_delete:
    features_to_keep.remove(idx - 1) # Car to_delete commence à 1

In [20]:
df = sqlContext.createDataFrame(final_rdd.map(lambda x: (float(x[-1]), Vectors.dense(x[1:-1]))), ["label", "features"])

In [50]:
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(df)

In [51]:
(train, test) = df.randomSplit([0.7, 0.3])

In [52]:
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxDepth=20, maxBins=20)

In [53]:
pipeline = Pipeline(stages=[indexer, dt])

In [54]:
model = pipeline.fit(train)

In [55]:
predictionsTrain = model.transform(train)
predictionsTest = model.transform(test)

In [68]:
evaluator = RegressionEvaluator(metricName="mae")
rmseTrain = evaluator.evaluate(predictionsTrain)
print("Mean Squared Error (MSE) on test data = %g" % rmseTrain)

rmseTest = evaluator.evaluate(predictionsTest)
print("Mean Squared Error (MSE) on test data = %g" % rmseTest)

Mean Squared Error (MSE) on test data = 631.863
Mean Squared Error (MSE) on test data = 1570.83


In [25]:
# Defining the transformations
slicer = VectorSlicer(inputCol="features", outputCol="featuresSliced", indices = features_to_keep)
indexer = VectorIndexer(inputCol="featuresSliced", outputCol="indexedFeatures", maxCategories=20).fit(slicer.transform(df))
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxBins=20, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance")

# defining the pipeline
pipeline = Pipeline(stages=[slicer, indexer, dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5]) \
    .build()
    
myRegressor = RegressionEvaluator(metricName="mae")
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myRegressor,
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

Cross-validation results (we see that the depth of 10 is better):

In [26]:
cvModel.avgMetrics

[1433.5199130594435]

Best model:

In [27]:
cvModel.bestModel

PipelineModel_4377bc1d5dd4a225144b

Training error for the best model (we're overfitting)

In [28]:
RegressionEvaluator(metricName="mae").evaluate(cvModel.bestModel.transform(df))

1424.2979358651687

Now we can try with all the features:

In [83]:
indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(df)
dt = DecisionTreeRegressor(labelCol="label", featuresCol="indexedFeatures", maxBins=20)

# defining the pipeline
pipeline = Pipeline(stages=[slicer, indexer, dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [10, 20]) \
    .build()
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="mae"),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

Is it better than last time?

In [84]:
cvModel.avgMetrics

[1359.6923954368876, 1588.2854298996053]

We see that those features bring absolutely no information. As shown in the other notebook "Allstate competition.ipynb".

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import MinMaxScaler
# Defining the transformations
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(df)

print(scaler.transform(df).show(1))
dt = LinearRegression(featuresCol="scaledFeatures")

# defining the pipeline
pipeline = Pipeline(stages=[scaler,dt])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxIter, [25]) \
    .build()
    
myEvaluator = RegressionEvaluator(metricName="mae")
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myEvaluator,
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

print(cvModel.avgMetrics)
print(myEvaluator.evaluate(cvModel.bestModel.transform(df)))
print(cvModel.bestModel.stages)


+-------+--------------------+--------------------+
|  label|            features|      scaledFeatures|
+-------+--------------------+--------------------+
|2213.18|[0.0,1.0,0.0,1.0,...|[0.0,1.0,0.0,1.0,...|
+-------+--------------------+--------------------+
only showing top 1 row

None
[1331.6055414944035]
1329.4492453430448
[MinMaxScaler_4aeb955f79fe5cfac729, LinearRegression_48a4bb18db05c6db6cc5]


In [21]:
df = sqlContext.createDataFrame(final_rdd.map(lambda x: (float(x[-1]), Vectors.dense(x[1:117]), Vectors.dense(x[117:-1]))), ["label", "cat_features", "cont_features"])

In [22]:
df.show(1)

+-------+--------------------+--------------------+
|  label|        cat_features|       cont_features|
+-------+--------------------+--------------------+
|2213.18|[0.0,1.0,0.0,1.0,...|[0.7263,0.245921,...|
+-------+--------------------+--------------------+
only showing top 1 row



Let's try different configurations for a random forest.

In [65]:
# Defining the transformations
indexer = VectorIndexer(inputCol="cat_features", outputCol="indexedFeatures", maxCategories=20).fit(df)
slicer_cat = VectorSlicer(inputCol="indexedFeatures", outputCol="sliced_cat_Features", indices=features_to_keep)
slicer_cont = VectorSlicer(inputCol="cont_features", outputCol="sliced_cont_Features", indices = list(range(1)))
assembler = VectorAssembler(inputCols=["sliced_cat_Features", "sliced_cont_Features"], outputCol="features")

rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxBins=280,\
                           maxMemoryInMB=500, subsamplingRate=0.9, cacheNodeIds=True, 
                           checkpointInterval=10, numTrees=5)

# defining the pipeline
pipeline = Pipeline(stages=[indexer, slicer_cat, slicer_cont, assembler, rf])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3]) \
    .build()
    
myRegressor = RegressionEvaluator(metricName="mae")
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myRegressor,
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df)

In [67]:
print(cvModel.avgMetrics)
print(myRegressor.evaluate(cvModel.bestModel.transform(df)))
print(cvModel.bestModel.stages)

[1506.7323171092319]
1505.6181538122055
[VectorIndexer_4557be16f2cf9e01e187, VectorSlicer_4ee8b70f28ee273d0226, VectorSlicer_4740827b3bfdca001282, VectorAssembler_4a18a2d0732f223883b7, RandomForestRegressionModel (uid=rfr_a9b44af90c4d) with 5 trees]


In [23]:
df.columns

['label', 'cat_features', 'cont_features']

In [24]:
df.rdd.first()

Row(label=2213.18, cat_features=DenseVector([0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 3.0, 1.0, 1.0, 3.0, 3.0, 1.0, 3.0, 2.0, 1.0, 3.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 1.0, 2.0, 4.0, 0.0, 2.0, 15.0, 1.0, 6.0, 0.0, 0.0, 8.0, 4.0, 6.0, 9.0, 6.0, 45.0, 28.0, 2.0, 19.0, 55.0, 0.0, 14.0, 269.0]), cont_features=DenseVector([0.7263, 0.2459, 0.1876, 0.7896, 0.3101, 0.7184, 0.3351, 0.3026, 0.6714, 0.8351, 0.5697, 0.5946, 0.8225, 0.7148]))

In [25]:
 def t(element, fitInfo = 0, args = []):
        new_tup = tuple(int(scalar*args[0]) for scalar in element)
        return Vectors.dense(new_tup)

In [55]:
class customTransformer:
    
    def __init__(self, inputCol, outputCol, *others):
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.args = list(others)
        self.fitInfo = 0
        
    # Store information taken from the dataframe
    def fit(self, df, fitting):
        idx = df.columns.index(self.inputCol)
        self.fitInfo = fitting(df.rdd.map(lambda x: x[idx]))
        return self
            
    # This transforms a dataframe into another dataframe
    def transform(self, df, transforming):
        
        # We get the index of the colmumns we'll be working on
        names = df.columns
        idx = names.index(self.inputCol)
        
        # We apply the transformation
        bInfo = sc.broadcast(tuple([0,[10]]))
        new_column = df.rdd.map(lambda x: transforming(x[idx], bInfo.value[0], bInfo.value[1]), True)
        
        # We attach the results to the old rdd
        old_rdd = df.rdd.map(lambda x: list(x))
        new_rdd = old_rdd.zip(new_column).map(lambda x: x[0] + [x[1]])
        new_names = names + [self.outputCol]
        
        return sqlContext.createDataFrame(new_rdd, new_names)

In [56]:
def apply(df, listOfTransformers):
    df1 = df
    for transformer in listOfTransformers:
        df1 = transformer.transform(df1)
    return df1

In [57]:
def t_bucketize(element, fitInfo = 0, args = []):
    new_tup = tuple(int(scalar*args[0]) for scalar in element)
    return Vectors.dense(new_tup)

In [61]:
# Pre-process
df1 = customTransformer("cont_features", "cont_bucked_features", 7).transform(df, t_bucketize).cache()

In [62]:
# Defining the transformations
indexer_cat = VectorIndexer(inputCol="cat_features", outputCol="cat_indexedFeatures", maxCategories=20).fit(df1)
indexer_cont = VectorIndexer(inputCol="cont_bucked_features", outputCol="cont_indexedFeatures", maxCategories=7).fit(df1)
slicer_cat = VectorSlicer(inputCol="cat_indexedFeatures", outputCol="sliced_cat_Features", indices=features_to_keep)
slicer_cont = VectorSlicer(inputCol="cont_indexedFeatures", outputCol="sliced_cont_Features", indices = list(range(2)))
assembler = VectorAssembler(inputCols=["sliced_cat_Features", "sliced_cont_Features"], outputCol="features")

rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxBins=300,\
                           maxMemoryInMB=500, subsamplingRate=0.9, cacheNodeIds=True, 
                           checkpointInterval=10, numTrees=5)

# defining the pipeline
pipeline = Pipeline(stages=[indexer_cat, indexer_cont, slicer_cat, slicer_cont, assembler, rf])

# defining the parameters to test
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3]) \
    .build()
    
myRegressor = RegressionEvaluator(metricName="mae")
    
# defining the cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=myRegressor,
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(df1)

In [63]:
print(cvModel.avgMetrics)
print(myRegressor.evaluate(cvModel.bestModel.transform(df1)))
print(cvModel.bestModel.stages)

[1502.4312263494066]
1496.8333043057692
[VectorIndexer_426baa11bfc634256996, VectorIndexer_44a39d01fe167cb77622, VectorSlicer_4ff28b35d924d51b6a10, VectorSlicer_4f58872cab1f2ec64485, VectorAssembler_4ab7a49df3fc35f97d31, RandomForestRegressionModel (uid=rfr_734c16e8a26e) with 5 trees]


The cross-validation of spark isn't really helpful because we can't get the results are they are calculated. We'll do a custom search for hyperparameters.

In [64]:
# Defining the transformations
indexer_cat = VectorIndexer(inputCol="cat_features", outputCol="cat_indexedFeatures", maxCategories=300).fit(df1)
indexer_cont = VectorIndexer(inputCol="cont_bucked_features", outputCol="cont_indexedFeatures", maxCategories=7).fit(df1)
slicer_cat = VectorSlicer(inputCol="cat_indexedFeatures", outputCol="sliced_cat_Features", indices=features_to_keep)
slicer_cont = VectorSlicer(inputCol="cont_indexedFeatures", outputCol="sliced_cont_Features", indices = list(range(2)))
assembler = VectorAssembler(inputCols=["sliced_cat_Features", "sliced_cont_Features"], outputCol="features")

rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxBins=300,\
                           maxMemoryInMB=500, subsamplingRate=0.9, cacheNodeIds=True, 
                           checkpointInterval=10, numTrees=5)

df2 = apply(df1,[indexer_cat, indexer_cont, slicer_cat, slicer_cont, assembler]).cache()
df1.unpersist()
# defining the pipeline
pipeline = Pipeline(stages=[rf])


In [None]:
depths = [3,5,7,10,14,17,20,25]

for depth in depths:

    # defining the parameters to test
    paramGrid = ParamGridBuilder() \
        .addGrid(rf.maxDepth, [depth]) \
        .build()

    myRegressor = RegressionEvaluator(metricName="mae")

    # defining the cross-validation
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=myRegressor,
                              numFolds=3)  # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(df2)
    print("depth:" + str(depth))
    print(cvModel.avgMetrics[0]/3)
    print(myRegressor.evaluate(cvModel.bestModel.transform(df2)))
    print(cvModel.bestModel.stages)

Let's print the results:

Now we'll try different sets of features to have an idea of which ones are better.

In [104]:
# We now have 10 set of features to test
sets_cat_features = [sorted(random.sample(features_to_keep,60)) for _ in range(10)]
sets_cont_features = [sorted(random.sample(list(range(14)),8)) for _ in range(10)]
sets_features = list(zip(sets_cat_features,sets_cont_features))

In [None]:
# Defining the transformations
indexer_cat = VectorIndexer(inputCol="cat_features", outputCol="cat_indexedFeatures", maxCategories=300).fit(df1)
indexer_cont = VectorIndexer(inputCol="cont_bucked_features", outputCol="cont_indexedFeatures", maxCategories=7).fit(df1)


rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxBins=300,\
                           maxMemoryInMB=500, subsamplingRate=0.9, cacheNodeIds=True, 
                           checkpointInterval=10, maxDepth=9, numTrees=5)

df2 = apply(df1,[indexer_cat, indexer_cont]).cache()
df1.unpersist()

In [None]:
for set_features in sets_features:

    slicer_cat=VectorSlicer(inputCol="cat_indexedFeatures", outputCol="sliced_cat_Features", indices=set_features[0])
    slicer_cont=VectorSlicer(inputCol="cont_indexedFeatures", outputCol="sliced_cont_Features", indices = set_features[1])
    assembler = VectorAssembler(inputCols=["sliced_cat_Features", "sliced_cont_Features"], outputCol="features")
    pipeline = Pipeline(stages=[slicer_cat, slicer_cont, assembler,rf])
    # defining the parameters to test
    paramGrid = ParamGridBuilder()\
        .build()

    myRegressor = RegressionEvaluator(metricName="mae")

    # defining the cross-validation
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=myRegressor,
                              numFolds=3)  # use 3+ folds in practice

    # Run cross-validation, and choose the best set of parameters.
    cvModel = crossval.fit(df2)
    print("features: " + str(set_features))
    print(cvModel.avgMetrics[0]/3) # this is because of a bug of spark 2.0.0
    print(myRegressor.evaluate(cvModel.bestModel.transform(df2)))
    print(cvModel.bestModel.stages)

This may be crazy, but i'll try to fit a linear regression to the result of the cross-validation. Reading the weights should tell us what features to keep.

In [117]:
cv_results = []
cv_results.append(tuple([[0, 8, 20, 23, 29, 30, 33, 34, 35, 37, 42, 43, 44, 45, 46, 47, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 63, 64, 66, 67, 68, 69, 70, 72, 73, 74, 75, 78, 81, 82, 83, 84, 85, 88, 89, 91, 92, 93, 96, 98, 100, 101, 103, 106, 107, 108, 109, 112, 114, 115], [0, 1, 2, 5, 7, 9, 11, 13],1431.54795088]))
cv_results.append(tuple([[0, 11, 12, 13, 15, 18, 20, 23, 25, 26, 28, 29, 31, 33, 35, 36, 37, 38, 40, 41, 43, 45, 46, 47, 48, 49, 50, 53, 54, 57, 59, 60, 65, 67, 72, 75, 77, 79, 81, 82, 83, 84, 85, 87, 88, 89, 91, 92, 93, 96, 97, 98, 99, 100, 101, 102, 104, 108, 112, 113], [0, 1, 2, 3, 7, 8, 10, 11],1372.77685625]))
cv_results.append(tuple([[8, 9, 12, 15, 16, 18, 19, 21, 22, 24, 29, 31, 34, 37, 39, 41, 42, 43, 44, 45, 47, 48, 52, 53, 55, 57, 58, 59, 61, 63, 65, 71, 73, 75, 76, 78, 79, 80, 81, 82, 84, 87, 89, 91, 93, 94, 96, 97, 98, 99, 102, 103, 104, 106, 107, 108, 109, 112, 113, 114], [0, 1, 2, 3, 4, 5, 8, 12],1382.60758841]))
cv_results.append(tuple([[8, 12, 14, 17, 18, 20, 21, 22, 23, 27, 28, 30, 32, 34, 35, 36, 38, 39, 40, 42, 45, 47, 50, 52, 53, 54, 55, 56, 57, 58, 59, 62, 63, 67, 68, 69, 72, 75, 76, 82, 83, 84, 86, 87, 88, 89, 90, 91, 93, 94, 97, 98, 99, 100, 102, 103, 104, 106, 110, 111], [2, 3, 4, 6, 8, 10, 11, 13],1453.34577263]))
cv_results.append(tuple([[8, 10, 13, 14, 15, 16, 17, 18, 19, 22, 23, 28, 30, 34, 35, 40, 42, 43, 47, 49, 50, 51, 53, 54, 57, 61, 62, 63, 64, 67, 68, 69, 70, 71, 72, 77, 79, 81, 82, 83, 84, 86, 87, 88, 89, 90, 93, 94, 96, 98, 101, 103, 104, 107, 108, 109, 110, 112, 114, 115], [1, 2, 3, 5, 6, 9, 10, 13],1435.7798927]))
cv_results.append(tuple([[10, 12, 13, 14, 17, 19, 20, 22, 23, 26, 32, 33, 34, 36, 37, 38, 39, 40, 43, 47, 50, 54, 55, 57, 60, 61, 63, 64, 65, 66, 67, 69, 70, 72, 74, 76, 77, 78, 80, 81, 82, 84, 85, 86, 87, 89, 90, 92, 93, 97, 99, 100, 102, 107, 109, 110, 111, 112, 113, 115], [0, 2, 3, 5, 6, 8, 10, 12],1406.99266046]))
cv_results.append(tuple([[0, 8, 12, 17, 19, 20, 21, 25, 26, 27, 31, 32, 33, 34, 35, 36, 39, 40, 41, 44, 45, 47, 49, 52, 54, 56, 58, 59, 62, 63, 64, 65, 67, 68, 70, 71, 72, 75, 76, 78, 80, 82, 84, 85, 86, 87, 88, 92, 93, 94, 97, 98, 101, 102, 105, 107, 108, 112, 113, 114], [0, 2, 5, 6, 8, 9, 10, 13],1427.41651413]))
cv_results.append(tuple([[0, 9, 11, 14, 16, 17, 18, 20, 22, 23, 24, 28, 30, 31, 34, 35, 36, 37, 39, 40, 45, 49, 50, 51, 52, 53, 54, 58, 59, 61, 64, 65, 67, 68, 70, 71, 73, 74, 76, 79, 80, 81, 83, 84, 87, 88, 90, 91, 94, 98, 99, 102, 103, 105, 106, 107, 109, 110, 112, 114], [0, 1, 5, 7, 8, 9, 10, 11],1369.85793953]))
cv_results.append(tuple([[8, 12, 14, 15, 17, 18, 23, 26, 27, 30, 31, 36, 37, 39, 40, 43, 44, 45, 46, 47, 49, 50, 51, 54, 55, 56, 58, 61, 62, 63, 65, 66, 71, 72, 74, 77, 78, 80, 81, 85, 86, 87, 89, 90, 91, 93, 96, 97, 98, 99, 100, 101, 104, 105, 106, 107, 108, 109, 112, 115], [0, 3, 4, 8, 9, 10, 12, 13],1405.87042128]))
cv_results.append(tuple([[0, 9, 14, 15, 17, 19, 21, 23, 24, 25, 26, 28, 29, 30, 31, 32, 33, 36, 38, 39, 40, 41, 44, 47, 48, 50, 51, 52, 54, 55, 56, 58, 59, 60, 62, 63, 65, 66, 68, 69, 71, 73, 74, 75, 76, 80, 84, 85, 89, 93, 94, 97, 100, 102, 105, 107, 109, 110, 114, 115], [1, 2, 4, 6, 9, 11, 12, 13],1464.52998139]))





In [118]:
def to_binary(my_list,n):
    result = [0 for _ in range(n)]
    for i in my_list:
        result[i] = 1
    return result

In [119]:
x = []
y = []
for result in cv_results:
    xi = to_binary(result[0],116) + to_binary(result[1],14)
    x.append(xi)
    
    y.append(result[2])


In [120]:
from sklearn import linear_model
regr = linear_model.LinearRegression()
regr.fit(x,y)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False)

In [124]:
print('Coefficients: \n', sorted(range(len(regr.coef_)), key=lambda k: regr.coef_[k]))

Coefficients: 
 [37, 116, 99, 124, 81, 11, 65, 79, 112, 61, 31, 113, 123, 91, 87, 49, 45, 18, 80, 16, 12, 126, 43, 121, 102, 20, 77, 9, 24, 106, 98, 22, 53, 46, 78, 96, 92, 13, 36, 119, 117, 48, 108, 0, 73, 74, 39, 64, 70, 90, 109, 67, 105, 41, 26, 128, 71, 104, 60, 107, 59, 76, 83, 34, 1, 4, 6, 5, 2, 95, 3, 7, 15, 29, 97, 40, 103, 35, 88, 57, 84, 101, 10, 85, 127, 82, 23, 50, 25, 58, 111, 33, 114, 28, 54, 51, 72, 38, 52, 14, 27, 44, 66, 125, 19, 17, 94, 100, 110, 89, 120, 86, 75, 55, 47, 93, 8, 42, 118, 21, 115, 30, 32, 63, 68, 56, 122, 62, 69, 129]


given a list, we should be able to search the best combinaisons of features:

In [128]:
feat_cat = [37, 116, 99, 124, 81, 11, 65, 79, 112, 61, 31, 113, 123, 91, 87, 49, 45, 18, 80, 16, 12, 126, 43, 121, 102, 20, 77, 9, 24, 106, 98, 22, 53, 46, 78, 96, 92, 13, 36, 119, 117, 48, 108, 0, 73, 74, 39, 64, 70, 90, 109, 67, 105, 41, 26, 128, 71, 104, 60, 107, 59, 76, 83, 34, 1, 4, 6, 5, 2, 95, 3, 7, 15, 29, 97, 40, 103, 35, 88, 57, 84, 101, 10, 85, 127, 82, 23, 50, 25, 58, 111, 33, 114, 28, 54, 51, 72, 38]
feat_cont = [a - 116  for a in feat_cat if a>=116]
feat_cat = [a for a in feat_cat if a<116]
# We now have 10 set of features to test
sets_cat_features = [sorted(random.sample(feat_cat,60)) for _ in range(10)]
sets_cont_features = [sorted(random.sample(feat_cont,8)) for _ in range(10)]
sets_features = list(zip(sets_cat_features,sets_cont_features))

let's try to parse the results, it'll be easier to use it

In [203]:
x = []
y = []
f = open("results_cluster.txt","r")
for i,line in enumerate(f):
    if line not in ['\n', '\r\n']:
        if i%4 == 0:
            g = line[12:-3]
            LOGGER.warn(g)
            a, b = g.split("]")
            b = b[3:]
            a = [int(j) for j in a.split(",")]
            b = [int(j) for j in b.split(",")]
            x.append(to_binary(a,116) + to_binary(b,14))
        if i%4 ==1:
            y.append(float(line[:-1]))

In [204]:
regr = linear_model.LinearRegression()
regr.fit(x,y)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False)

In [205]:
print('Coefficients: \n', sorted(range(len(regr.coef_)), key=lambda k: regr.coef_[k]))

Coefficients: 
 [118, 79, 116, 14, 23, 61, 6, 57, 109, 59, 121, 78, 54, 99, 68, 47, 119, 117, 15, 49, 45, 124, 126, 97, 0, 127, 76, 129, 80, 81, 25, 114, 36, 92, 91, 58, 103, 104, 18, 65, 113, 128, 7, 27, 100, 101, 82, 20, 106, 53, 122, 13, 72, 105, 40, 9, 108, 115, 35, 63, 17, 83, 66, 93, 52, 24, 60, 8, 1, 75, 102, 39, 84, 28, 51, 85, 43, 86, 33, 44, 34, 70, 21, 95, 87, 16, 3, 69, 74, 123, 71, 26, 56, 94, 2, 29, 73, 38, 30, 77, 11, 19, 37, 64, 90, 12, 46, 50, 41, 112, 31, 10, 107, 48, 111, 120, 55, 22, 62, 88, 42, 4, 125, 96, 98, 32, 5, 110, 67, 89]


In [206]:
sorted(regr.coef_)

[-24.34608711058107,
 -23.96121071208772,
 -22.833092826059875,
 -22.417759974304964,
 -19.41323596719301,
 -17.769796910249664,
 -13.981240424954068,
 -12.037945880656888,
 -12.022632507887598,
 -11.243139846628056,
 -11.138901683645873,
 -11.096599511662994,
 -10.689322973732185,
 -9.3010524368919558,
 -9.1023924714377564,
 -7.5937417627592323,
 -7.4310486509665266,
 -7.1460850835086323,
 -7.0806478160123749,
 -7.0003509482078865,
 -6.7722103607461177,
 -6.4632575379510326,
 -6.3283253557476655,
 -5.0618566316580038,
 -4.9127648317981905,
 -4.7691068844622517,
 -4.3631250912961965,
 -4.3069760664321279,
 -4.2104635111248836,
 -4.1974093422979397,
 -4.1027140067414445,
 -3.8884465089143823,
 -3.8268796720604055,
 -3.8109257575731759,
 -3.4804396901984052,
 -3.2834973524173847,
 -3.1508780503756979,
 -3.1078552932334231,
 -2.9139638916870148,
 -2.6936952976410007,
 -2.3177243533806173,
 -2.2477351184445284,
 -2.1846083022503215,
 -2.0574471611072855,
 -1.7591961265984442,
 -1.738842234