In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
#from pyspark.sql.functions import udf
from pyspark.sql.types import *

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.status import *
from pyspark.profiler import Profiler, BasicProfiler

from pyspark.sql import HiveContext
from pyspark.sql import Window
from pyspark.sql.functions import rank, min

In [2]:
from datetime import datetime as dt
from datetime import timedelta as td
from dateutil import rrule
from datetime import date

In [3]:
import matplotlib.pylab as plt
# for notebook:
%matplotlib inline

In [4]:
import os
import numpy as np
import pandas as pd
from sklearn.cross_validation import train_test_split

from sklearn.ensemble import GradientBoostingClassifier  #GBM algorithm

from sklearn import cross_validation, metrics   #Additional scklearn functions
from sklearn.grid_search import GridSearchCV   #Perforing grid search


In [5]:
def load_data(path, file_name, nrows=None, verbose=True):
    """
    convenience func for printing
    side effects
    :param path:
    :param file_name:
    :param nrows:
    :return:
            (pandas.dataframe)
    """
    data_path = os.path.join(path, file_name)
    if verbose:
        print('\n#################################')
        print('Loading data from {0}...'.format(data_path))
    data = pd.read_csv(data_path, nrows=nrows)
    if verbose:
        print('Dataset num rows: {0}, num cols: {1}'
              .format(data.shape[0],data.shape[1]))
        print('Columns: {}'.format(list(data.columns.values)))
        print('Head: ')
        print(data.head())
    return data

In [10]:
DATA = '/mnt/tests'

train_path = os.path.join(DATA, 'train.csv')
test_path = os.path.join(DATA, 'test.csv')
client_data_path = os.path.join(DATA, 'cliente_tabla.csv')
product_data_path = os.path.join(DATA, 'producto_tabla.csv')
town_state_path = os.path.join(DATA, 'town_state.csv')

print('Loading data..')
df_train = load_data(path=DATA, file_name='train.csv', nrows=10**3)
df_client = load_data(path=DATA, file_name='cliente_tabla.csv')
df_prod = load_data(path=DATA, file_name='producto_tabla.csv')
df_town = load_data(path=DATA, file_name='town_state.csv')
df_test = load_data(path=DATA, file_name='test.csv')
ids = df_test['id']
df_test = df_test.drop(['id'], axis=1)

target = 'Demanda_uni_equil'
indep_vars = list(df_test.columns.values)
y = df_train[target]
X = df_train[indep_vars]

Loading data..

#################################
Loading data from /mnt/tests/train.csv...
Dataset num rows: 1000, num cols: 11
Columns: ['Semana', 'Agencia_ID', 'Canal_ID', 'Ruta_SAK', 'Cliente_ID', 'Producto_ID', 'Venta_uni_hoy', 'Venta_hoy', 'Dev_uni_proxima', 'Dev_proxima', 'Demanda_uni_equil']
Head: 
   Semana  Agencia_ID  Canal_ID  Ruta_SAK  Cliente_ID  Producto_ID  \
0       3        1110         7      3301       15766         1212   
1       3        1110         7      3301       15766         1216   
2       3        1110         7      3301       15766         1238   
3       3        1110         7      3301       15766         1240   
4       3        1110         7      3301       15766         1242   

   Venta_uni_hoy  Venta_hoy  Dev_uni_proxima  Dev_proxima  Demanda_uni_equil  
0              3      25.14                0          0.0                  3  
1              4      33.52                0          0.0                  4  
2              4      39.32       

