In [1]:
import pandas as pd
import numpy as np
import yaml, re, copy

from google.cloud import storage
from io import BytesIO

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import TransformerMixin, BaseEstimator, clone
from sklearn.feature_extraction.text import TfidfTransformer, CountVectorizer
from sklearn.feature_selection import SelectKBest
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.utils.validation import check_is_fitted

import distributed
from dask_ml.model_selection import GridSearchCV as GridSearchCVBase

In [2]:
# load the data
# connect to Google Cloud Storage and fetch the 'djr-data' bucket;
# `bucket` is the module-level handle that gcs_to_df reads blobs from
client_gcs = storage.Client()
bucket = client_gcs.get_bucket('djr-data')

def gcs_to_df(f):
    """Download a CSV blob from the module-level `bucket` and parse it.

    Parameters
    ----------
    f : str
        Name (path) of the blob inside `bucket`.

    Returns
    -------
    pandas.DataFrame
        The blob's content parsed by `pd.read_csv` as UTF-8 text.
    """
    with BytesIO() as stream:
        bucket.blob(f).download_to_file(stream)
        # the download leaves the cursor at the end; rewind before parsing
        stream.seek(0)
        frame = pd.read_csv(stream, encoding = "utf-8")
    return frame
 
# training and test sets, read from the bucket as DataFrames
df_train = gcs_to_df("kaggle-jigsaw/train.csv")
df_test = gcs_to_df("kaggle-jigsaw/test.csv")
# names of the label columns in df_train; the notebook tunes and fits one model per label
yvar = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

In [3]:
# initialize client for interacting with dask
# no address is passed here: the DASK_SCHEDULER_ADDRESS env variable specifies the scheduler ip
client_dask = distributed.Client()

In [4]:
# pairwise correlation matrix of the label columns (pandas default: Pearson)
df_train[yvar].corr()

Unnamed: 0,toxic,severe_toxic,obscene,threat,insult,identity_hate
toxic,1.0,0.308619,0.676515,0.157058,0.647518,0.266009
severe_toxic,0.308619,1.0,0.403014,0.123601,0.375807,0.2016
obscene,0.676515,0.403014,1.0,0.141179,0.741272,0.286867
threat,0.157058,0.123601,0.141179,1.0,0.150022,0.115128
insult,0.647518,0.375807,0.741272,0.150022,1.0,0.337736
identity_hate,0.266009,0.2016,0.286867,0.115128,0.337736,1.0


In [5]:
# mean of each label column (the share of positive rows, assuming 0/1 labels);
# the built-in column-wise mean gives the same result as apply(np.mean, axis = 0)
df_train[yvar].mean()

toxic            0.095844
severe_toxic     0.009996
obscene          0.052948
threat           0.002996
insult           0.049364
identity_hate    0.008805
dtype: float64

In [6]:
# train / eval split: hold out 20% of the labelled comments, with a fixed seed
# so the split is identical on every run
xdata = df_train['comment_text']
ydata = df_train[yvar]
xdata_train, xdata_eval, ydata_train, ydata_eval = train_test_split(
    xdata,
    ydata,
    test_size = 0.2,
    random_state = 1,
)

In [7]:
# set up pipeline: word counts -> tf-idf weighting -> feature selection -> classifier
# (the step names are the prefixes used by the keys of the tuning grid)
pipeline = Pipeline([
    (
        'cv',
        CountVectorizer(
            analyzer = 'word',
            lowercase = False,
            strip_accents = 'unicode',
            stop_words = 'english',
            min_df = 5,
            max_features = 50000,
        ),
    ),
    ('tfidf', TfidfTransformer(use_idf = True, sublinear_tf = True)),
    ('kbest', SelectKBest()),
    ('model', LogisticRegression(class_weight = "balanced")),
])

In [8]:
# for non-multimetric, don't require refit = True for best_params_ / best_score_
class GridSearchCV(GridSearchCVBase):
    """Grid search whose best_params_ / best_score_ are read straight from cv_results_.

    The best candidate is the first row of cv_results_ ranked 1 on the relevant
    scorer, so these attributes are available after fit() even when the search
    was created with refit = False.
    """

    # For multiple metric evaluation, refit is a string denoting the scorer that should be
    # used to find the best parameters for refitting the estimator
    @property
    def scorer_key(self):
        """Suffix of the cv_results_ columns ('rank_test_<key>', 'mean_test_<key>') to use.

        Raises
        ------
        ValueError
            If the search is multimetric and `refit` is not a scorer name; there
            is then no single metric to rank candidates by.
        """
        if not self.multimetric_:
            return 'score'
        if not isinstance(self.refit, str):
            # without this guard the lookup below would fail with an opaque
            # KeyError on 'rank_test_False'
            raise ValueError(
                "For multimetric scoring, refit must name the scorer used to "
                "select the best parameters; got refit = %r" % (self.refit,)
            )
        return self.refit

    @property
    def best_index(self):
        """Position in cv_results_ of the first candidate ranked 1 on the scorer."""
        check_is_fitted(self, 'cv_results_')
        ranks = self.cv_results_['rank_test_{}'.format(self.scorer_key)]
        return np.flatnonzero(ranks == 1)[0]

    @property
    def best_params_(self):
        """Parameter setting of the best candidate."""
        # resolve the index first so an unfitted search fails the fitted check
        # before cv_results_ is accessed
        best = self.best_index
        return self.cv_results_['params'][best]

    @property
    def best_score_(self):
        """Mean cross-validated test score of the best candidate."""
        best = self.best_index
        return self.cv_results_['mean_test_{}'.format(self.scorer_key)][best]

