In [None]:
%matplotlib inline
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
sys.version

In [None]:
os.environ['PATH']

Use the "python_2" conda environment, matching the "Python 2" kernel. This kernel acts as the "Spark Driver", so it must use the same Python version as the environment used by the Spark Executors.

In [None]:
# Fail fast when the kernel is not a Python 2 kernel: the check below rejects any
# interpreter whose major version is 3 or more.
assert sys.version_info.major < 3, ("Python 3 not supported on our Spark environment, "
                                    "please use this notebook in Python 2.7 kernel")

# Bosch Production Line Kaggle Challenge

This notebook analyzes the data provided in the [Bosch Challenge](https://www.kaggle.com/c/bosch-production-line-performance) (2016).

This challenge provides "one of the largest datasets (in terms of number of features) ever hosted on Kaggle", which makes it a very good candidate for a "Big Data" investigation using our brand new Spark infrastructure.

Beware of the various environments:
- This Notebook is hosted on an intranet server.
- It uses a Kernel that is also running on this machine. This kernel is a Python 2 kernel, because Python 2 is installed on the Spark Cluster and we need to use the same Python version. It uses an anaconda installation with some useful libraries.
- The Spark Driver runs in the same kernel, so also on the same machine.
- The Spark Executors are remote machines, and will use another Python installation.

Some deps are installed on the current env. They are Python dependencies to be installed on the Jupyter environment, ie, they may **not** be available on the Spark Cluster Python environments.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import os
import sys
from xgboost import XGBClassifier

# Raise the resources requested from the cluster by rewriting the spark-submit flags
# found in PYSPARK_SUBMIT_ARGS. Reading with `.get(..., '')` keeps this cell from raising
# KeyError when the variable is not exported in the kernel environment; in that case
# there is nothing to rewrite and the variable is simply set to an empty string.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    os.environ.get('PYSPARK_SUBMIT_ARGS', '')
    .replace("--total-executor-cores 10", "--total-executor-cores 20")
    .replace("--executor-memory 16G", "--executor-memory 64G")
)
# Last expression of the cell: display the resulting value.
os.environ['PYSPARK_SUBMIT_ARGS']

def injectPackageInSpark(package_name, package_string):
    """Make sure a Spark package is requested through PYSPARK_SUBMIT_ARGS.

    Args:
        package_name: substring used to detect whether the package is already
            mentioned in PYSPARK_SUBMIT_ARGS (nothing is changed if it is).
        package_string: Maven coordinates to pass to ``--packages``.

    The coordinates are appended to an existing ``--packages`` value as a
    comma-separated list; a new ``--packages`` flag is prepended only when none
    is present yet. spark-submit documents ``--packages`` as a single
    comma-delimited list, so repeating the flag once per package would not
    accumulate the packages.
    """
    import re  # local import: only this helper needs it

    submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
    if package_name in submit_args:
        return

    existing = re.search(r"--packages\s+(\S+)", submit_args)
    if existing:
        # Extend the coordinates list of the flag that is already there.
        merged = existing.group(1) + "," + package_string
        submit_args = submit_args[:existing.start(1)] + merged + submit_args[existing.end(1):]
    else:
        submit_args = " --packages {} ".format(package_string) + submit_args
    os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args

def initPySpark():
    """Ensure PYSPARK_SUBMIT_ARGS ends up mentioning ``pyspark-shell``.

    Does nothing when ``pyspark-shell`` already appears in the variable. The
    variable is read with a default so that an unset PYSPARK_SUBMIT_ARGS is
    handled the same way as an empty one instead of raising KeyError.
    """
    submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
    if "pyspark-shell" not in submit_args:
        # If defined, PYSPARK_SUBMIT_ARGS needs to specify the shell to use
        os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args + " pyspark-shell"

We want to connect to our Spark Cluster. The environment variables that let PySpark be found and connect to the cluster are **not** set yet.

We use findspark that can inject the right environment variables to initialize the `pyspark` module.

In [None]:
import findspark
findspark.init(python_path="/opt/sklearn_env/bin/python")  # This python_path points to the Python to use on the Executors

