In [None]:
%%capture
%config InlineBackend.figure_format = 'retina'
import numpy as np
import os
import pandas as pd
import requests
import sys
import warnings
import zipfile
from joblib import Parallel, delayed
from pandas import CategoricalDtype
import sklearn
from sklearn import datasets, impute, neighbors, preprocessing, svm, compose, linear_model, neural_network, tree
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler

# Pip install and import gspread and df2gspread, as they're used to upload to Google Sheets
warnings.filterwarnings('ignore')
!{sys.executable} -m pip install gspread
!{sys.executable} -m pip install numpy df2gspread
import gspread
from df2gspread import df2gspread as d2g
from oauth2client.service_account import ServiceAccountCredentials

# Prologue

## Estimators 

These estimators are inspired by the ones presented in the following article: https://towardsdatascience.com/logistic-regression-classifier-on-census-income-data-e1dbef0b5738

In [None]:
class Converter(BaseEstimator, TransformerMixin):
    """Transformer that casts a chosen set of columns to the ``object`` dtype.

    Parameters
    ----------
    categories : iterable of column labels
        Columns of the incoming frame that should be cast to ``object``.
    """

    def __init__(self, categories):
        self.categories = categories

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a new DataFrame in which every listed column is ``object``.

        ``X`` itself is left untouched; the cast is applied to a copy.
        """
        object_dtypes = {column: 'object' for column in self.categories}
        return pd.DataFrame(X.astype(object_dtypes))

class ColumnsSelector(BaseEstimator, TransformerMixin):
    """Transformer that keeps only the columns of the requested dtypes.

    Parameters
    ----------
    types : list of dtype specifiers
        Passed as ``include=`` to ``DataFrame.select_dtypes``.
    """

    def __init__(self, types):
        # Store the argument under its own name: scikit-learn's
        # get_params/set_params/clone look constructor arguments up by name,
        # so saving it as ``self.type`` made those calls fail.
        self.types = types

    @property
    def type(self):
        """Alias of ``types``, kept for code that still reads ``.type``."""
        return self.types

    @type.setter
    def type(self, value):
        self.types = value

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return the subset of ``X``'s columns whose dtype is in ``types``."""
        return X.select_dtypes(include=self.types)


class CategoricalImputer(BaseEstimator, TransformerMixin):
    """Fill missing values column by column.

    Parameters
    ----------
    strategy : str, default 'most_frequent'
        ``'most_frequent'`` fills each column with its most common value.
        Any other value fills missing entries with ``0``.
    columns : iterable of column labels or None, default None
        Columns to impute. ``None`` means every column of the frame passed
        to ``fit``.

    Attributes
    ----------
    fill : dict
        Set by ``fit``; maps each imputed column to its fill value.
    """

    def __init__(self, strategy='most_frequent', columns=None):
        self.strategy = strategy
        # The original assigned None here, silently ignoring the argument.
        self.columns = columns

    def fit(self, X, y=None):
        """Compute the fill value for every selected column of ``X``.

        Raises ``IndexError`` (as before) for the 'most_frequent' strategy
        when a selected column holds no non-missing value.
        """
        # Resolve the column list locally instead of overwriting the
        # constructor parameter, so refitting on another frame works.
        columns = X.columns if self.columns is None else self.columns

        # Compare by value: ``is`` only worked through string interning.
        if self.strategy == 'most_frequent':
            # value_counts() sorts by descending count, so index[0] is the
            # most frequent value of the column.
            self.fill = {column: X[column].value_counts().index[0] for column in columns}
        else:
            # Any other strategy replaces missing values with 0. The original
            # built this dict but never stored it, so transform() failed.
            self.fill = {column: 0 for column in columns}

        return self

    def transform(self, X):
        """Return a copy of ``X`` with missing values replaced per ``fill``."""
        X_copy = X.copy()
        for column, value in self.fill.items():
            X_copy[column] = X_copy[column].fillna(value)
        return X_copy
    