In [23]:
def train_gbm(X_train,  y_train, grid_search=False, verbose=True,
              min_samples_split=None, min_samples_leaf=50, max_depth=8,
              max_features='sqrt', sub_sample=0.8, n_estimators=100,
              learning_rate=0.1, random_state=10, param_grid=None,
              ):
    """

    :param X_train:
    :param indep_vars:
    :param dep_var:
    :param verbose:
    :param min_samples_leaf: (int) prevent overfitting, intuition based value..
    :param max_depth: (int) 8 # 5 -8, based on number of features and dataset size
    :param max_features: (str) 'sqrt' # general rule of thumb: sqrt(n_samples)
    :param sub_sample: (float) fraction of observations to be selected for each tree (0.8 commonly used value)
    :param n_estimators: (int) number of sequential trees to be modeled
    :param learning_rate: (float)
    :param random_state: (int)
    :param param_grid: (dict)
    :return:
            (model)
    """
    n_samples = X_train.shape[0]
    if not min_samples_split:
        min_samples_split = n_samples * .01 # prevent overfitting, general rule of thumb: 0.5 - 1%

    gbm = GradientBoostingClassifier(
        min_samples_split=min_samples_split, min_samples_leaf=min_samples_leaf,
        max_depth=max_depth, max_features=max_features, learning_rate=learning_rate,
        n_estimators=n_estimators, subsample=sub_sample, random_state=random_state)
    model = gbm
    if grid_search:
        if not param_grid:
            #param_grid = {'n_estimators':range(20,81,10)}
            param_grid = [{'n_estimators':range(20,81,10)}]
        model = GridSearchCV(
            estimator=gbm,
            param_grid=param_grid,
            scoring='roc_auc',
            n_jobs=4,
            iid=False,
            cv=5)
    if verbose:
        print('Starting to train model...')
    model.fit(X_train, y_train)
    if grid_search and verbose:
        print('Model Grid scores: {0}, best params: {1}, best score: {2}'
              .format(model.grid_scores_, model.best_params_, model.best_score_))
    if verbose:
        print('Finished training.')
    return model

In [24]:
def gbm_predict(X_train, y_train, X_test, y_test, indep_vars, grid_search=True,
                cv=True, verbose=True, cv_folds=5, scoring='roc_auc',
                submit=False, version='001', y_label='Demanda_uni_equil',
                ids=None):

    # Train
    model = train_gbm(X_train,  y_train, grid_search=grid_search, verbose=verbose)
    # predict on train
    if verbose:
        print('Starting predictions ...')
    test_pred = model.predict(X_test)

    if scoring=='roc_auc':
        # probability
        test_pred_prob = model.predict_proba(X_test)[:, 1]

    if cv:
        cv_score = cross_validation.cross_val_score(model, X_test,
                                                    y_test, cv=cv_folds,
                                                    scoring=scoring)
    if verbose and not submit:
        print("\nModel Report")
        if scoring=='roc_auc':
            print("Accuracy : %.4g" % metrics.accuracy_score(X_test.values, test_pred))
            print("AUC Score (Train): %f" % metrics.roc_auc_score(X_test, test_pred_prob))
            if cv:
                print("CV Score : Mean - %.7g | Std - %.7g | Min - %.7g | Max - %.7g"
                      % (np.mean(cv_score),np.std(cv_score),np.min(cv_score),np.max(cv_score)))
        # feature relevance
        predictive_relavance = pd.Series(model.feature_importances_, indep_vars).sort_values(ascending=False)
        predictive_relavance.plot(kind='bar', title='Feature relevance')
        plt.ylabel('Feature relevance score')

    if submit:
        submission = pd.DataFrame({'id':ids, y_label: test_pred})
        submission.to_csv('submission_gbm_{}.csv'.format(version), index=False)
    return model

In [25]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=123)

# w/o grid search
# gbm_10 = gbm_predict(X_train=X_train, y_train=y_train, X_test=X_test, y_test=np.array(y_test),
#                     indep_vars=indep_vars, grid_search=False, cv=True, verbose=True,
#                     cv_folds=5, scoring='mean_squared_error', y_label=target)
print('\n\n----------------------')
print('Finally, loading Kaggle test set to perform predictions...')
gbm_11 = gbm_predict(X_train=X_train, y_train=y_train, X_test=df_test, y_test=np.array(y_test),
                    indep_vars=indep_vars, grid_search=True, cv=True, verbose=True,
                    cv_folds=5, scoring='mean_squared_error', submit=True, ids=ids,
                    version='001', y_label=target)



----------------------
Finally, loading Kaggle test set to perform predictions...


ValueError: Parameter values should be a list.

# Loading the big guns - enter spark

In [None]:
#initialize a configuration object ..
MY_NAME='diogo' # change this
conf = SparkConf().setMaster("yarn-client").setAppName("bimbo - {}".format(MY_NAME)) \
.set("spark.driver.memory", "2g").set("spark.executor.memory", "2g") \
.set("spark.executor.instances", "2").set("spark.dynamicAllocation.enabled", "true")

In [6]:
# .. and pass it to a new SparkContext 
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#initialize hiveContext
#sqlContext = HiveContext(sc)