In [1]:
# Register the extra Spark packages and the shell target in PYSPARK_SUBMIT_ARGS
# before the SparkContext is created.
injectPackageInSpark('spark-csv', 'com.databricks:spark-csv_2.10:1.5.0')
injectPackageInSpark('sparkxgboost', 'rotationsymmetry:sparkxgboost:0.2.1-s_2.10')
initPySpark()

print("Let's start a nice Spark Context to help us dealing with large dataset")

# Imported in this cell rather than with the other imports: presumably findspark.init()
# has to run first so that the `pyspark` module can be found.
import pyspark
spark_master = os.environ['SPARK_MASTER']
# NOTE(review): properties set on SparkConf take precedence over spark-submit flags, so
# spark.executor.memory=16g and spark.cores.max=100 below presumably override the
# "--executor-memory 64G" / "--total-executor-cores 20" values written into
# PYSPARK_SUBMIT_ARGS earlier in this notebook -- confirm which values are intended.
conf = (pyspark.SparkConf()
            .setMaster("spark://" + spark_master + ":7077")
            .setAppName("bosch-challenge")
            .set("spark.executor.memory", "16g")
            .set("spark.cores.max", 100)
            .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
            .set("spark.driver.port", 7001)
            .set("spark.fileserver.port", 7002)
            .set("spark.broadcast.port", 7003)
            .set("spark.replClassServer.port", 7004)
            .set("spark.blockManager.port", 7005)
            .set("spark.executor.port", 7006))
# Do not use the "master" argument here since it will ignore the --total-executor-cores argument.
# getOrCreate() hands back the already running context when this cell is re-run, so the
# re-initialization case is covered without swallowing every ValueError; any other failure
# now surfaces here instead of as a NameError on `sc` a few lines below.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
print("Spark version: " + sc.version)
print("Spark Application name: " + sc.appName)

assert "local[" not in sc.master, "PySpark running in local mode!"
print("PySpark is configured against the cluster: {}".format(sc.master))
print("Spark UI can be found at: http://" + spark_master + ":8080")

NameError: name 'injectPackageInSpark' is not defined

In [None]:
import humanize
def getFileSize(filepath):
    """Return the size of the file at `filepath`, formatted by humanize.naturalsize."""
    size_in_bytes = os.stat(filepath).st_size
    return humanize.naturalsize(size_in_bytes)
# List every file of the mounted competition directory together with its readable size.
print("Competition files on HDFS:\n" + "\n".join(
    "/sbrouil/bosch/{}: {}".format(
        entry, getFileSize(os.path.join("/mnt/hdfs/sbrouil/bosch", entry)))
    for entry in os.listdir("/mnt/hdfs/sbrouil/bosch")))

Each file is pretty big: the complete training set is > 6 GB, and loading it on a single computer is really slow and makes pandas suffer too much. Let's use Spark to distribute this analysis across several computers. It can also be used to distribute independent computations using the sklearn environment running on each "executor".

# Data Preparation

Data comes in CSV files stored on HDFS. Let's try to load them into dataframes. If it is really slow, we will transform them into Parquet files to see the difference.

Let's load the data. We will use schema inference to extract numerical values as integers, but keep in mind this requires the Spark job to process ALL the data. I don't know how to provide a schema, since the number of columns depends on the number of features and may differ from one file to another (and this is the case, as we will see in a few cells).

In [None]:
sqlContext = pyspark.SQLContext(sc)

# Spark is connected to the Hadoop cluster, so it can read the CSV files directly
def get_df_from_csv(csvFile, hasHeader=False, customSchema=None, inferschema='true'):
    """Load a CSV file through the spark-csv data source of `sqlContext`.

    Args:
        csvFile: path handed to the reader's ``load``.
        hasHeader: forwarded as the ``header`` reader option.
        customSchema: when truthy, applied to the reader with ``schema`` before loading.
        inferschema: forwarded as the ``inferschema`` reader option.

    Returns:
        Whatever the reader's ``load(csvFile)`` returns.
    """
    reader = sqlContext.read.format('com.databricks.spark.csv')
    reader = reader.options(header=hasHeader, inferschema=inferschema)
    if not customSchema:
        return reader.load(csvFile)
    return reader.schema(customSchema).load(csvFile)

In [None]:
train_categorical = get_df_from_csv("/sbrouil/bosch/train_categorical.csv", hasHeader=True)