class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """One-hot encode the ``object`` columns of a DataFrame.

    Parameters
    ----------
    drop_first : bool, default True
        Forwarded to ``pd.get_dummies``; dropping the first level of each
        column is how this encoder deals with multicollinearity.
    """

    def __init__(self, drop_first=True):
        self.drop_first = drop_first
        self.categories = {}  # column label -> list of levels seen in fit()

    def fit(self, X, y=None):
        """Record the levels of every ``object`` column of ``X``.

        ``X`` should be the entire data set so that every level is seen.
        """
        object_frame = X.select_dtypes(include=['object'])
        self.categories.update({
            name: object_frame[name].value_counts().index.tolist()
            for name in object_frame.columns
        })
        return self

    def transform(self, X):
        """Return the dummy-encoded ``object`` columns of ``X``."""
        encoded = X.copy().select_dtypes(include=['object'])
        for name in encoded.columns:
            # Declare every level recorded in fit() so each one gets a dummy
            # column even when it is absent from this particular frame.
            level_dtype = CategoricalDtype(self.categories[name])
            encoded[name] = encoded[name].astype(level_dtype)
        return pd.get_dummies(encoded, drop_first=self.drop_first)

### Function used for downloading the datasets

In [None]:
def download(path, url):
    """Download ``url`` into the directory ``path`` unless it is already there.

    The file keeps the last path component of ``url`` as its name.

    Parameters
    ----------
    path : str
        Directory to store the file in; created if it does not exist.
    url : str
        Address of the file to fetch.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status. Nothing is written then.
    """
    # Create the target directory itself. The original created only
    # os.path.dirname(path), which is '' for a bare name such as 'data'
    # (makedirs then raises) and the parent for 'data/sub'.
    if path:
        os.makedirs(path, exist_ok=True)

    target = os.path.join(path, os.path.basename(url))
    if os.path.exists(target):
        return  # Only download data that is not already on disk.

    response = requests.get(url)
    # Fail loudly instead of saving an error page as if it were the data set.
    response.raise_for_status()
    with open(target, 'wb') as f:
        f.write(response.content)

# Preprocessing data sets

In [None]:
# California housing data set, recast as a binary classification problem:
# the label is +1 when a house costs at least the median price, -1 otherwise.
housing_features, housing_prices = datasets.fetch_california_housing(return_X_y=True)

X0 = preprocessing.StandardScaler().fit_transform(housing_features)
is_expensive = housing_prices >= np.median(housing_prices)
Y0 = np.where(is_expensive, 1, -1).astype(np.float32)

X0.shape, Y0.shape

In [None]:
# 'Adult' data set from UCI repository
path = 'data/adult/adult'
adult_data = pd.read_csv(f'{path}.data', header=None)
# The first row of the .test file is skipped (skiprows=1).
adult_test = pd.read_csv(f'{path}.test', skiprows=1, header=None)
adult = pd.concat([adult_data, adult_test])
# These are the columns containing categorical values
categorical = [13, 9, 8, 7, 6, 5, 3, 1]

# Column 14 is the label; everything else is a feature.
X1 = Pipeline([
    # Entries equal to ' ?' are treated as missing and replaced per column.
    ('impute_missing', impute.SimpleImputer(missing_values=' ?', strategy='most_frequent')),
    # One-hot encode the categorical columns and standardise the rest.
    ('norm_and_onehot',
        compose.ColumnTransformer(
            sparse_threshold=0, 
            transformers=[
                ('onehot', preprocessing.OneHotEncoder(), categorical)
            ], 
            remainder=preprocessing.StandardScaler()
        )
    ),
    ('pca', PCA(n_components=0.8, svd_solver='full')),
]).fit_transform(adult.drop(14, axis=1).to_numpy())