In [9]:
# hyperparameter tuning
param_grid = {
    'cv__ngram_range': [(1, 1), (1, 2)],
    'tfidf__norm': ['l1', 'l2', None],
    'kbest__k': [10000, 25000, "all"],
    'model__C': [0.01, 0.1],
    'model__penalty': ['l1', 'l2']
}

# The best parameters per label are cached in this file because the grid search
# is expensive; delete the file to force a full re-tune.
param_file = 'model_param.yaml'

try:
    with open(param_file, 'r') as f:
        # The cache stores the ngram ranges as tuples (python/tuple tags), which
        # the safe loader rejects, so the full Loader is passed explicitly
        # (PyYAML >= 6 also refuses yaml.load without a Loader argument).
        # NOTE: yaml.Loader can construct arbitrary Python objects - only load
        # a cache file written by this notebook.
        param_optimal = yaml.load(f, Loader = yaml.Loader)
except IOError:
    param_optimal = None

# an empty or malformed cache is treated like a missing one
if not isinstance(param_optimal, dict):
    param_optimal = {}

# only labels without cached parameters need to be tuned
yvar_untuned = [y for y in yvar if y not in param_optimal]

if yvar_untuned:
    # create tuner
    tuner = GridSearchCV(pipeline, param_grid, scheduler = client_dask, scoring = 'roc_auc',
                         cv = 3, refit = False, return_train_score = False)

    # determine optimal hyperparameters
    for y in yvar_untuned:
        tuner.fit(xdata_train, ydata_train[y])
        print('Best params for %s: %s' % (str(y), str(tuner.best_params_)))
        print('Best params score for %s: %s' % (str(y), str(tuner.best_score_)))
        param_optimal[y] = copy.deepcopy(tuner.best_params_)

    # save best params
    with open(param_file, 'w') as f:
        yaml.dump(param_optimal, f)

Best params for toxic: {'cv__ngram_range': (1, 1), 'kbest__k': 'all', 'model__C': 0.1, 'model__penalty': 'l2', 'tfidf__norm': 'l2'}
Best params score for toxic: 0.9581649926974369
Best params for severe_toxic: {'cv__ngram_range': (1, 2), 'kbest__k': 10000, 'model__C': 0.1, 'model__penalty': 'l2', 'tfidf__norm': 'l2'}
Best params score for severe_toxic: 0.984049857827509
Best params for obscene: {'cv__ngram_range': (1, 1), 'kbest__k': 10000, 'model__C': 0.01, 'model__penalty': 'l1', 'tfidf__norm': None}
Best params score for obscene: 0.9782391682556496
Best params for threat: {'cv__ngram_range': (1, 1), 'kbest__k': 10000, 'model__C': 0.1, 'model__penalty': 'l2', 'tfidf__norm': 'l2'}
Best params score for threat: 0.9794954551081121
Best params for insult: {'cv__ngram_range': (1, 1), 'kbest__k': 25000, 'model__C': 0.1, 'model__penalty': 'l2', 'tfidf__norm': 'l2'}
Best params score for insult: 0.9709854662517963
Best params for identity_hate: {'cv__ngram_range': (1, 1), 'kbest__k': 10000, 

In [10]:
# build model with optimal param and generate predictions on eval set
# one future per label: a fresh clone of the pipeline, configured with that
# label's tuned parameters and fit on the training split
models = client_dask.map(lambda y: clone(pipeline).set_params(**param_optimal[y]).fit(xdata_train, ydata_train[y]), yvar)

# Second column of predict_proba, i.e. the probability of the model's classes_[1].
# The predictions keep xdata_eval's index so that they align row-for-row with
# ydata_eval, whose index is the shuffled one produced by train_test_split.
ydata_eval_pred_futures = client_dask.map(
    lambda m: pd.Series(m.predict_proba(xdata_eval)[:, 1], index = xdata_eval.index),
    models
)

# collect the per-label series from the cluster into one frame, one column per label
ydata_eval_pred = pd.concat(client_dask.gather(ydata_eval_pred_futures), axis = 1)
ydata_eval_pred.columns = yvar

In [11]:
# calculate performance on eval set: one ROC AUC per label, in yvar order
auc = []
for label in yvar:
    auc.append(roc_auc_score(ydata_eval[label], ydata_eval_pred[label]))

print('Model AUCs: %s' % auc)
print('Avg AUC: %s' % np.mean(auc))

Model AUCs: [0.961862557804696, 0.9851282088224007, 0.977699840822474, 0.9809085233663242, 0.970649479505864, 0.9732070152491284]
Avg AUC: 0.9749092709284812


In [12]:
# xdata_test = df_test.comment_text

# models_final = client_dask.map(lambda y: clone(pipeline).fit(xdata, ydata[y]), yvar)
# ydata_test_pred = client_dask.map(lambda m: pd.Series(m.predict_proba(xdata_test)[:,1]), models_final)

# ydata_test_pred = client_dask.gather(ydata_test_pred)
# ydata_test_pred = pd.concat(ydata_test_pred, axis = 1)
# ydata_test_pred.columns = yvar
# ydata_test_pred['id'] = df_test.id

# ydata_test_pred.to_csv('submission.csv', index = False)