# Kaggle Bosch Production Line Challenge Model
## Summary
This is a machine learning model, built on the [Spark](https://spark.apache.org) framework, that attempts to solve the [Bosch Production Line Performance Challenge](https://www.kaggle.com/c/bosch-production-line-performance) on [Kaggle](https://www.kaggle.com).  This project was begun by Thomas Hughes on November 24, 2016, after the competition was completed.  It should be considered a test of the effectiveness of the technology platforms.

## Notes on Execution
Since this Notebook is designed to run with Spark, it must be run with the PySpark interpreter.  This can be done mostly automatically if you launch the notebook using the script 'pyspark-notebook' that is available in the GitHub repository along with the notebook.  PySpark will need to be installed and properly configured, and you may need to update the script to point to your local copy of PySpark.

In [1]:
# Load File Locations, using Kaggle specifications
import json
from pyspark.sql import SQLContext

# Read the Kaggle-style SETTINGS.json, which holds the file locations used below
print("Loading settings...")
with open('SETTINGS.json') as settings_file:
    settings = json.loads(settings_file.read())

print("Loaded!")

# Directory that holds the data files
source_dir = settings['source_dir']

# The pyspark interpreter supplies the SparkContext as `sc`, which is why it is
# used here without ever being created in this notebook.
sqlContext = SQLContext(sc)

Loading settings...
Loaded!


## Data Wrangling

## Import Bosch Data

In [16]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

def data_import(t):
    """Load one Bosch data split from disk and pre-process it for the pipeline.

    Relies on the notebook globals ``settings``, ``source_dir`` and
    ``sqlContext``.  The steps are:

    1. Read the numeric CSV for the split and fill missing values with 0.
    2. Read the categorical CSV for the split.  For the 'train' split only,
       keep a 1% sample (fixed seed) for preliminary testing.
    3. Remove the categorical columns named in ``drop_list`` and replace
       remaining missing values with the string 'None'.
    4. Add a StringIndexer-encoded ``<column>_indexed`` column for every
       categorical column other than 'Id'/'Response', then drop the raw
       string columns.
    5. Right-outer join numeric onto categorical by 'Id', so only the rows
       present in the (possibly sampled) categorical frame are kept.

    Args:
        t: Which split to load, either 'train' or 'test'.

    Returns:
        A Spark DataFrame holding the numeric columns plus the indexed
        categorical columns.

    Raises:
        ValueError: If ``t`` is neither 'train' nor 'test'.
    """

    if t == 'train':
        numeric_file = settings['train_numeric_file']
        categorical_file = settings['train_categorical_file']
    elif t == 'test':
        numeric_file = settings['test_numeric_file']
        categorical_file = settings['test_categorical_file']
    else:
        # Fail loudly; returning a message string would only blow up later
        # when the caller tried to use it as a DataFrame.
        raise ValueError('data_import can only take strings "train" or "test"')

    # Import Bosch numeric data
    print("Loading numeric data...")
    source_numeric = source_dir + numeric_file
    numeric = sqlContext.read.csv(source_numeric, header="true", inferSchema="true")

    print("Filling missing values...")
    # Fill remaining missing values with 0.
    numeric = numeric.na.fill(0)

    print("Loading categorical data...")
    # Now the categorical data
    source_categorical = source_dir + categorical_file
    categorical = sqlContext.read.csv(source_categorical, header="true", inferSchema="true")

    # Sample, for preliminary testing
    if t == 'train':
        print('Selecting sample data...')
        categorical = categorical.sample(withReplacement=False, fraction=0.01, seed=42)

    print("Dropping missing values...")
    # Drop columns that contain no data
    drop_list = ['L0_S3_F69', 'L0_S3_F71', 'L0_S3_F73', 'L0_S3_F75', 'L0_S3_F77', 'L0_S3_F79', 'L0_S3_F81',
                 'L0_S3_F83', 'L0_S3_F85', 'L0_S3_F87', 'L0_S3_F89', 'L0_S3_F91', 'L0_S3_F93', 'L0_S3_F95',
                 'L0_S3_F97', 'L0_S3_F99', 'L0_S3_F101', 'L0_S3_F103', 'L0_S18_F436', 'L0_S18_F438', 'L0_S18_F440',
                 'L0_S18_F442', 'L0_S18_F443', 'L0_S18_F445', 'L0_S18_F446', 'L0_S18_F448', 'L0_S18_F450',
                 'L0_S18_F452', 'L0_S23_F616', 'L0_S23_F618', 'L0_S23_F620', 'L0_S23_F622', 'L0_S23_F624',
                 'L0_S23_F626', 'L0_S23_F628', 'L0_S23_F630', 'L0_S23_F632', 'L0_S23_F634', 'L0_S23_F636',
                 'L0_S23_F638', 'L0_S23_F640', 'L0_S23_F642', 'L0_S23_F644', 'L0_S23_F646', 'L0_S23_F648',
                 'L0_S23_F650', 'L0_S23_F652', 'L0_S23_F654', 'L0_S23_F656', 'L0_S23_F658', 'L0_S23_F660',
                 'L0_S23_F662', 'L0_S23_F664', 'L0_S23_F666', 'L0_S23_F668', 'L0_S23_F670', 'L0_S23_F672',
                 'L0_S23_F674', 'L1_S24_F676', 'L1_S24_F678', 'L1_S24_F680', 'L1_S24_F682', 'L1_S24_F684',
                 'L1_S24_F686', 'L1_S24_F688', 'L1_S24_F690', 'L1_S24_F692', 'L1_S24_F694', 'L1_S24_F1157',
                 'L1_S24_F1159', 'L1_S24_F1160', 'L1_S24_F1167', 'L1_S24_F1169', 'L1_S24_F1177', 'L1_S24_F1179',
                 'L1_S24_F1181', 'L1_S24_F1183', 'L1_S24_F1561', 'L1_S24_F1563', 'L1_S24_F1564', 'L1_S24_F1673',
                 'L1_S24_F1676', 'L1_S24_F1677', 'L1_S24_F1680', 'L1_S24_F1681', 'L1_S24_F1684', 'L1_S24_F1686',
                 'L1_S24_F1689', 'L1_S24_F1691', 'L1_S24_F1694', 'L1_S24_F1696', 'L1_S24_F1699', 'L1_S24_F1701',
                 'L1_S24_F1704', 'L1_S24_F1705', 'L1_S24_F1708', 'L1_S24_F1709', 'L1_S24_F1712', 'L1_S24_F1714',
                 'L1_S24_F1717', 'L1_S24_F1719', 'L1_S24_F1722', 'L1_S24_F1724', 'L1_S24_F1727', 'L1_S24_F1729',
                 'L1_S24_F1732', 'L1_S24_F1734', 'L1_S24_F1737', 'L1_S24_F1739', 'L1_S24_F1742', 'L1_S24_F1744',
                 'L1_S24_F1747', 'L1_S24_F1749', 'L1_S24_F1752', 'L1_S24_F1754', 'L1_S24_F1757', 'L1_S24_F1759',
                 'L1_S24_F1762', 'L1_S25_F1853', 'L1_S25_F1856', 'L1_S25_F1859', 'L1_S25_F1861', 'L1_S25_F1863',
                 'L1_S25_F2956', 'L1_S25_F2959', 'L1_S25_F2961', 'L1_S25_F2964', 'L1_S25_F2966', 'L1_S25_F2969',
                 'L1_S25_F2971', 'L1_S25_F2974', 'L1_S25_F2976', 'L1_S25_F2979', 'L1_S25_F2981', 'L1_S25_F2984',
                 'L1_S25_F2986', 'L1_S25_F2989', 'L1_S25_F2991', 'L1_S25_F2994', 'L3_S46_F4136', 'L3_S46_F4137',
                 'L3_S47_F4139', 'L3_S47_F4142', 'L3_S47_F4144', 'L3_S47_F4147', 'L3_S47_F4149', 'L3_S47_F4152',
                 'L3_S47_F4154', 'L3_S47_F4157', 'L3_S47_F4159', 'L3_S47_F4162', 'L3_S47_F4164', 'L3_S47_F4167',
                 'L3_S47_F4169', 'L3_S47_F4172', 'L3_S47_F4174', 'L3_S47_F4177', 'L3_S47_F4179', 'L3_S47_F4182',
                 'L3_S47_F4184', 'L3_S47_F4187', 'L3_S47_F4189', 'L3_S47_F4192']

    # Build the lookup set once so each membership test is O(1)
    dropped = set(drop_list)
    good_columns = [x for x in categorical.columns if x not in dropped]

    categorical = categorical.select(good_columns)

    # Fill remaining missing values with 'None' category; string needed for transformers
    print("Filling remaining missing values...")
    categorical = categorical.na.fill('None')

    # This totally feels messy, but there does not seem to be a way to convert multiple categorical
    # strings into one hots from the ml pipeline framework.  Hopefully that is corrected in the future.
    print("One Hot encoding categorical data... (patience, this takes a while)")
    ignore = ['Id', 'Response']
    categorical_columns = [x for x in categorical.columns if x not in ignore]

    # Index every remaining categorical column.  Each transform is applied to
    # the accumulated frame so that all "<column>_indexed" outputs survive,
    # not just the one produced by the final iteration.  Fitting against the
    # base frame is equivalent because the raw input column never changes.
    # The one-hot encoding step stays disabled, as it was in the original
    # (commented out), so the output carries index columns only.
    # NOTE(review): the indexers are fitted separately for 'train' and 'test',
    # so the same category may receive different indices in the two splits —
    # confirm whether that is acceptable for the model.
    indexed_df = categorical
    for column in categorical_columns:
        indexer = StringIndexer(inputCol=column, outputCol=(column + "_indexed")).fit(categorical)
        indexed_df = indexer.transform(indexed_df)

    # Keep 'Id' (and 'Response' if present) plus the new index columns; the raw
    # string columns are no longer needed.
    raw_columns = set(categorical_columns)
    vectored_columns = [x for x in indexed_df.columns if x not in raw_columns]
    categorical = indexed_df.select(vectored_columns)

    print("Joining numeric and categorical data...")
    # Combine the numeric with the categorical dataframe, right_outer works for sampling
    df = numeric.join(categorical, on='Id', how='right_outer')

    print("Data import complete!")

    return df

In [None]:
# Build the training DataFrame with the data_import helper
print("Load training data...")
train = data_import(t="train")

Load training data...
Loading numeric data...
Filling missing values...
Loading categorical data...
Selecting sample data...
Dropping missing values...
Filling remaining missing values...
One Hot encoding categorical data... (patience, this takes a while)


In [4]:
# We need to vectorize our features for MLLib
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

print("Building VectorAssembler...")
# Every column except the row Id and the Response label is used as a feature
ignore = ['Id', 'Response']
train_columns = []
for name in train.columns:
    if name in ignore:
        continue
    train_columns.append(name)

# Packs the feature columns into the single 'features' vector column
assembler = VectorAssembler(outputCol='features', inputCols=train_columns)

Building VectorAssembler...


NameError: name 'train' is not defined

## Model Generation

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Configure the gradient-boosted tree classifier.
print("Building GBTClassifier...")
gbt_params = dict(labelCol="Response", featuresCol="features", maxIter=10, maxDepth=10,
                  maxMemoryInMB=1024, maxBins=64)
gbt = GBTClassifier(**gbt_params)

print("Building Pipeline...")
# The pipeline first assembles the feature vector, then fits the GBT on it
stages = [assembler, gbt]
pipeline = Pipeline(stages=stages)

print("Fitting data to model...")
# Fitting runs both stages, in order, on the training data
model = pipeline.fit(train)

print("Model generated!")

In [None]:
# Persist the fitted pipeline model to disk
print("Saving model to disk...")

# The destination is the data directory plus the configured model file name
model_file = settings['model_file']
model_source = source_dir + model_file
model.save(model_source)

print("Model saved!")

## Model Performance
### Load Test Data

In [None]:
# Build the test DataFrame with the same helper used for the training data.
# The helper defined in this notebook is data_import; no load_data exists.
print("Load test data...")
test = data_import('test')

### Load Saved Model

In [None]:
from pyspark.ml import PipelineModel

# Load serialized model
print("Load model...")

# load() is a class-level reader, so it is called on PipelineModel itself
# rather than on a throwaway empty instance.
model = PipelineModel.load(source_dir + settings['model_file'])

print("Model loaded!")

### Generate Test Predictions

In [None]:
# Run the test data through the fitted pipeline to obtain predictions.
print("Making predictions for test data...")

preds = model.transform(dataset=test)

print("Predictions complete!")

### Format and Export Kaggle Submission

In [None]:
import pandas as pd
import numpy as np

# Collect the predictions from Spark together with their Ids.  Row order is
# not guaranteed after the join performed in data_import, so predictions are
# matched to the submission rows by Id rather than by position.
print("Formatting and saving Kaggle submission...")
predsGBT = preds.select("Id", "prediction").toPandas()
predsGBT = predsGBT.rename(columns={"prediction": "Response"})

# Format to Kaggle Format
# NOTE(review): assumes the sample submission has an 'Id' column next to
# 'Response' — confirm against the file.
sub = pd.read_csv(source_dir + settings['sample_submission_file'])
sub = sub[['Id']].merge(predsGBT, on='Id', how='left')

# A missing prediction would otherwise be written out silently as NaN
if sub['Response'].isnull().any():
    raise ValueError("Some Ids in the sample submission have no prediction")

sub['Response'] = sub['Response'].astype(int)
sub.to_csv(source_dir + settings['final_submission_file'], index = False)

print("Submission complete!")

## Submission History

* Submission 1: -		Thomas M Hughes	0.13591	-	Sun, 27 Nov 2016 23:03:33 (GBT)
* Submission 2: -		Thomas M Hughes	0.13591	-	Mon, 28 Nov 2016 00:06:04 (GBT w/ Standard Scaler)
* Submission 3: -		Thomas M Hughes	0.15070	-	Mon, 28 Nov 2016 01:27:14 (GBT w/ maxDepth=10, maxBins=64)

## Obsolete Code

In [None]:
# Read the Bosch training numeric CSV into a Spark DataFrame
source_numeric = source_dir + settings['train_numeric_file']
csv_options = {'header': "true", 'inferSchema': "true"}
train_numeric = sqlContext.read.csv(source_numeric, **csv_options)

# Zero-filling of missing values is left disabled here.
#train_numeric = train_numeric.na.fill(0)

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# Read the Bosch training categorical CSV into a Spark DataFrame
source_categorical = source_dir + settings['train_categorical_file']
csv_options = {'header': "true", 'inferSchema': "true"}
train_categorical = sqlContext.read.csv(source_categorical, **csv_options)

In [None]:
# Find the numeric columns that contain no data at all
ignore = ['Id', 'Response']
numeric_columns = [x for x in train_numeric.columns if x not in ignore]

drop_list = []

# NOTE: this launches one Spark count job per column, so it is slow on wide data.
for col in numeric_columns:
    # `column != None` turns into a comparison with NULL, which never
    # evaluates to true, so it would flag every column.  isNotNull() is the
    # proper null test.
    if train_numeric.filter(train_numeric[col].isNotNull()).count() == 0:
        drop_list.append(col)
        print(col)


print(drop_list)
           


In [None]:
# Columns listed here show no variation, so they carry no useful information
drop_list = ['L0_S3_F69', 'L0_S3_F71', 'L0_S3_F73', 'L0_S3_F75', 'L0_S3_F77', 'L0_S3_F79', 'L0_S3_F81',
             'L0_S3_F83', 'L0_S3_F85', 'L0_S3_F87', 'L0_S3_F89', 'L0_S3_F91', 'L0_S3_F93', 'L0_S3_F95',
             'L0_S3_F97', 'L0_S3_F99', 'L0_S3_F101', 'L0_S3_F103', 'L0_S18_F436', 'L0_S18_F438', 'L0_S18_F440',
             'L0_S18_F442', 'L0_S18_F443', 'L0_S18_F445', 'L0_S18_F446', 'L0_S18_F448', 'L0_S18_F450',
             'L0_S18_F452', 'L0_S23_F616', 'L0_S23_F618', 'L0_S23_F620', 'L0_S23_F622', 'L0_S23_F624',
             'L0_S23_F626', 'L0_S23_F628', 'L0_S23_F630', 'L0_S23_F632', 'L0_S23_F634', 'L0_S23_F636',
             'L0_S23_F638', 'L0_S23_F640', 'L0_S23_F642', 'L0_S23_F644', 'L0_S23_F646', 'L0_S23_F648',
             'L0_S23_F650', 'L0_S23_F652', 'L0_S23_F654', 'L0_S23_F656', 'L0_S23_F658', 'L0_S23_F660',
             'L0_S23_F662', 'L0_S23_F664', 'L0_S23_F666', 'L0_S23_F668', 'L0_S23_F670', 'L0_S23_F672',
             'L0_S23_F674', 'L1_S24_F676', 'L1_S24_F678', 'L1_S24_F680', 'L1_S24_F682', 'L1_S24_F684',
             'L1_S24_F686', 'L1_S24_F688', 'L1_S24_F690', 'L1_S24_F692', 'L1_S24_F694', 'L1_S24_F1157',
             'L1_S24_F1159', 'L1_S24_F1160', 'L1_S24_F1167', 'L1_S24_F1169', 'L1_S24_F1177', 'L1_S24_F1179',
             'L1_S24_F1181', 'L1_S24_F1183', 'L1_S24_F1561', 'L1_S24_F1563', 'L1_S24_F1564', 'L1_S24_F1673',
             'L1_S24_F1676', 'L1_S24_F1677', 'L1_S24_F1680', 'L1_S24_F1681', 'L1_S24_F1684', 'L1_S24_F1686',
             'L1_S24_F1689', 'L1_S24_F1691', 'L1_S24_F1694', 'L1_S24_F1696', 'L1_S24_F1699', 'L1_S24_F1701',
             'L1_S24_F1704', 'L1_S24_F1705', 'L1_S24_F1708', 'L1_S24_F1709', 'L1_S24_F1712', 'L1_S24_F1714',
             'L1_S24_F1717', 'L1_S24_F1719', 'L1_S24_F1722', 'L1_S24_F1724', 'L1_S24_F1727', 'L1_S24_F1729',
             'L1_S24_F1732', 'L1_S24_F1734', 'L1_S24_F1737', 'L1_S24_F1739', 'L1_S24_F1742', 'L1_S24_F1744',
             'L1_S24_F1747', 'L1_S24_F1749', 'L1_S24_F1752', 'L1_S24_F1754', 'L1_S24_F1757', 'L1_S24_F1759',
             'L1_S24_F1762', 'L1_S25_F1853', 'L1_S25_F1856', 'L1_S25_F1859', 'L1_S25_F1861', 'L1_S25_F1863',
             'L1_S25_F2956', 'L1_S25_F2959', 'L1_S25_F2961', 'L1_S25_F2964', 'L1_S25_F2966', 'L1_S25_F2969',
             'L1_S25_F2971', 'L1_S25_F2974', 'L1_S25_F2976', 'L1_S25_F2979', 'L1_S25_F2981', 'L1_S25_F2984',
             'L1_S25_F2986', 'L1_S25_F2989', 'L1_S25_F2991', 'L1_S25_F2994', 'L3_S46_F4136', 'L3_S46_F4137',
             'L3_S47_F4139', 'L3_S47_F4142', 'L3_S47_F4144', 'L3_S47_F4147', 'L3_S47_F4149', 'L3_S47_F4152',
             'L3_S47_F4154', 'L3_S47_F4157', 'L3_S47_F4159', 'L3_S47_F4162', 'L3_S47_F4164', 'L3_S47_F4167',
             'L3_S47_F4169', 'L3_S47_F4172', 'L3_S47_F4174', 'L3_S47_F4177', 'L3_S47_F4179', 'L3_S47_F4182',
             'L3_S47_F4184', 'L3_S47_F4187', 'L3_S47_F4189', 'L3_S47_F4192']


# Keep every column that is not named in drop_list, preserving column order
kept_columns = [name for name in train_categorical.columns if name not in drop_list]
train_categorical = train_categorical.select(kept_columns)

# Missing categorical values become the string category 'None'
train_categorical = train_categorical.na.fill('None')

# The ml pipeline framework does not appear to offer a way to one-hot encode
# several string columns at once, so each column is handled individually.
ignore = ['Id', 'Response']
categorical_columns = [x for x in train_categorical.columns if x not in ignore]

indexed_df = train_categorical
encoded_df = train_categorical

for col in categorical_columns:
    index_name = col + "_indexed"
    vector_name = col + "_vector"

    # String values -> numeric category index
    indexer = StringIndexer(inputCol=col, outputCol=index_name).fit(encoded_df)
    indexed_df = indexer.transform(encoded_df)

    # Category index -> one-hot vector; the raw and index columns are then dropped
    encoder = OneHotEncoder(inputCol=index_name, outputCol=vector_name)
    encoded_df = encoder.transform(indexed_df).drop(col).drop(index_name)

In [None]:
# Preview the first encoded row.  first() returns a Row, which has no show()
# method, so the preview is requested from the DataFrame itself.
encoded_df.show(1)

In [None]:
# Read the test numeric CSV the same way as the training file
source_test = source_dir + settings['test_numeric_file']
csv_options = {'header': "true", 'inferSchema': "true"}
raw_test = sqlContext.read.csv(source_test, **csv_options)

# Replace null values with zero
data_test = raw_test.na.fill(0)