Y1 = adult[14].to_numpy()
# A float array using +1 for >50K and -1 for <=50K. Both spellings are matched:
# ' >50K' and ' >50K.' (with a trailing period).
# np.float64 replaces the np.float alias, which NumPy 1.24 removed.
Y1 = 2 * np.logical_or(Y1 == ' >50K.', Y1 == ' >50K').astype(np.float64) - 1

X1.shape, Y1.shape

In [None]:
# Optdigits data set: the training and test files are concatenated into one
# frame. `path` is now the shared file prefix actually used by both reads
# (the original assigned it and then ignored it).
path = 'data/optdigits/optdigits'
orig_train = pd.read_csv(f'{path}.tra', header=None)
orig_test = pd.read_csv(f'{path}.tes', header=None)
optdigits = pd.concat([orig_train, orig_test])

# Convert once; the last column is the label, the rest are features.
optdigits_values = optdigits.to_numpy()
X2 = preprocessing.StandardScaler().fit_transform(optdigits_values[:, :-1])
Y2 = optdigits_values[:, -1]

X2.shape, Y2.shape

In [None]:
# Abalone data set. Column 0 is one-hot encoded, the remaining feature columns
# are standardised, and the last column is turned into a binary label.
path = 'data/abalone/abalone.data'
abalone = pd.read_csv(path, header=None).to_numpy()

X3 = Pipeline([
    ('preprocess', 
         compose.ColumnTransformer(
            sparse_threshold=0,
            transformers=[
                ('onehot', preprocessing.OneHotEncoder(), [0])
            ],
            remainder=preprocessing.StandardScaler())
    ),
]).fit_transform(abalone[:, :-1])

# +1 when the last column is <= 9, -1 otherwise.
# np.float64 replaces the np.float alias, which NumPy 1.24 removed.
Y3 = 2 * (abalone[:, -1] <= 9).astype(np.float64) - 1

X3.shape, Y3.shape

### Online Shoppers Intention Data Set


In [None]:
# Directory that the downloaded data files are written to and read back from.
folder_name = 'data/'

In [None]:
# Source of the Online Shoppers Intention CSV; fetched into `folder_name`
# unless a copy is already on disk.
online_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00468/online_shoppers_intention.csv'

download(folder_name, online_url)

#### Load online shoppers intention data set

In [None]:
# Read the CSV that the download cell stored under `folder_name`.
online = pd.read_csv(os.path.join(folder_name, os.path.basename(online_url)), sep=',')

In [None]:
# Work on a copy so `online` stays as loaded; the boolean 'Revenue' label
# becomes 1 (True) / 0 (False).
online_copy = online.assign(Revenue=online['Revenue'].map({True:1,False:0}))

In [None]:
# Split the frame into the label column and the remaining feature columns.
Y_online = online_copy['Revenue']
X_online = online_copy.drop(columns='Revenue')

In [None]:
# Columns that are treated as categorical even though some hold numbers.
categories = ['SpecialDay','Month','OperatingSystems', 'Browser', 'Region','TrafficType', 'VisitorType', 'Weekend']

# Categorical branch: cast the columns above to object, keep only object
# columns, then dummy-encode them.
online_cat_steps = [
    ('Converter', Converter(categories=categories)),
    ('Selector', ColumnsSelector(types=['object'])),
    ('Encoder', CategoricalEncoder()),
]

# Numeric branch: apply the same cast so the categorical columns drop out of
# the int/float selection, then standardise what remains.
online_num_steps = [
    ('Converter', Converter(categories=categories)),
    ('Selector', ColumnsSelector(types=['int','float'])),
    ('Scaler', StandardScaler()),
]

# Both branches run on the same input and their outputs are concatenated.
online_full_pipe = FeatureUnion([
    ('cat_pipe', Pipeline(online_cat_steps)),
    ('num_pipe', Pipeline(online_num_steps)),
])

# Same preprocessing followed by PCA with n_components=0.8.
online_pca_pipe = Pipeline([
    ('online_pipeline', online_full_pipe),
    ('PCA', PCA(n_components=0.8)),
])

### Bank Marketing Data set