In [None]:
train_date = get_df_from_csv("/sbrouil/bosch/train_date.csv", hasHeader=True)

In [None]:
train_numeric = get_df_from_csv("/sbrouil/bosch/train_numeric.csv", hasHeader=True)

About 20 seconds to load all the data!

Please note that Spark has found the structure of these columns on its own, and thanks to the Scala implementation (nothing here happens in Python), it has parsed each column pretty fast to find the right data type (integer, string...).

It is so convenient to be able to swim through the **entire** data set, transparently using several dozen machines with plenty of memory. We can work seamlessly in Python by driving a distributed computation in a faster language (Scala/JVM). Doing it only with Pandas, on our local machine, involves loading all the data in memory and hoping for our numpy installation to use some parallelism. Parallel computing is not so easy to do in Python, but here it is really easy to do **and so fast**!

You can click on the following link to see the application running on the Spark cluster. Click on "Application Detail UI" and then on "Event Timeline" to get a nice visualization of what the cluster is doing with the job!

In [None]:
print("http://" + spark_master + ":8080/app/?appId=" + sc.applicationId)

Let's now discover our data.

# Discovering the data

In [None]:
def extractStationInfo(name):
    """Split an underscore-separated, three-part column name (e.g. ``L3_S36_F3939``)
    into a (line, station, feature) tuple, dropping the first character of each part.

    Raises ValueError when `name` does not split into exactly three parts.
    """
    line_part, station_part, feature_part = name.split("_")
    return tuple(part[1:] for part in (line_part, station_part, feature_part))

def fmtStationInfo(name):
    """Describe a column name in words, using the parts returned by extractStationInfo."""
    template = "line {}, station {}, and feature number {}"
    return template.format(*extractStationInfo(name))

Let's see what each training set table looks like.

- The `train_date` should hold dates.
- The `train_categorical` should hold categories.
- The `train_numeric` should hold numeric values and is told to hold the "Response"

In [None]:
train_numeric.printSchema()

In [None]:
train_categorical.printSchema()

In [None]:
train_date.printSchema()

We see there is an "Id" in each table, and in `train_numeric`, there is the `Response` column we need to predict in the test set.

How many features apart from Id and Response?

In [None]:
# Fetch the first row of each training table after dropping the "Id" column
# (and "Response" for the numeric table); later cells use these rows.
first_row_train_numeric = train_numeric.drop("Id").drop("Response").head(1)[0]
first_row_train_categorical = train_categorical.drop("Id").head(1)[0]
first_row_train_date = train_date.drop("Id").head(1)[0]

In [None]:
# The length of each first row gives the number of columns left once "Id"
# (and "Response") have been dropped.
nb_numeric = len(first_row_train_numeric)
nb_categorical = len(first_row_train_categorical)
nb_date = len(first_row_train_date)

print("Number of columns in 'train_numeric': {}".format(nb_numeric))
print("Number of columns in 'train_categorical': {}".format(nb_categorical))
print("Number of columns in 'train_date': {}".format(nb_date))
# NOTE(review): the total adds the numeric and categorical counts only; the date columns
# are left out -- presumably because they are timestamps rather than features; confirm.
print("Total number of features (out of Id and Response): {}".format(nb_numeric + nb_categorical))

Ok, let's look at the first rows of the 'date' dataframe.

In [None]:
import pandas
from pyspark.sql.functions import *

train_date.select("*").limit(10).toPandas()

There are a lot of features (`Lx_Sx_Dx`), and a lot of missing information (`NaN`). Note that NOT all the columns are displayed in this Pandas table.

What about 'numeric' dataframe?

In [None]:
train_numeric.select("*").limit(10).toPandas()

