# Kaggle Competition

In this lab session, we are going to compete in a Kaggle competition.

First, we are going to upload the `train` and `test` datasets to Databricks using the following route:

*Data -> Add Data -> Upload File*

**Note:** You have the option to select the location to store the files within DBFS.

Once the files are uploaded, we can use them in our environment.

In the code below, you will need to replace the paths (of the form `/FileStore/tables/train.csv`) with the locations and file names that you chose when storing the files.

**Note 1:** When the upload is complete, you will get a confirmation along with the path and name assigned. Filenames might be slightly modified by Databricks.

**Note 2:** If you missed the path and filename message, you can navigate the DBFS via *Data -> Add Data -> Upload File -> DBFS*, or list the contents of a path with `display(dbutils.fs.ls("dbfs:/FileStore/some_path"))`.

In [3]:
train_data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('/FileStore/tables/train_set-51e11.csv')

test_data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('/FileStore/tables/test_set-b5f57.csv')

display(train_data)

Id,Elevation,Aspect,Slope,Horizontal_Distance_To_Hydrology,Vertical_Distance_To_Hydrology,Horizontal_Distance_To_Roadways,Hillshade_9am,Hillshade_Noon,Hillshade_3pm,Horizontal_Distance_To_Fire_Points,Wilderness_Area1,Wilderness_Area2,Wilderness_Area3,Wilderness_Area4,Soil_Type1,Soil_Type2,Soil_Type3,Soil_Type4,Soil_Type5,Soil_Type6,Soil_Type7,Soil_Type8,Soil_Type9,Soil_Type10,Soil_Type11,Soil_Type12,Soil_Type13,Soil_Type14,Soil_Type15,Soil_Type16,Soil_Type17,Soil_Type18,Soil_Type19,Soil_Type20,Soil_Type21,Soil_Type22,Soil_Type23,Soil_Type24,Soil_Type25,Soil_Type26,Soil_Type27,Soil_Type28,Soil_Type29,Soil_Type30,Soil_Type31,Soil_Type32,Soil_Type33,Soil_Type34,Soil_Type35,Soil_Type36,Soil_Type37,Soil_Type38,Soil_Type39,Soil_Type40,Cover_Type
1,2611,326,20,120,27,1597,168,214,184,2913,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6
2,2772,324,17,42,7,1814,175,220,183,2879,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2
3,2764,4,14,480,-21,700,201,212,148,700,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2
4,3032,342,9,60,8,4050,202,227,164,2376,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2
5,2488,23,11,117,21,1117,209,218,151,1136,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2
6,2968,83,8,390,19,4253,232,226,127,4570,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2
7,3027,11,6,534,47,1248,214,228,151,2388,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,2
8,3216,277,9,67,23,5430,212,236,169,2373,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
9,3242,262,5,849,169,1672,207,242,173,691,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
10,3315,61,15,120,-6,3042,231,208,106,1832,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,7


In [4]:
print('Train data size: {} rows, {} columns'.format(train_data.count(), len(train_data.columns)))
print('Test data size: {} rows, {} columns'.format(test_data.count(), len(test_data.columns)))

We will use `VectorAssembler` to merge our feature columns into a single vector column, as required by Spark's ML algorithms.

In [6]:
from pyspark.ml.feature import VectorAssembler

columns = train_data.columns
columns.remove("Id")
columns.remove("Cover_Type")

vector_assembler = VectorAssembler(inputCols=columns, outputCol="features")
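
To make the effect of the assembler concrete, here is a minimal plain-Python sketch (no Spark needed) of what assembling means for a single row: the selected columns are packed, in order, into one list of numbers. The `assemble` helper is hypothetical and only mimics the idea; the row holds the first few values of the first training record shown in the preview.

```python
def assemble(row, input_cols):
    """Pack the values of input_cols, in order, into one feature list."""
    return [float(row[name]) for name in input_cols]

# First record of the training preview, truncated to a few columns.
row = {"Id": 1, "Elevation": 2611, "Aspect": 326, "Slope": 20, "Cover_Type": 6}

# Same selection rule as in the lab: every column except Id and Cover_Type.
feature_cols = [c for c in row if c not in ("Id", "Cover_Type")]
features = assemble(row, feature_cols)
print(feature_cols, features)
```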

We will split our training data into two parts: 90% that will be used for training and 10% for validation.

In [8]:
train_data_training, train_data_validation = train_data.randomSplit([0.9, 0.1])
train_data_training.cache()
train_data_validation.cache()
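
Without a seed, `randomSplit` draws a different partition on each run, so the scores reported later can vary slightly between runs. The plain-Python sketch below illustrates the idea of a reproducible, approximately 90/10 split. The `split_rows` helper and the seed value are illustrative assumptions, not part of the lab code; in PySpark, `randomSplit` accepts an optional `seed` argument for the same purpose.

```python
import random

def split_rows(rows, train_fraction=0.9, seed=42):
    """Assign each row to train or validation with a seeded coin flip."""
    rng = random.Random(seed)
    train, validation = [], []
    for row in rows:
        (train if rng.random() < train_fraction else validation).append(row)
    return train, validation

rows = list(range(1000))
train_a, valid_a = split_rows(rows)
train_b, valid_b = split_rows(rows)
print(len(train_a), len(valid_a))
print(train_a == train_b)  # same seed, same partition
```

As with `randomSplit`, the fractions are only approximate because each row is assigned independently.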

Before making predictions, let's see which features are the most important according to a `DecisionTreeClassifier`.

In [10]:
from pyspark.ml.classification import DecisionTreeClassifier

# Setup classifier
classifier = DecisionTreeClassifier(labelCol="Cover_Type", featuresCol="features")

assembled_data = vector_assembler.transform(train_data_training)
model = classifier.fit(assembled_data)

featuresImportance = model.featureImportances

for score, column in zip(featuresImportance, columns):
    if (score != 0):
        print(column, ":", score)

Elevation is the most important feature. In addition, most features have no importance when predicting the cover type, especially the categorical features `Soil_TypeX`.
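
The loop above prints the scores in column order. Sorting them makes the ranking easier to read. The sketch below shows one way to do this in plain Python; `rank_features` is a hypothetical helper, and the feature names and scores are placeholders for illustration only, not results from the model.

```python
def rank_features(columns, scores):
    """Return (column, score) pairs with a non-zero score, highest first."""
    pairs = [(c, s) for c, s in zip(columns, scores) if s != 0]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)

# Placeholder names and values, for illustration only.
example_columns = ["feature_a", "feature_b", "feature_c", "feature_d"]
example_scores = [0.6, 0.0, 0.1, 0.3]

for column, score in rank_features(example_columns, example_scores):
    print(column, ":", score)
```

In the lab, the same helper could be called with `columns` and `model.featureImportances.toArray()`.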

Let's see what the model predicts on the training data, along with the class probabilities (how confident the model is in each prediction):

In [13]:
predictions = model.transform(assembled_data)
predictions.select("Cover_Type", "prediction", "probability").show(truncate=False)

Let's use `MulticlassClassificationEvaluator` in order to compute the accuracy of our model:

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type")

In [16]:
print("Accuracy:", evaluator.setMetricName("accuracy").evaluate(predictions))
print("F1-score:", evaluator.setMetricName("f1").evaluate(predictions))
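
As a rough illustration of what these two metrics measure, the plain-Python sketch below computes accuracy and a weighted F1 score by hand. It assumes that the evaluator's `f1` metric is the per-class F1 averaged with weights proportional to each class's share of the labels; the toy labels and predictions are made up for the example.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's label count."""
    support = Counter(labels)
    total = 0.0
    for cls, count in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        predicted = sum(1 for p in predictions if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * count
    return total / len(labels)

# Toy labels and predictions, for illustration only.
y_true = [1, 1, 2, 2, 2, 3]
y_pred = [1, 2, 2, 2, 2, 2]
print(accuracy(y_true, y_pred))
print(weighted_f1(y_true, y_pred))
```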

We can see that our model achieves 70% accuracy on the training data, which is not bad but can be improved. Let's evaluate our model differently using the confusion matrix, which tells us, for each class, how many times the prediction is correct and how many times it is confused with another class. `MulticlassMetrics` requires an RDD, so we'll convert our predictions into one and then apply the class.

In [18]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

predictionRDD = predictions.select(col("prediction").cast(DoubleType()), col("Cover_Type").cast(DoubleType())).rdd
multiclassMetrics = MulticlassMetrics(predictionRDD)

In [19]:
multiclassMetrics.confusionMatrix().toArray()

Some counts on the diagonal are high, which means that the model often predicts the correct class. However, some off-diagonal values are also high: for instance, class 1 is often confused with the second class. Some classes, such as the fifth one, are never predicted.
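
To show how such a matrix is read, here is a small plain-Python sketch that builds one from label/prediction pairs. It assumes the layout used by `MulticlassMetrics`, with actual classes in rows and predicted classes in columns, both in ascending order; `confusion_matrix` is a hypothetical helper and the data is a toy example.

```python
def confusion_matrix(labels, predictions):
    """Rows are actual classes, columns are predicted classes, sorted ascending."""
    classes = sorted(set(labels) | set(predictions))
    index = {cls: i for i, cls in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for y, p in zip(labels, predictions):
        matrix[index[y]][index[p]] += 1
    return classes, matrix

# Toy labels and predictions, for illustration only.
y_true = [1, 1, 2, 2, 2, 3]
y_pred = [1, 2, 2, 2, 2, 2]
classes, matrix = confusion_matrix(y_true, y_pred)
for cls, row in zip(classes, matrix):
    print(cls, row)
```

In this toy example the last column contains only zeros, which is how a class that is never predicted shows up in the matrix.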

There are many categorical features, and most of them are not important for predicting the cover type. These categorical features are one-hot encoded. Let's try to decode them:

In [26]:
from pyspark.sql.functions import udf
import numpy as np

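def decode(t):
  # Flatten the assembled one-hot vector into a plain Python list
  r = list(np.array(t).reshape(-1))
  if (1. in r):
    # The position of the active column becomes the category index
    return float(r.index(1.))
  else:
    # No active column: falls back to 0., the same value as the first category
    return 0.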

def decode_onehotencoding(df):
  wilderness_columns = ["Wilderness_Area"+str(i) for i in range(1,5)]
  
  wilderness_assembler = VectorAssembler(inputCols=wilderness_columns, outputCol="Wilderness")

  decode_UDF = udf(decode, DoubleType())

  wilderness_only = wilderness_assembler.transform(df).drop(*wilderness_columns).withColumn("Wilderness", decode_UDF(col("Wilderness"))) 

  soil_columns = ["Soil_Type"+str(i) for i in range(1, 41)]

  soil_assembler = VectorAssembler(inputCols=soil_columns, outputCol="Soil")

  return soil_assembler.transform(wilderness_only).drop(*soil_columns).withColumn("Soil", decode_UDF(col("Soil")))
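
The decoding step can be tried outside Spark. The sketch below is a plain-Python variant of `decode` (the name `decode_one_hot` is hypothetical) applied to a few hand-written one-hot lists. It also shows a limitation of the fallback: a row with no active column gets the same value as the first category. If every row has exactly one active column, as in the preview rows, that fallback is never reached.

```python
def decode_one_hot(values):
    """Return the position of the first 1 as a float, or 0.0 if there is none."""
    values = [float(v) for v in values]
    return float(values.index(1.0)) if 1.0 in values else 0.0

third = decode_one_hot([0, 0, 1, 0])   # third column active
first = decode_one_hot([1, 0, 0, 0])   # first column active
empty = decode_one_hot([0, 0, 0, 0])   # no column active
print(third, first, empty)
```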

In [29]:
train_data = decode_onehotencoding(train_data)
train_data_training, train_data_validation = train_data.randomSplit([0.9, 0.1])
train_data_training.cache()
train_data_validation.cache()

columns = train_data.columns
columns.remove("Id")
columns.remove("Cover_Type")

vector_assembler = VectorAssembler(inputCols=columns, outputCol="features")

So far, we have only converted the one-hot columns into category indexes. They are not yet ready to be used in the classification model: the indexes are plain numbers, whereas the order between the categories they stand for carries no meaning. The `VectorIndexer` class takes care of this by identifying which features are categorical.
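
As a rough sketch of the rule `VectorIndexer` applies, a feature is treated as categorical when its number of distinct values does not exceed `maxCategories`, and its values are then replaced by category indexes; other features are left unchanged. The plain-Python helper below (`index_feature`, a hypothetical name) mimics that decision for one column. The exact index assignment in Spark may differ, and the soil values are toy data.

```python
def index_feature(values, max_categories):
    """Return (is_categorical, transformed values) for one feature column."""
    distinct = sorted(set(values))
    if len(distinct) > max_categories:
        return False, list(values)  # continuous: left unchanged
    mapping = {v: i for i, v in enumerate(distinct)}
    return True, [mapping[v] for v in values]

soil = [9.0, 22.0, 9.0, 31.0]  # toy values, few distinct categories
elevation = [2611.0, 2772.0, 2764.0, 3032.0, 2488.0]  # from the preview rows

print(index_feature(soil, max_categories=3))
print(index_feature(elevation, max_categories=3))
```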

Earlier, we used the decision tree classifier without tuning its parameters. With `ParamGridBuilder` we can define a grid of candidate parameters, and `TrainValidationSplit` will select the best combination, using `MulticlassClassificationEvaluator` to compare them. `TrainValidationSplit` splits the training data into 90% for training and 10% for validation in order to choose the parameters.

We'll use a pipeline that will chain the vector assembler, the vector indexer and the classifier stages.

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import VectorIndexer

# Setup classifier
classifier = DecisionTreeClassifier(labelCol="Cover_Type", featuresCol="indexer")

indexer = VectorIndexer(maxCategories=40, inputCol="features", outputCol="indexer")

# Chain the vector assembler, the vector indexer and the classification model
pipeline = Pipeline(stages=[vector_assembler, indexer, classifier])

params = (
  ParamGridBuilder()
  .addGrid(classifier.impurity, ["gini", "entropy"])
  .addGrid(classifier.maxDepth, [1, 20])
  .addGrid(classifier.maxBins, [40, 300])
  .addGrid(classifier.minInfoGain, [0.0, 0.05])
  .build()
)

evaluator = MulticlassClassificationEvaluator(labelCol="Cover_Type", metricName="accuracy")

validator = (
  TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(params)
  .setTrainRatio(0.9)
)

# Run stages in pipeline with the train data
model = validator.fit(train_data_training)
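
The grid above combines two values for each of four parameters, so 2 x 2 x 2 x 2 = 16 candidate models are trained and compared. The plain-Python sketch below enumerates the same combinations with `itertools.product`, which can help estimate the cost before enlarging the grid; the dictionary layout is an illustration, not how Spark stores its parameter maps.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call.
grid = {
    "impurity": ["gini", "entropy"],
    "maxDepth": [1, 20],
    "maxBins": [40, 300],
    "minInfoGain": [0.0, 0.05],
}

# One dictionary per combination of parameter values.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
print(len(combinations))
print(combinations[0])
```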

Let's see the best validation score obtained.

In [35]:
max(model.validationMetrics)
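
The maximum alone does not say which parameter combination produced it. Assuming the metrics are listed in the same order as the parameter maps, pairing the two lists identifies the winning setting. The sketch below shows the idea in plain Python; `best_setting` is a hypothetical helper, and the settings and scores are placeholders for illustration only.

```python
def best_setting(param_maps, metrics):
    """Return the (param_map, metric) pair with the highest metric."""
    return max(zip(param_maps, metrics), key=lambda pair: pair[1])

# Placeholder settings and scores, for illustration only.
example_maps = [{"maxDepth": 1}, {"maxDepth": 20}]
example_metrics = [0.4, 0.8]
print(best_setting(example_maps, example_metrics))
```

In the lab, the same helper could be called with `params` and `model.validationMetrics`.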

Our model improved considerably compared to the previous one, just by tuning the parameters of the decision tree and reducing the number of features. Let's evaluate the model on the validation data. To avoid confusion: the split made by `TrainValidationSplit` was only used to find the best parameters. Evaluating the model on the remaining held-out data gives a good measure of how well it predicts data it has never seen before.

In [36]:
evaluator.evaluate(model.transform(train_data_validation))

An accuracy of 89% is good. We can use the model to make predictions on the test data.

In [40]:
test_data = decode_onehotencoding(test_data)

# Make predictions on testData
predictions = model.transform(test_data)

# Cast predictions to 'int' to match the data type expected by Kaggle
predictions = predictions.withColumn("Cover_Type", predictions["prediction"].cast("int"))
# Show the schema of 'predictions'
predictions.printSchema()


In [41]:
# Display predictions and probabilities
display(predictions.select("Cover_Type", "probability"))

Cover_Type,probability
2,"List(1, 8, List(), List(0.0, 0.09876543209876543, 0.8950617283950617, 0.0, 0.0, 0.006172839506172839, 0.0, 0.0))"
2,"List(1, 8, List(), List(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
1,"List(1, 8, List(), List(0.0, 0.7263157894736842, 0.2736842105263158, 0.0, 0.0, 0.0, 0.0, 0.0))"
2,"List(1, 8, List(), List(0.0, 0.014792899408284023, 0.985207100591716, 0.0, 0.0, 0.0, 0.0, 0.0))"
1,"List(1, 8, List(), List(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
1,"List(1, 8, List(), List(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
1,"List(1, 8, List(), List(0.0, 0.9494252873563218, 0.05057471264367816, 0.0, 0.0, 0.0, 0.0, 0.0))"
3,"List(1, 8, List(), List(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0))"
2,"List(1, 8, List(), List(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
2,"List(1, 8, List(), List(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
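
In the output above, the last inner list of each row holds the class probabilities. It has eight entries although there are seven cover types, presumably because positions correspond to label values 0 to 7 and label 0 never occurs, which would explain why the first entry is always 0.0. The predicted class is the position with the highest probability, as this plain-Python sketch shows for the first row (`predicted_class` is a hypothetical helper).

```python
def predicted_class(probabilities):
    """Return the index with the highest probability."""
    return max(range(len(probabilities)), key=lambda i: probabilities[i])

# Probability vector of the first row in the displayed output.
first_row = [0.0, 0.09876543209876543, 0.8950617283950617, 0.0,
             0.0, 0.006172839506172839, 0.0, 0.0]
print(len(first_row), predicted_class(first_row))
```

The result matches the `Cover_Type` value of 2 shown for that row.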


Finally, we can create a file with the predictions.

In [43]:
# Write the Id and Cover_Type columns to a single CSV file
(predictions
 .repartition(1)
 .select('Id', 'Cover_Type')
 .write
 .format('com.databricks.spark.csv')
 .options(header='true')
 .mode('overwrite')
 .save('/FileStore/kaggle-submission'))

To be able to download the predictions file, we need its name (`part-*.csv`):

In [45]:
display(dbutils.fs.ls("dbfs:/FileStore/kaggle-submission"))

path,name,size
dbfs:/FileStore/kaggle-submission/_committed_1066089883742585911,_committed_1066089883742585911,209
dbfs:/FileStore/kaggle-submission/_committed_2802196729756863731,_committed_2802196729756863731,199
dbfs:/FileStore/kaggle-submission/_committed_6646593098176635370,_committed_6646593098176635370,200
dbfs:/FileStore/kaggle-submission/_started_6646593098176635370,_started_6646593098176635370,0
dbfs:/FileStore/kaggle-submission/part-00000-tid-6646593098176635370-65f1e9d8-9bbe-41e0-8eaf-bfeeffcfb2e7-28961-c000.csv,part-00000-tid-6646593098176635370-65f1e9d8-9bbe-41e0-8eaf-bfeeffcfb2e7-28961-c000.csv,2039369


Files stored in /FileStore are accessible in your web browser via `https://<databricks-instance-name>.cloud.databricks.com/files/`.
  
For this example:

https://community.cloud.databricks.com/files/kaggle-submission/part-*.csv?o=######

where `part-*.csv` should be replaced by the name displayed in your system, and the number after `o=` is the same as in your Community Edition URL.
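
The mapping from a DBFS path to its download URL can be sketched as a small helper. The function name `download_url` and its parameters are assumptions made for this example; the host is the Community Edition address used above, and `######` remains a placeholder for the number in your own URL.

```python
def download_url(dbfs_path, workspace_id,
                 host="https://community.cloud.databricks.com"):
    """Map a dbfs:/FileStore/... path to its browser download URL."""
    prefix = "dbfs:/FileStore/"
    if not dbfs_path.startswith(prefix):
        raise ValueError("only files under dbfs:/FileStore/ are served")
    return "{}/files/{}?o={}".format(host, dbfs_path[len(prefix):], workspace_id)

# Path of the part file from the directory listing above.
path = ("dbfs:/FileStore/kaggle-submission/"
        "part-00000-tid-6646593098176635370-65f1e9d8-9bbe-41e0-8eaf-bfeeffcfb2e7-28961-c000.csv")
url = download_url(path, "######")
print(url)
```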


Finally, we can upload the predictions to Kaggle and check the performance.