In [None]:
bank_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip'

# Fetch the archive with the shared helper; it skips the request when the zip
# is already on disk.
download(folder_name, bank_url)

# Extract whenever the CSV read below is missing, not only right after a fresh
# download. A cached zip whose contents were deleted (or never unpacked) is
# then recovered instead of making the load cell fail.
if not os.path.exists(os.path.join(folder_name, 'bank-full.csv')):
    with zipfile.ZipFile(os.path.join(folder_name, os.path.basename(bank_url)), 'r') as f:
        f.extractall(folder_name)

#### Load the bank dataset

In [None]:
# Numeric branch: keep the int columns and standardise them.
bank_num_steps = [
    ('num_attri_selector', ColumnsSelector(types=['int'])),
    ('scaler', StandardScaler()),
]

# Categorical branch: keep the object columns and dummy-encode them.
bank_cat_steps = [
    ('cat_attri_selector', ColumnsSelector(types=['object'])),
    ('encoder', CategoricalEncoder()),
]

# Both branches run on the same input and their outputs are concatenated.
bank_full_pipeline = FeatureUnion([
    ('num_pipeline', Pipeline(bank_num_steps)),
    ('cat_pipeline', Pipeline(bank_cat_steps)),
])

# Same preprocessing followed by PCA with n_components=0.8.
bank_pca_pipe = Pipeline([
    ('bank_pipeline', bank_full_pipeline),
    ('PCA', PCA(n_components=0.8)),
])

In [None]:
# The bank CSV is semicolon-separated.
bank = pd.read_csv(os.path.join(folder_name,'bank-full.csv'), sep=';')

# Build a separate frame so `bank` stays as loaded; the label column 'y'
# becomes 1 ('yes') / 0 ('no').
bank_copy = bank.assign(y=bank['y'].map({'yes':1,'no':0}))
Y_bank = bank_copy['y']
X_bank = bank_copy.drop(columns='y')

## Dataset declaration
This cell declares the data sets that will be used.

In [None]:
# Fit and apply each preprocessing pipeline once, in the order the entries
# appear in DATASETS below.
online_features = online_full_pipe.fit_transform(X_online)
online_features_pca = online_pca_pipe.fit_transform(X_online)
bank_features = bank_full_pipeline.fit_transform(X_bank)
bank_features_pca = bank_pca_pipe.fit_transform(X_bank)

# (features, labels, name) for every data set used in the experiments.
DATASETS = [
    (X0, Y0, 'california_housing'),
    (X1, Y1, 'adult'),
    (X2, Y2, 'optdigits'),
    (X3, Y3, 'abalone'),
    (online_features, Y_online, 'online'),
    (online_features_pca, Y_online, 'online (pca)'),
    (bank_features, Y_bank, 'bank'),
    (bank_features_pca, Y_bank, 'bank (pca)'),
]

# Setting up classifiers

In [None]:
# Each entry is (name, estimator, param_grid). The name ends up in the result
# rows; the estimator and param_grid are handed to GridSearchCV.
CLASSIFIERS = [
    # Random forest.
    (
        'RF',
        RandomForestClassifier(), 
        {
            'max_depth': [2, 4, 8, 16], 
            'n_estimators': [50, 100, 150]
        },
    ),
    # k-nearest neighbours.
    (
        'KNN',
        neighbors.KNeighborsClassifier(), 
        {
            'n_neighbors': [1, 2, 4, 8, 16, 32, 64, 128]
        },
    ),
    # Linear support vector machine.
    (
        'SVM',
        svm.LinearSVC(penalty='l2', loss='hinge', max_iter=10000),
        {
            # Powers of ten from 1e-8 to 1e4.
            'C': [10**r for r in range(-8, 5)],
        },
    ),
    # Decision tree. The min_samples_split given to the constructor is
    # overridden by the grid below during the search.
    (
        'DT',
        tree.DecisionTreeClassifier(min_samples_split=0.05),
        {
            'min_samples_split': [0.025, 0.05, 0.1, 0.2],
            'criterion': ['gini', 'entropy'],
        },
    ),
    # Logistic regression.
    (
        'LR',
        linear_model.LogisticRegression(),
        {
            # Powers of ten from 1e-8 to 1e4.
            'C': [10**r for r in range(-8, 5)],
        },
    ),
    # Artificial neural network (multi-layer perceptron).
    (
        'ANN',
        neural_network.MLPClassifier(),
        {
            # Powers of ten from 1e-5 to 1e-1.
            'alpha': [10**r for r in range(-5, 0)],
            'activation': ['relu', 'logistic', 'tanh'],
            'hidden_layer_sizes': [(64,), (72, 24), (16, 13, 7),],
        },
    ),
]

