In [9]:
import os
import requests
import pandas as pd
import numpy as np
import re
import random
import gensim
from gensim.models import KeyedVectors
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, roc_auc_score
from sklearn.externals import joblib

import argparse

# Define the featurizers

In [10]:
class GensimPreprocessor(BaseEstimator, TransformerMixin):
    def __init__(self, newline_token='NEWLINE_TOKEN'):
        self.newline_pat = re.compile(newline_token)
    
    def fit(self, X, y=None):
        return self
    
    def inverse_transform(self, X):
        return [" ".join(doc) for doc in X]
    
    def transform(self, X):
        return [ list(self.tokenize(txt)) for txt in X ]
    
    def tokenize(self, doc):
        doc = self.newline_pat.sub(' ', doc)
        return gensim.utils.simple_preprocess(doc)

In [11]:
class AvgWordVectorFeaturizer(object):
    def __init__(self, embedding, restrict_vocab=400000):
        self.embedding = embedding
        self.word2index = { w:i for i,w in enumerate(embedding.index2word) }
        self.restrict_vocab = restrict_vocab
    
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        # X is a list of tokenized documents
        return np.array([
            np.mean([self.embedding[t] for t in token_vec 
                        if t in self.embedding and 
                        (self.word2index[t] < self.restrict_vocab)
                    ]
                    or [np.zeros(self.embedding.vector_size)], axis=0)
            for token_vec in X
        ])

# Load, split, preprocess, and featurize the data

In [21]:
# Define the data files

w2v_file = 'miniglove_6B_50d_w2v.txt'
text_data_file = "attack_data.csv"
training_set_file = "training_set_01.csv"

In [22]:
# Load the text data

word_vectors = KeyedVectors.load_word2vec_format(w2v_file, binary=False)

text_data = pd.read_csv(text_data_file, encoding='windows-1252')

text_data = text_data.set_index("rev_id")

In [23]:
# Define the IDs of the training set and candidate test set observations

training_set_rev_ids = pd.read_csv(training_set_file).rev_id

test_candidate_rev_ids = set.difference(set(text_data.index.values), set(training_set_rev_ids))

In [24]:
# Take a random sample of the candidate test set observations

random.seed(1)
test_set_rev_ids = random.sample(test_candidate_rev_ids, 10000)

In [25]:
# Select the training and test set into separate Pandas dataframes

training_data = text_data.loc[training_set_rev_ids]
test_data = text_data.loc[test_set_rev_ids]

In [26]:
# Generate label lists

labels = [int(x) for x in training_data.is_attack]
test_labels = [int(x) for x in test_data.is_attack]

In [27]:
# Instantiate the featurizers

gp = GensimPreprocessor()
featurizer = AvgWordVectorFeaturizer(embedding=word_vectors)

In [28]:
# Featurize the training data

preprocessed_data = gp.transform(training_data.comment)
featurized_data = featurizer.transform(preprocessed_data)

In [29]:
# Featurize the test data

preprocessed_test_data = gp.transform(test_data.comment)
featurized_test_data = featurizer.transform(preprocessed_test_data)

# Train default (un-tuned) scikit-learn random forest

In [30]:
%%time

classifier_model = RandomForestClassifier(random_state=1)

fitted_model = classifier_model.fit(featurized_data, labels)

CPU times: user 30.2 ms, sys: 108 µs, total: 30.3 ms
Wall time: 28.8 ms


# Test the model

In [31]:
pred_prob = fitted_model.predict_proba(featurized_test_data)

scores = pred_prob[:,1]

auc = roc_auc_score(test_labels, scores)

print('AUC:', auc)

AUC: 0.8026801186123085


# Join features and labels

In [32]:
# Combine training set features and labels

featurized_data_df = pd.DataFrame(featurized_data).reset_index(drop=True)

labels_df = pd.DataFrame(labels).rename(index=str, columns={0: "Label"}).reset_index(drop=True)

labeled_data = pd.concat([labels_df, featurized_data_df], axis=1)