In [7]:
load_from_s3=False
if load_from_s3:
    s3_train = 's3n://bonial-jobs-stage/notebooks/training/grupo_bimbo/train.csv'
    s3_test = 's3n://bonial-jobs-stage/notebooks/training/grupo_bimbo/test.csv'
    s3_client = 's3n://bonial-jobs-stage/notebooks/training/grupo_bimbo/cliente_tabla.csv'
    s3_prod = 's3n://bonial-jobs-stage/notebooks/training/grupo_bimbo/producto_tabla.csv'
    s3_town = 's3n://bonial-jobs-stage/notebooks/training/grupo_bimbo/town_state.csv'
else:
    s3_train = 'mnt/tests/train.csv'
    s3_test = 'mnt/tests/test.csv'
    s3_client = 'mnt/tests/cliente_tabla.csv'
    s3_prod = 'mnt/tests/producto_tabla.csv'
    s3_town = 'mnt/tests/town_state.csv'

In [13]:
print('Loading datasets from hdfs')
print('Loading train set...')
df_train = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(s3_train)
#df_train.select("*").write.save("{}.parquet".format("train_bimbo"), format="parquet")
print('finished loading')
df_test.printSchema()

Loading datasets from hdfs
Loading train set...
finished loading
root
 |-- id: string (nullable = true)
 |-- Semana: string (nullable = true)
 |-- Agencia_ID: string (nullable = true)
 |-- Canal_ID: string (nullable = true)
 |-- Ruta_SAK: string (nullable = true)
 |-- Cliente_ID: string (nullable = true)
 |-- Producto_ID: string (nullable = true)



In [8]:
print('Loading test set...')
df_test = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(s3_test)
#df_test.select("*").write.save("{}.parquet".format("test_bimbo"), format="parquet")
print('finished loading')
df_test.printSchema()

Loading test set
finished loading
root
 |-- id: string (nullable = true)
 |-- Semana: string (nullable = true)
 |-- Agencia_ID: string (nullable = true)
 |-- Canal_ID: string (nullable = true)
 |-- Ruta_SAK: string (nullable = true)
 |-- Cliente_ID: string (nullable = true)
 |-- Producto_ID: string (nullable = true)



In [9]:
print('Loading client set...')
df_client = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(s3_client)
#df_client.select("*").write.save("{}.parquet".format("client_bimbo"), format="parquet")
print('finished loading')
df_client.printSchema()

Loading client set...
finished loading
root
 |-- Cliente_ID: string (nullable = true)
 |-- NombreCliente: string (nullable = true)



In [10]:
df_prod = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(s3_prod)   
#df_prod.select("*").write.save("{}.parquet".format("prod_bimbo"), format="parquet")
print('finished loading')
df_prod.printSchema()

finished loading
root
 |-- Producto_ID: string (nullable = true)
 |-- NombreProducto: string (nullable = true)



In [11]:
df_town = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load(s3_town)
#df_town.select("*").write.save("{}.parquet".format("town_bimbo"), format="parquet")
print('finished loading')
df_town.printSchema()

finished loading
root
 |-- Agencia_ID: string (nullable = true)
 |-- Town: string (nullable = true)
 |-- State: string (nullable = true)



In [12]:
#df_train = sqlContext.read.parquet('train_bimbo')
#df_test = sqlContext.read.parquet('test_bimbo')
#df_client = sqlContext.read.parquet('client_bimbo')
#df_prod = sqlContext.read.parquet('prod_bimbo')
#df_town = sqlContext.read.parquet('town_bimbo')

In [15]:
print('Showing head')
df_train.printSchema()
df_train.show(6)

Showing head
root
 |-- Semana: string (nullable = true)
 |-- Agencia_ID: string (nullable = true)
 |-- Canal_ID: string (nullable = true)
 |-- Ruta_SAK: string (nullable = true)
 |-- Cliente_ID: string (nullable = true)
 |-- Producto_ID: string (nullable = true)
 |-- Venta_uni_hoy: string (nullable = true)
 |-- Venta_hoy: string (nullable = true)
 |-- Dev_uni_proxima: string (nullable = true)
 |-- Dev_proxima: string (nullable = true)
 |-- Demanda_uni_equil: string (nullable = true)



KeyboardInterrupt: 

KeyboardInterrupt: 