# Functions for training

In [None]:
# One work item per data set x train size x trial x classifier.
def params():
    """Lazily yield every training job as a ``{'meta': ..., 'data': ...}`` dict.

    For each entry of ``DATASETS``, each train size in (0.2, 0.5, 0.8) and
    each of 3 trials, one random train/test split is drawn and then shared by
    every entry of ``CLASSIFIERS``.

    ``'meta'`` identifies the job for data collection; ``'data'`` holds the
    keyword arguments needed to train the model.
    """
    train_sizes = [0.2, 0.5, 0.8]
    # This is gonna take some time.
    for (X, Y, name) in DATASETS:
        for p in train_sizes:
            for trial in range(3):
                # The split is drawn once per trial ...
                X_train, X_test, Y_train, Y_test = train_test_split(X, Y, train_size = p)
                split = {
                    'x_train': X_train,
                    'y_train': Y_train,
                    'x_test': X_test,
                    'y_test': Y_test,
                }
                # ... and reused for every classifier.
                for (clf_name, clf, param_grid) in CLASSIFIERS:
                    meta = {
                        'dataset': name,
                        'train_size': p,
                        'trial': trial,
                        'classifier': clf_name,
                    }
                    data = {'clf': clf, 'params': param_grid, **split}
                    yield {'meta': meta, 'data': data}

In [None]:
def _train_instance(x_train, y_train, x_test, y_test, clf, params):
    """Grid-search ``clf`` over ``params`` and evaluate the best estimator.

    The search uses 5-fold cross-validation on the training data. The best
    estimator it finds is then scored on the test data.

    Returns a dict with the macro F1 score and accuracy on the test set, the
    mean training and validation scores of the best parameter setting, the
    best parameters, and the full ``cv_results_`` of the search.
    """
    search = GridSearchCV(clf, params, cv=5, return_train_score=True, n_jobs=-1)
    search.fit(x_train, y_train)

    best_estimator = search.best_estimator_
    best_index = search.best_index_
    cv_results = search.cv_results_

    # Test accuracy first, then the predictions needed for the F-score.
    test_accuracy = best_estimator.score(x_test, y_test)
    predictions = best_estimator.predict(x_test)

    return {
        'f_score': f1_score(y_test, predictions, average='macro'),
        'training_accuracy': cv_results['mean_train_score'][best_index],
        'validation_accuracy': cv_results['mean_test_score'][best_index],
        'test_accuracy': test_accuracy,
        'params': search.best_params_,
        'cv_results': cv_results,
    }
    
def train_instance(param):
    """Run one job and return its metadata merged with its training results.

    ``param`` is a dict with a ``'meta'`` entry (copied into the result) and a
    ``'data'`` entry (passed as keyword arguments to ``_train_instance``).
    """
    row = dict(param['meta'])
    row.update(_train_instance(**param['data']))
    return row