In [33]:
# Combine test set features and labels

featurized_test_data_df = pd.DataFrame(featurized_test_data).reset_index(drop=True)

test_labels_df = pd.DataFrame(test_labels).rename(index=str, columns={0: "Label"}).reset_index(drop=True)

labeled_test_data = pd.concat([test_labels_df, featurized_test_data_df], axis=1)

# Beginning of PySpark code

In [34]:
import pyspark

import pandas as pd
import mmlspark
from pyspark.sql.types import IntegerType, StringType, FloatType, StructType, StructField

import os, urllib


In [35]:
# Create Spark dataframes

tune = spark.createDataFrame(labeled_data)
test = spark.createDataFrame(labeled_test_data)


# Tune logistic regression, random forest, and GBM

In [36]:
from mmlspark import TuneHyperparameters
from mmlspark.TrainClassifier import TrainClassifier
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier

# Define the models to try: Logistic Regression, Random Forest, and Gradient Boosted Trees

logReg = LogisticRegression()
randForest = RandomForestClassifier()
gbt = GBTClassifier()

smlmodels = [logReg, randForest, gbt]

mmlmodels = [TrainClassifier(model=model, labelCol="Label") for model in smlmodels]

In [37]:
from mmlspark import HyperparamBuilder
from mmlspark import RangeHyperParam
from mmlspark import DiscreteHyperParam
from mmlspark import RandomSpace

# Define the hyperparameters to tune

paramBuilder = \
  HyperparamBuilder() \
    .addHyperparam(logReg, logReg.regParam, RangeHyperParam(0.1, 0.3, isDouble=True)) \
    .addHyperparam(randForest, randForest.numTrees, RangeHyperParam(50, 1000)) \
    .addHyperparam(randForest, randForest.maxDepth, RangeHyperParam(3, 30)) \
    .addHyperparam(randForest, randForest.maxBins, RangeHyperParam(100, 1000)) \
    .addHyperparam(randForest, randForest.impurity, DiscreteHyperParam(['gini', 'entropy'])) \
    .addHyperparam(gbt, gbt.maxBins, RangeHyperParam(100, 1000)) \
    .addHyperparam(gbt, gbt.maxDepth, RangeHyperParam(3, 30))

randomSpace = RandomSpace(paramBuilder.build())

In [38]:
%%time
bestModel = TuneHyperparameters(
              evaluationMetric="AUC", models=mmlmodels, numFolds=3,
              numRuns=len(mmlmodels) * 1, parallelism=2,
              paramSpace=randomSpace.space(), seed=0).fit(tune)

CPU times: user 409 ms, sys: 245 ms, total: 654 ms
Wall time: 3min 36s


In [39]:
# Print the parameters of the best model

bestModelInfo = bestModel._java_obj.getBestModelInfo()

for entry in bestModelInfo.split(', '):
    print(entry)

cacheNodeIds: false
checkpointInterval: 10
featureSubsetStrategy: auto
featuresCol: TrainClassifier_4d17a05267268ba671a7_features
impurity: entropy
labelCol: Label
maxBins: 548
maxDepth: 28
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
numTrees: 948
predictionCol: prediction
probabilityCol: probability
rawPredictionCol: rawPrediction
seed: 3385818979260681125
subsamplingRate: 1.0


# Test the best model on the held-out test set

In [40]:
from mmlspark import ComputeModelStatistics

prediction = bestModel.transform(test)

metrics = ComputeModelStatistics().transform(prediction)

metrics.toPandas().transpose().rename(columns={0: ''})

Unnamed: 0,Unnamed: 1
evaluation_type,Classification
predicted_class_as_0.0_actual_is_0.0,8569
predicted_class_as_0.0_actual_is_1.0,796
predicted_class_as_1.0_actual_is_0.0,158
predicted_class_as_1.0_actual_is_1.0,477
accuracy,0.9046
precision,0.751181
recall,0.374705
AUC,0.88117
