In [1]:
sc.stop()

In [2]:
import pyspark
from pyspark.sql import *
import pyspark.sql.functions as F
import pyspark.sql.types as T 
import string
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
get_ipython().run_line_magic('matplotlib', 'inline')

In [3]:
from pyspark.sql import SparkSession
sc = pyspark.SparkContext(appName="ToxicTwitterComments")
spark = pyspark.sql.SQLContext(sc)

In [4]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/jupyterlab/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [5]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/jupyterlab/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# 1. Load data

In [6]:
trainDF = spark.read.csv('train.csv', 
                         header=True, 
                         multiLine=True, 
                         encoding="UTF-8",
                         sep=',',
                         escape='"',
                         inferSchema=True)

In [7]:
trainDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: integer (nullable = true)
 |-- severe_toxic: integer (nullable = true)
 |-- obscene: integer (nullable = true)
 |-- threat: integer (nullable = true)
 |-- insult: integer (nullable = true)
 |-- identity_hate: integer (nullable = true)



In [8]:
train, test = trainDF.randomSplit([0.6, 0.4], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 95557
Test Dataset Count: 64014


In [9]:
train.createOrReplaceTempView('train')
test.createOrReplaceTempView('test')

In [10]:
spark.sql('''
SELECT * FROM train
limit(10)
''').toPandas()

Unnamed: 0,id,comment_text,toxic,severe_toxic,obscene,threat,insult,identity_hate
0,0000997932d777bf,Explanation\nWhy the edits made under my usern...,0,0,0,0,0,0
1,000103f0d9cfb60f,D'aww! He matches this background colour I'm s...,0,0,0,0,0,0
2,000113f07ec002fd,"Hey man, I'm really not trying to edit war. It...",0,0,0,0,0,0
3,0001d958c54c6e35,"You, sir, are my hero. Any chance you remember...",0,0,0,0,0,0
4,00024b59235015f3,Virgin\nMy only warning? You'll block me? Well...,1,0,1,0,1,0
5,00025465d4725e87,"""\n\nCongratulations from me as well, use the ...",0,0,0,0,0,0
6,0002bcb3da6cb337,COCKSUCKER BEFORE YOU PISS AROUND ON MY WORK,1,1,1,0,1,0
7,0002bfc2abe2a51f,"""*::::::::I believe that you're confusing """"pr...",0,0,0,0,0,0
8,0002eeaf4c0cdf35,But isnt it against the rules to edit if you a...,0,0,0,0,0,0
9,00030003d620f7a8,"""\nseems about right. nableezy - """,0,0,0,0,0,0


# 2. Data Cleaning

In [11]:
def clean(lines):
    lines = lines.lower().translate(str.maketrans('', '', string.punctuation))
    stop_words = list(set(stopwords.words('english')))
    stop_words.remove('not')
    word_tokens = word_tokenize(lines)
    filtered_sentence = [w for w in word_tokens if not w in stop_words]
    cleaned_line = []
    for w in filtered_sentence:
        word = PorterStemmer().stem(w)
        cleaned_line.append(word)
    return ' '.join(cleaned_line)

In [12]:
clean("'Hey man, I'm really not trying to edit war.'")

'hey man im realli not tri edit war'

In [13]:
spark.udf.register("clean", clean, T.StringType())

<function __main__.clean(lines)>

In [14]:
#cleaning test data
df_test=spark.sql("""
SELECT clean(comment_text) cleaned_comment, toxic, severe_toxic,obscene,threat,insult,identity_hate
FROM test
LIMIT 10000
""")

In [15]:
df_train=spark.sql("""
SELECT clean(comment_text) cleaned_comment, toxic, severe_toxic,obscene,threat,insult,identity_hate
FROM train
limit 10000
""")

# 3. Feature Engineering (TF-IDF)

In [16]:
#from pyspark.mllib.feature import HashingTF, IDF
from __future__ import print_function
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession

In [17]:
from pyspark.ml import Pipeline
#data processing pipeline
tokenizer = Tokenizer(inputCol="cleaned_comment", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
pipelineModel = pipeline.fit(df_train)
rescaledData = pipelineModel.transform(df_train)
rescaledData_test = pipelineModel.transform(df_test)

In [18]:
from pyspark.ml.feature import NGram

ngram = NGram(n=2, inputCol=tokenizer.getOutputCol(), outputCol="ngrams")
hashingTF_2gram = HashingTF(inputCol=ngram.getOutputCol(), outputCol="rawFeatures_2gram")
idf_2gram = IDF(inputCol="rawFeatures_2gram", outputCol="features_2gram")

pipeline_2gram = Pipeline(stages=[tokenizer, ngram, hashingTF_2gram, idf_2gram])
pipelineModel_2gram = pipeline_2gram.fit(df_train)
rescaledData_2gram = pipelineModel_2gram.transform(df_train)
rescaledData_test_2gram = pipelineModel_2gram.transform(df_test)

In [19]:
#get weight to adjust unbalanced
def weight_ratio(predictions,class_name):
    pos_lab=predictions[predictions[class_name]==1].count()
    neg_lab=predictions[predictions[class_name]==0].count()
    return neg_lab/pos_lab

In [20]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [21]:
rescaledData.createOrReplaceTempView('table1')
rescaledData_2gram.createOrReplaceTempView('table2')

In [22]:
rescaledData_test.createOrReplaceTempView('table3')
rescaledData_test_2gram.createOrReplaceTempView('table4')

In [23]:
vector_all = spark.sql('select table1.toxic, table1.obscene, table1.threat, table1.insult, table1.identity_hate,  table1.severe_toxic, table1.features, features_2gram from table1 left join table2 on (table1.cleaned_comment = table2.cleaned_comment)')

In [24]:
vector_all_test = spark.sql('select table3.toxic, table3.obscene, table3.threat, table3.insult, table3.identity_hate,  table3.severe_toxic, table3.features, features_2gram from table3 left join table4 on (table3.cleaned_comment = table4.cleaned_comment)')

In [25]:
assembler = VectorAssembler(
    inputCols=["features_2gram", "features"],
    outputCol="features_all")
rescaledData_all = assembler.transform(vector_all)

In [26]:

rescaledData_all_test = assembler.transform(vector_all_test)

# 5. Modelling (LR)

## 5.1 Train

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
evaluator = BinaryClassificationEvaluator()

def recall_precision(predictions):
    TP=predictions[(predictions.label==1)&(predictions.prediction==1)].count()
    FP=predictions[(predictions.label==0)&(predictions.prediction==1)].count()
    TN=predictions[(predictions.label==0)&(predictions.prediction==0)].count()
    FN=predictions[(predictions.label==1)&(predictions.prediction==0)].count()
    recall=TP/(TP+FN)
    precision=TP/(TP+FP)
    f1=2 * precision * recall / (precision + recall)
    return recall, precision, f1

In [28]:
class_name = ["toxic", 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

In [29]:
for name in class_name:
    train=rescaledData.select("features",name)
    test = rescaledData_test.select("features",name)
    ratio=weight_ratio(train,name)
    train=train.withColumn("weight", F.when(train[name]==1,ratio).otherwise(1))
    ##changed maxIter to 5 for faster calculation
    lr = LogisticRegression(featuresCol = 'features',weightCol="weight",labelCol = name, maxIter=10,  regParam=0.03, elasticNetParam=1)
    lrModel = lr.fit(train)
    #save model, not yet tested
    #lrModel.save([spark_context], [file_path])
    predictions = lrModel.transform(test)
    #predictions.select(class_name, 'rawPrediction', 'prediction', 'probability').show(10)
    predictions=predictions.select(predictions[name].alias("label"), 'rawPrediction', 'prediction', 'probability')
    print('Classification model for ' + str(name))
    print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
    recall, precision, f1 = recall_precision(predictions)
    print("Precision: " + str(precision))
    print("Recall: " + str(recall))
    print("F1 score: " + str(f1))

Classification model for toxic
Test Area Under ROC: 0.848597111759121
Precision: 0.20742213386348576
Recall: 0.8402684563758389
F1 score: 0.332713260696253
Classification model for severe_toxic
Test Area Under ROC: 0.942338060770156
Precision: 0.20607375271149675
Recall: 0.625
F1 score: 0.3099510603588907
Classification model for obscene
Test Area Under ROC: 0.8910953261677094
Precision: 0.5727513227513228
Recall: 0.5460277427490542
F1 score: 0.5590703679793415
Classification model for threat
Test Area Under ROC: 0.8319449172116989
Precision: 0.017482517482517484
Recall: 0.11363636363636363
F1 score: 0.030303030303030304
Classification model for insult
Test Area Under ROC: 0.8891314200800602
Precision: 0.4133083411433927
Recall: 0.5810276679841897
F1 score: 0.4830230010952903
Classification model for identity_hate
Test Area Under ROC: 0.9166439923855264
Precision: 0.10935441370223979
Recall: 0.6287878787878788
F1 score: 0.186307519640853


In [30]:
for name in class_name:
    train=rescaledData_2gram.select("features_2gram",name)
    test = rescaledData_test_2gram.select("features_2gram",name)
    ratio=weight_ratio(train,name)
    train=train.withColumn("weight", F.when(train[name]==1,ratio).otherwise(1))
    ##changed maxIter to 5 for faster calculation
    lr = LogisticRegression(featuresCol = 'features_2gram',weightCol="weight",labelCol = name, maxIter=10,regParam=0.03, elasticNetParam=1)
    lrModel = lr.fit(train)
    #save model, not yet tested
    #lrModel.save([spark_context], [file_path])
    predictions = lrModel.transform(test)
    #predictions.select(class_name, 'rawPrediction', 'prediction', 'probability').show(10)
    predictions=predictions.select(predictions[name].alias("label"), 'rawPrediction', 'prediction', 'probability')
    print('Classification model for ' + str(name))
    print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
    recall, precision, f1 = recall_precision(predictions)
    print("Precision: " + str(precision))
    print("Recall: " + str(recall))
    print("F1 score: " + str(f1))

Classification model for toxic
Test Area Under ROC: 0.5593419242023056
Precision: 0.10686188811188811
Recall: 0.9845637583892617
F1 score: 0.19279800236561967
Classification model for severe_toxic
Test Area Under ROC: 0.730005946219938
Precision: 0.2109375
Recall: 0.17763157894736842
F1 score: 0.19285714285714284
Classification model for obscene
Test Area Under ROC: 0.6132865607783878
Precision: 0.5987261146496815
Recall: 0.11853720050441362
F1 score: 0.19789473684210523
Classification model for threat
Test Area Under ROC: 0.6503303630042103
Precision: 0.15384615384615385
Recall: 0.045454545454545456
F1 score: 0.07017543859649122
Classification model for insult
Test Area Under ROC: 0.5971463473821953
Precision: 0.05807275047862157
Recall: 0.9591567852437418
F1 score: 0.1095148552087251
Classification model for identity_hate
Test Area Under ROC: 0.5756487901615018
Precision: 0.0851063829787234
Recall: 0.09090909090909091
F1 score: 0.08791208791208792


In [31]:
for name in class_name:
    train=rescaledData_all.select("features_all",name)
    test = rescaledData_all_test.select("features_all",name)
    ratio=weight_ratio(train,name)
    train=train.withColumn("weight", F.when(train[name]==1,ratio).otherwise(1))
    ##changed maxIter to 5 for faster calculation
    lr = LogisticRegression(featuresCol = 'features_all',weightCol="weight",labelCol = name, maxIter=10,regParam=0.03, elasticNetParam=1)
    lrModel = lr.fit(train)
    #save model, not yet tested
    #lrModel.save([spark_context], [file_path])
    predictions = lrModel.transform(test)
    #predictions.select(class_name, 'rawPrediction', 'prediction', 'probability').show(10)
    predictions=predictions.select(predictions[name].alias("label"), 'rawPrediction', 'prediction', 'probability')
    print('Classification model for ' + str(name))
    print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
    recall, precision, f1 = recall_precision(predictions)
    print("Precision: " + str(precision))
    print("Recall: " + str(recall))
    print("F1 score: " + str(f1))

Classification model for toxic
Test Area Under ROC: 0.8538930843934898
Precision: 0.21015820149875103
Recall: 0.8447121820615796
F1 score: 0.3365782104280571
Classification model for severe_toxic
Test Area Under ROC: 0.9465591972711515
Precision: 0.1800356506238859
Recall: 0.6644736842105263
F1 score: 0.28330995792426367
Classification model for obscene
Test Area Under ROC: 0.892818984011728
Precision: 0.5241635687732342
Recall: 0.7085427135678392
F1 score: 0.6025641025641025
Classification model for threat
Test Area Under ROC: 0.8463264798942696
Precision: 0.09859154929577464
Recall: 0.1590909090909091
F1 score: 0.1217391304347826
Classification model for insult
Test Area Under ROC: 0.890067033553443
Precision: 0.1368909512761021
Recall: 0.8528252299605782
F1 score: 0.2359142130134497
Classification model for identity_hate
Test Area Under ROC: 0.9117853586163541
Precision: 0.10096153846153846
Recall: 0.6363636363636364
F1 score: 0.17427385892116182


## 5.2 Prediction

In [None]:

# Load documents (one per line).
#documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(df_train.rdd)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)