In [None]:
def train(seed = 1, parameters = None):
    """Train every job in ``parameters`` in parallel and collect the results.

    Parameters
    ----------
    seed : int, default 1
        Passed to ``np.random.seed`` before the jobs are consumed.
    parameters : iterable of job dicts or None, default None
        Jobs to hand to ``train_instance``. ``None`` means a fresh
        ``params()`` generator.

    Returns
    -------
    pandas.DataFrame
        One row per job, built from the dicts ``train_instance`` returns.
    """
    # Build the default per call. The original default ``params()`` was one
    # generator created when the function was defined, so a second default
    # call received an exhausted iterator and returned an empty frame.
    if parameters is None:
        parameters = params()
    # For reproducible results
    # NOTE(review): this seeds NumPy's global RNG in the current process only;
    # confirm that the joblib workers and the estimators actually pick it up.
    np.random.seed(seed)
    # Parallel training
    return pd.DataFrame(
        Parallel(n_jobs=-1, verbose=50)(
            delayed(train_instance)(param) for param in parameters
        )
    )

# Distributed training
Since training is boring and time-consuming, we want to parallelize it as much as possible. The `train` function above already runs its jobs in parallel on a single machine. However, we can do better: by distributing the computation among several worker machines, we can achieve a substantial speedup. Therefore, I am proud to present the world's most ad hoc distributed machine learning training algorithm, ever.

In [None]:
import os

# When distributing work across several machines, we'll use a very simple (and inefficient) method to divide
# the work between them. We'll simply assign each of N machines an id in range(0, N), and give each of them an
# equal chunk of the total work. The id is here represented by the environment variable `WORKER_NUM`, while the
# number of workers is denoted by `TOTAL_WORKERS`. Default: *single machine doing all the work*.
user = int(os.environ.get('WORKER_NUM', 0))
workers = int(os.environ.get('TOTAL_WORKERS', 1))

# An id outside range(0, TOTAL_WORKERS) would silently select the wrong (or no) chunk below.
if workers < 1 or not 0 <= user < workers:
    raise ValueError(
        f'WORKER_NUM must be in range(0, TOTAL_WORKERS); got WORKER_NUM={user}, TOTAL_WORKERS={workers}'
    )

# Seed *before* materialising the work list: params() draws its train/test splits while being iterated,
# and train() only seeds afterwards. With a fixed seed here the splits are reproducible and every worker
# slices the same list.
SPLIT_SEED = 0
np.random.seed(SPLIT_SEED)

# Retrieve the work to be done by this instance.
work = list(params())
items_per_worker = len(work) // workers + 1
work = work[user * items_per_worker: (user + 1) * items_per_worker]

# Train on this worker's subset.
results = train(seed = user, parameters=work)

In [None]:
# Display the results when running in Jupyter with a GUI.
results

# Upload to Google Sheets
Now that the results have been computed, we want to upload them to Google Sheets. This lets us easily crowdsource the computation among the group ("gutta").

> NOTE: The keyfile is not included in this submission, so this portion of the project will crash horribly. But, it's only really needed when doing distributed training anyway.

## Authorization for using Google Cloud API.

In [None]:
# Scopes the service account requests authorisation for.
scope = [
    'https://spreadsheets.google.com/feeds',
    'https://www.googleapis.com/auth/drive',
]
# Service-account key file; it is not shipped with the notebook.
keyfile = 'gserviceaccount-client-secret.json'

credentials = ServiceAccountCredentials.from_json_keyfile_name(keyfile, scope)
gc = gspread.authorize(credentials)

## Uploading the computed results.

In [None]:
spreadsheet_key = '1QUMP6tlBqR3CqYlh7uC18e_XA4FgMT9SqMSc5agoGUA'

# Only worker 0 writes the header row. It therefore starts at row 1, while
# every other worker starts one row lower to leave room for that header
# (on top of the shift from 0-based ids to the sheet's 1-based rows).
is_first_worker = user == 0
if is_first_worker:
    offset = 1
else:
    offset = 2
first_row = user * items_per_worker + offset

d2g.upload(
    results,
    spreadsheet_key,
    credentials=credentials,
    clean=False,
    row_names=False,
    col_names=is_first_worker,
    start_cell=f'A{first_row}',
    wks_name='Results',
)