# Toxic Comment Classification with PySpark

## Author: Meixuezi Tong

## Background
Online communication can be difficult because of the threat of abuse and harassment. To maintain a healthy environment for online conversation, communication platforms need tools that automatically detect negative behaviors such as toxic comments (i.e., comments that are disrespectful or rude).

This study uses PySpark pipeline tools to perform Natural Language Processing on raw text and to fine-tune a Machine Learning model that classifies toxic comments. The data was obtained and adapted from a public dataset on the Kaggle competition platform, provided by the Conversation AI research team.
 


## Step 1: Setup Spark and Load Data

In [13]:
# import modules
from pyspark import SparkContext
from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.param import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.sql import *
import pixiedust
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

# setup SparkContext
sc = SparkContext('local')
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

# load data
data = pd.read_csv('train.csv')

# keep only comment_text and the binary toxic label; drop the id and the other label columns
data = data.drop(['id', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate'], axis=1)


## Step 2: Exploratory Data Analysis

In [7]:
# check data examples
data.head(2)

Unnamed: 0,comment_text,toxic
0,Explanation\nWhy the edits made under my usern...,0
1,D'aww! He matches this background colour I'm s...,0


In [10]:
# Create a Spark DataFrame

# Define the fields and schema; the columns comment_text and toxic become text and label
fields = [StructField('text', StringType(), True), StructField('label', ByteType(), True)]
schema = StructType(fields)

# Apply the schema to the pandas DataFrame.
df = spark.createDataFrame(data, schema)

In [11]:
print(df.take(1))

[Row(text="Explanation\nWhy the edits made under my username Hardcore Metallica Fan were reverted? They weren't vandalisms, just closure on some GAs after I voted at New York Dolls FAC. And please don't remove the template from the talk page since I'm retired now.89.205.38.27", label=0)]


In [151]:
# calculate the ratio of toxic comments
toxic = df.filter(df.label == 1).count()
total_count = df.count()
toxic_ratio = toxic/total_count
print('The ratio of toxic comments : {:.3f}: {} out of {}'.format(toxic_ratio, toxic, total_count))

The ratio of toxic comments : 0.096: 15294 out of 159571
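
With fewer than 10% of the comments labelled toxic, accuracy on its own is a weak yardstick: a classifier that always predicts "non-toxic" already scores about 90%. The arithmetic can be sketched with the counts printed above (the variable name `majority_baseline_accuracy` is illustrative and not part of the original notebook):

```python
# Counts taken from the output above.
toxic = 15294
total_count = 159571

# Accuracy of a trivial classifier that labels every comment as non-toxic.
majority_baseline_accuracy = (total_count - toxic) / total_count

print('Majority-class baseline accuracy: {:.3f}'.format(majority_baseline_accuracy))
# Majority-class baseline accuracy: 0.904
```

This baseline is one reason the later steps tune on the area under the precision-recall curve rather than on accuracy.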


## Step 3: Partition Training Dataset and Test Dataset

In [16]:
# hold out 20% of the data for testing the model, keep 80% for training (seed = 123)
train_set, test_set = df.randomSplit([0.8, 0.2], 123)
print("Total document count:", df.count())
print("Training-set count:", train_set.count())
print("Test-set count:", test_set.count())

Total document count: 159571
Training-set count: 127597
Test-set count: 31974
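
The split is not exactly 80/20 (127597 / 159571 is about 0.7996) because `randomSplit` assigns rows to the splits at random according to the weights. The idea can be sketched in plain Python as below. This is only an illustration using the standard `random` module, not Spark's actual sampler, and the helper `random_split` is a hypothetical name.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits

rows = range(159571)  # stand-in for the 159571 comments
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=123)
print(len(train_rows), len(test_rows), len(train_rows) / len(rows))
```

The two sizes always add up to the total, but the training fraction only lands close to 0.8, which mirrors the counts reported above.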


## Step 4: Construct a Pipeline for Feature Extractor and Classifier

In [None]:
# import modules for feature transformation
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer

# Construct a pipeline for feature transformation and model tuning

# Tokenize the text into words
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")

# Remove stop words
remover = StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)

# For each comment (bag of words), use HashingTF to hash the words into a feature vector
hashingTF = HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")

# Create TF-IDF features
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

# Create a logistic regression model
lr = LogisticRegression()

# Chain all of the steps above into a pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])
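
To make the two feature stages more concrete, the following is a minimal plain-Python sketch of the hashing trick and of smoothed IDF weighting. It is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that Spark's `HashingTF` uses, the helpers `hashing_tf` and `idf_weights` are hypothetical names, and the toy documents are invented. The IDF formula log((m + 1) / (df + 1)) follows the one given in the Spark MLlib documentation.

```python
import math
import zlib
from collections import Counter

def hashing_tf(tokens, num_features):
    """Map tokens to a sparse term-frequency vector {index: count} via the hashing trick."""
    counts = Counter()
    for token in tokens:
        # crc32 is a stand-in for Spark's MurmurHash3; any stable hash shows the idea
        index = zlib.crc32(token.encode('utf-8')) % num_features
        counts[index] += 1
    return dict(counts)

def idf_weights(tf_vectors):
    """Smoothed IDF per feature index: log((m + 1) / (df + 1)), with m documents."""
    m = len(tf_vectors)
    doc_freq = Counter()
    for vec in tf_vectors:
        doc_freq.update(vec.keys())  # each document counts once per index
    return {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}

# Toy documents, already tokenized and stop-word filtered (hypothetical data)
docs = [['great', 'edit', 'thanks'], ['rude', 'edit'], ['rude', 'rude', 'comment']]
tf_vectors = [hashing_tf(doc, num_features=1000) for doc in docs]
idf_by_index = idf_weights(tf_vectors)
tfidf_vectors = [{idx: tf * idf_by_index[idx] for idx, tf in vec.items()}
                 for vec in tf_vectors]
print(tfidf_vectors)
```

Because different words can hash to the same index, a small `num_features` merges unrelated words into one feature. That is a plausible reason for including larger values of `numFeatures` in the grid search of Step 5.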

## Step 5: Perform a Parameter Grid Search to Fine Tune the Pipeline and Evaluate the Models by TrainValidationSplit Method

In [107]:
# Perform a grid search looking for the best parameters and the best models
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

paramGrid = ParamGridBuilder()\
    .addGrid(hashingTF.numFeatures,[1000,5000,10000])\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.3, 0.6])\
    .build()
# Use the area under the precision-recall curve as the evaluation metric
evaluator = BinaryClassificationEvaluator().setMetricName('areaUnderPR')

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train_set)
# Make predictions
train_prediction = model.transform(train_set)
test_prediction = model.transform(test_set)

print("Area under the PR curve for best fitted model =", evaluator.evaluate(test_prediction))

# Calculate the accuracy score for the best model
correct = test_prediction.filter(test_prediction.label == test_prediction.prediction).count()
accuracy = correct/test_prediction.count()
print('Accuracy {:.2%} data items: {}, correct: {}'.format(accuracy, test_prediction.count(), correct))

Area under the PR curve for best fitted model = 0.8748302691581958
Accuracy 92.00% data items: 31974, correct: 29417
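
The parameter grid is the Cartesian product of the three value lists, so it contains 3 × 2 × 3 = 18 candidate settings. `TrainValidationSplit` fits each candidate once on the training portion, scores it on the held-out validation portion, and then refits the best candidate on the full input. The enumeration can be sketched with `itertools.product`; the dictionary keys below are plain strings chosen for illustration, and the ordering need not match the one Spark produces.

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder call above
num_features_values = [1000, 5000, 10000]
reg_param_values = [0.1, 0.01]
elastic_net_values = [0.0, 0.3, 0.6]

# Every combination of the three lists
grid = [
    {'numFeatures': n, 'regParam': r, 'elasticNetParam': e}
    for n, r, e in product(num_features_values, reg_param_values, elastic_net_values)
]

print(len(grid))  # 18
```

This count agrees with the 18 values in `model.validationMetrics` listed in Step 7.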


## Step 6: Output the Parameter Tuning Results in a CSV file

In [108]:
# create a zero-filled pandas DataFrame to store model performance, one row per sample size
columns = ['sample_size','training_time/sec','testing_time/sec','areaUnderPR','acc_train','acc_test']
index = list(range(4))
pf = pd.DataFrame(index = index, columns = columns, dtype = 'float32') # empty (NaN-filled) frame
pf = pf.fillna(0) # initialise every cell to zero

In [109]:
pf

Unnamed: 0,sample_size,training_time/sec,testing_time/sec,areaUnderPR,acc_train,acc_test
0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,0.0


In [111]:
# Sample the training data into different sizes and evaluate the results

from time import time # to compute training and testing time
sample_range = [0.2, 0.4, 0.8, 1]
for i, sample_size in enumerate(sample_range):

    # use .loc instead of chained indexing (pf[col][i]) so the value is written to pf itself
    pf.loc[i, 'sample_size'] = sample_size # store the sample size value
    if sample_size == 1:
        train = train_set # if the size is 1, no need to sample
    else:
        train = train_set.sample(False, sample_size, seed = 1234)

    print('start training model ', i)
    # compute the training time
    time1 = time()
    model = tvs.fit(train)
    time2 = time()

    pf.loc[i, 'training_time/sec'] = time2 - time1 # store the training time value
    print('training done in {:.2f}'.format(time2-time1))
    train_prediction = model.transform(train)

    # compute the testing time
    # note: transform() is lazy, so this measures building the query plan,
    # not the prediction work itself, which runs when an action is triggered
    time3 = time()
    print('start predicting on test set')
    test_prediction = model.transform(test_set)
    time4 = time()
    pf.loc[i, 'testing_time/sec'] = time4 - time3 # store the testing time value
    print('testing done in {:.2f}'.format(time4-time3))

    # compute the validation metric: areaUnderPR
    pf.loc[i, 'areaUnderPR'] = evaluator.evaluate(test_prediction)

    # compute the accuracy score on test data
    cor_test = test_prediction.filter(test_prediction.label == test_prediction.prediction).count()
    pf.loc[i, 'acc_test'] = cor_test/test_prediction.count() # store the test accuracy score
    print('accuracy on test data is {:.2f}'.format(cor_test/test_prediction.count()))

    # compute the accuracy score on training data
    cor_train = train_prediction.filter(train_prediction.label == train_prediction.prediction).count()
    pf.loc[i, 'acc_train'] = cor_train/train_prediction.count() # store the training accuracy score
    print('accuracy on training data is {:.2f}'.format(cor_train/train_prediction.count()))


pf

start training model  0
training done in 190.30
start predicting on test set
testing done in 0.05
accuracy on test data is 0.90
accuracy on training data is 0.91
start training model  1
training done in 255.41
start predicting on test set
testing done in 0.04
accuracy on test data is 0.90
accuracy on training data is 0.90
start training model  2
training done in 382.68
start predicting on test set
testing done in 0.04
accuracy on test data is 0.90
accuracy on training data is 0.90
start training model  3
training done in 465.20
start predicting on test set
testing done in 0.04
accuracy on test data is 0.92
accuracy on training data is 0.94


Unnamed: 0,sample_size,training_time/sec,testing_time/sec,areaUnderPR,acc_train,acc_test
0,0.2,190.303772,0.054162,0.5,0.905323,0.903265
1,0.4,255.409302,0.04313,0.5,0.903602,0.903265
2,0.8,382.681091,0.040246,0.5,0.904582,0.903265
3,1.0,465.204407,0.044251,0.87483,0.935829,0.920029
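
The testing times of about 0.04 seconds are probably not the cost of scoring 31974 comments. `transform` on a Spark DataFrame is lazy, so the timer most likely captures only the construction of the query plan, while the real work happens later when an action such as `count` or the evaluator runs. The effect can be illustrated with a plain-Python generator. This is an analogy only, not Spark code, and `slow_score` is a hypothetical stand-in for the fitted model.

```python
def slow_score(x, counter):
    """Hypothetical stand-in for scoring one row; records that it ran."""
    counter['calls'] += 1
    return x * 2

def lazy_transform(rows, counter):
    """Like a Spark transformation: describes the work but does not run it yet."""
    return (slow_score(r, counter) for r in rows)

counter = {'calls': 0}
predictions = lazy_transform(range(1000), counter)
calls_after_transform = counter['calls']   # nothing has been scored yet

results = list(predictions)                # like an action: forces the evaluation
calls_after_action = counter['calls']

print(calls_after_transform, calls_after_action)
# 0 1000
```

To time prediction itself in Spark, an action such as `test_prediction.count()` would have to run inside the timed region.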


In [None]:
pf.to_csv('performances.csv', index = False) # save the performance pandas DataFrame to a CSV file

## Step 7: Output the Best Parameters for the Best Pipeline

In [125]:
model.bestModel.stages # check the pipeline stages

[Tokenizer_44b99c0d73a3fd8b2275,
 StopWordsRemover_4592b912739472d2fcb3,
 HashingTF_457797ddf4311c34da7b,
 IDF_475e88e4a4bd8cef7c9c,
 LogisticRegression_4bedb56b0e71083e3c87]

In [127]:
# extract the best parameters from the best model
best_model = model.bestModel
print('best HashingTF feature size: ', best_model.stages[2]._java_obj.getNumFeatures())
print('best regularization parameter: ', best_model.stages[4]._java_obj.getRegParam())
print('best elastic net parameter: ', best_model.stages[4]._java_obj.getElasticNetParam())

best HashingTF feature size:  10000
best regularization parameter:  0.01
best elastic net parameter:  0.0


In [131]:
model.validationMetrics

[0.3970055862471745,
 0.5035470355479709,
 0.544975240352693,
 0.43630075265885787,
 0.5179558877333595,
 0.5489222540618783,
 0.49677307181629665,
 0.4975075361892045,
 0.49786998286045986,
 0.39466895690033155,
 0.48509991860696533,
 0.5223649551287207,
 0.5477450484454648,
 0.5477450484454648,
 0.5477450484454648,
 0.34620259759078076,
 0.4281849520825269,
 0.4569848303973037]

In [144]:
# zip the metric value with parameter values for all models
grid_search_results = list(zip(model.validationMetrics, paramGrid))
# sort the results by metric value in descending order
grid_search_results_sorted = sorted(grid_search_results, key=lambda x:x[0], reverse=True)
print('the best parameters for the pipeline: ,', grid_search_results_sorted[0][1])

the best parameters for the pipeline: , {Param(parent='LogisticRegression_4bedb56b0e71083e3c87', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LogisticRegression_4bedb56b0e71083e3c87', name='regParam', doc='regularization parameter (>= 0).'): 0.01, Param(parent='HashingTF_457797ddf4311c34da7b', name='numFeatures', doc='number of features.'): 10000}