We see there are some `None` and `NaN` values. [According to this page](https://www.kaggle.com/c/bosch-production-line-performance/forums/t/22909/expeditive-exploration-models-on-data), there could be a reason behind this.

Let's look at categorical.

In [None]:
train_categorical.select("*").limit(10).toPandas()

Weird, there is only empty data? Let's take a random column

In [None]:
# Show up to two rows of column L0_S1_F31 for which isnan() is true.
# NOTE(review): isnan() flags floating-point NaN only; presumably empty or null cells of
# this column do not match, so an empty result may not tell whether the column holds
# data -- confirm (isNull() or a comparison with "" may be what is wanted here).
train_categorical.select("L0_S1_F31").where(isnan("L0_S1_F31")).limit(2).toPandas()

Nothing.

First, we see that the features do not exactly match. For example, in `date` we have `L0_S0_D1`, while in `numeric` we have `L0_S0_F0` and in `categorical` we have `L0_S1_F25`.

According to the data description, the features are not so easy to deal with. Let's first reorganize features.

# Features Organization

Across these 3 training tables, features are related but do not match one-to-one.

Let's look at the [data description](https://www.kaggle.com/c/bosch-production-line-performance/data).

> The dataset contains an extremely large number of anonymized features. Features are named according to a convention that tells you the production line, the station on the line, and a feature number. E.g. L3_S36_F3939 is a feature measured on line 3, station 36, and is feature number 3939.

> On account of the large size of the dataset, we have separated the files by the type of feature they contain: numerical, categorical, and finally, a file with date features. The date features provide a timestamp for when each measurement was taken. Each date column ends in a number that corresponds to the previous feature number. E.g. the value of L0_S0_D1 is the time at which L0_S0_F0 was taken.

Features are referenced by column label. The timestamp of a feature is given by the `LX_SX_DXX` column that follows the `LX_SX_FXX` feature. For example, numerical feature `L0_S1_F24` and categorical feature `L0_S1_F25` are likely timestamped by `L0_S1_D26`.

In [None]:
def organize(features):
    """Group feature names by production line and by station.

    Each name is split on ``_``; the first token is used as the line key and the
    second token as the station key (e.g. ``L3_S36_F3939`` -> ``L3`` / ``S36``).

    Args:
        features: iterable of underscore-separated feature names.

    Returns:
        A ``(line_features, station_features)`` pair of dicts mapping each key to
        the list of feature names that carry it, in input order.

    Raises:
        IndexError: if a name contains no ``_`` separator.
    """
    line_features = {}
    station_features = {}

    # Single pass with exact token comparison: a name is filed under the line and
    # station it actually declares, never under a key that merely occurs as a
    # substring somewhere else in the name.
    for feature in features:
        tokens = feature.split('_')
        line, station = tokens[0], tokens[1]
        line_features.setdefault(line, []).append(feature)
        station_features.setdefault(station, []).append(feature)

    return line_features, station_features

# NOTE(review): `features` is not defined in any cell visible above this one. It is
# assumed here to be the feature columns of the numeric training table (everything
# except "Id" and "Response") -- confirm, and extend to the other tables if intended.
features = [c for c in train_numeric.columns if c not in ("Id", "Response")]
line_features, station_features = organize(features)

print("Features in Station 32: {}".format( station_features['S32'] ))

Now, we have an 'Id' column that looks like an identifier, so let's join all tables!

In [None]:
# fmtStationInfo parses names of the form Lx_Sy_Fz, so the list has to be built from
# the column names of the table, not from the cell values of its first row. "Id" is
# the only column skipped.
station_list = sorted(c for c in train_categorical.columns if c != "Id")
print("Station list for 'train_categorical':\n" + "\n".join("{}: {}".format(c, fmtStationInfo(c)) for c in station_list))

In [None]:
# Distinct cell values of the first train_date row, first cell skipped, sorted.
# (The brackets of the slice were unbalanced, which made this cell a SyntaxError.)
# NOTE(review): despite the "Station list" label these are cell values, not column
# names -- confirm the intent.
station_list  = sorted(set([cell for cell in first_row_train_date[1:]]))
print("Station list for 'train_date':\n" + "\n".join(str(c) for c in station_list))

In [None]:
# Distinct cell values of the first train_numeric row (first cell skipped), sorted,
# then printed one per line.
station_list = sorted(set(first_row_train_numeric[1:]))
print("Station list for 'train_numeric':\n" + "\n".join(map(str, station_list)))

In [None]:
#train_categorical.write.parquet("/gsemet/bosch/train_categorical.parquet")

In [None]:
#train_categorical = sqlContext.read.parquet("/gsemet/bosch/train_categorical.parquet")

In [None]:
train_categorical.show()  