# Preprocessing

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

Create the Spark session.

In [2]:
# Session settings, applied to the builder in the order listed.
spark_settings = [
    ("spark.executor.memory", "4G"),
    ("spark.driver.memory", "18G"),
    ("spark.executor.cores", "7"),
    ("spark.python.worker.memory", "4G"),
    ("spark.driver.maxResultSize", "0"),
    ("spark.sql.crossJoin.enabled", "true"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.default.parallelism", "2"),
]

# Named application with Hive support enabled, as before.
session_builder = SparkSession.builder.appName("toxiccomment").enableHiveSupport()
for setting_name, setting_value in spark_settings:
    session_builder = session_builder.config(setting_name, setting_value)

# Reuses an already-running session if one exists, otherwise starts one.
spark = session_builder.getOrCreate()

In [3]:
# Both comment files share the two string columns; only train.csv carries
# the six integer label columns.
text_fields = [
    StructField(field_name, StringType(), True)
    for field_name in ("id", "comment_text")
]
label_fields = [
    StructField(field_name, IntegerType(), True)
    for field_name in (
        "toxic",
        "severe_toxic",
        "obscene",
        "threat",
        "insult",
        "identity_hate",
    )
]

train_schema = StructType(text_fields + label_fields)
test_schema = StructType(text_fields)

# multiLine + escape are passed because comment_text can span several lines
# and contain quote characters (see the "\n" sequences in the previews).
df_train = spark.read.csv(
    'train.csv',
    header=True,
    schema=train_schema,
    multiLine=True,
    escape="\"",
)
df_test = spark.read.csv(
    'test.csv',
    header=True,
    schema=test_schema,
    multiLine=True,
    escape="\"",
)

# NOTE: despite its generic name, `df` holds the labels for the test set;
# its column types are inferred rather than declared.
df = spark.read.csv('test_labels.csv', header=True, inferSchema=True)

In [4]:
# Preview the test-label frame that was read from 'test_labels.csv'.
df.show()

+----------------+-----+------------+-------+------+------+-------------+
|              id|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+-----+------------+-------+------+------+-------------+
|00001cee341fdb12|   -1|          -1|     -1|    -1|    -1|           -1|
|0000247867823ef7|   -1|          -1|     -1|    -1|    -1|           -1|
|00013b17ad220c46|   -1|          -1|     -1|    -1|    -1|           -1|
|00017563c3f7919a|   -1|          -1|     -1|    -1|    -1|           -1|
|00017695ad8997eb|   -1|          -1|     -1|    -1|    -1|           -1|
|0001ea8717f6de06|    0|           0|      0|     0|     0|            0|
|00024115d4cbde0f|   -1|          -1|     -1|    -1|    -1|           -1|
|000247e83dcc1211|    0|           0|      0|     0|     0|            0|
|00025358d4737918|   -1|          -1|     -1|    -1|    -1|           -1|
|00026d1092fe71cc|   -1|          -1|     -1|    -1|    -1|           -1|
|0002eadc3b301559|   -1|          -1| 

In [5]:
# Preview the training frame: id, comment text and the six label columns.
df_train.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|"\nMore\nI can't ...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|
|00025465d4725e87|"\n\nCongratulati...|    0|           0|      0|     0|     0|            0|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|    1|           1|      1|     0|     1|            0|
|00031b1e95af7921|Your vandalism to...|    0|     

In [6]:
# Preview the test frame: id and comment text only (no labels).
df_test.show()

+----------------+--------------------+
|              id|        comment_text|
+----------------+--------------------+
|00001cee341fdb12|Yo bitch Ja Rule ...|
|0000247867823ef7|== From RfC == \n...|
|00013b17ad220c46|" \n\n == Sources...|
|00017563c3f7919a|:If you have a lo...|
|00017695ad8997eb|I don't anonymous...|
|0001ea8717f6de06|Thank you for und...|
|00024115d4cbde0f|Please do not add...|
|000247e83dcc1211|:Dear god this si...|
|00025358d4737918|" \n Only a fool ...|
|00026d1092fe71cc|== Double Redirec...|
|0002eadc3b301559|I think its crap ...|
|0002f87b16116a7f|"::: Somebody wil...|
|0003806b11932181|, 25 February 201...|
|0003e1cccfd5a40a|" \n\n It says it...|
|00059ace3e3e9a53|" \n\n == Before ...|
|000634272d0d44eb|==Current Positio...|
|000663aff0fffc80|this other one fr...|
|000689dd34e20979|== Reason for ban...|
|000834769115370c|:: Wallamoose was...|
|000844b52dee5f3f||blocked]] from e...|
+----------------+--------------------+
only showing top 20 rows



Cache the DataFrames so that each file is read from disk only once, then confirm the cache flag is set.

In [7]:
# Mark all three frames for caching first, then report each frame's flag
# in the same order (test labels, train, test).
loaded_frames = (df, df_train, df_test)

for frame in loaded_frames:
    frame.cache()

for frame in loaded_frames:
    print(frame.is_cached)

True
True
True


Check schema

In [8]:
# Column names/types of the test-label frame (types were inferred on read).
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- toxic: integer (nullable = true)
 |-- severe_toxic: integer (nullable = true)
 |-- obscene: integer (nullable = true)
 |-- threat: integer (nullable = true)
 |-- insult: integer (nullable = true)
 |-- identity_hate: integer (nullable = true)



In [9]:
# Column names/types of the training frame (declared via train_schema).
df_train.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: integer (nullable = true)
 |-- severe_toxic: integer (nullable = true)
 |-- obscene: integer (nullable = true)
 |-- threat: integer (nullable = true)
 |-- insult: integer (nullable = true)
 |-- identity_hate: integer (nullable = true)



In [10]:
# Column names/types of the test frame (declared via test_schema).
df_test.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)



Check for null values in every column, and print each frame's row count.

In [11]:
def show_null_counts(frame):
    """Display the number of nulls per column of ``frame``, then print its row count.

    Uses the ``F`` alias imported at the top of the notebook, so no extra
    import is needed in this cell.

    Parameters
    ----------
    frame : pyspark.sql.DataFrame
        Frame to inspect. Every column in ``frame.columns`` is checked.
    """
    # when(...) yields the column name for null cells and null otherwise;
    # count(...) only counts non-null values, so it counts the null cells.
    null_counts = [
        F.count(F.when(F.col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in frame.columns
    ]
    frame.select(null_counts).show()
    print(frame.count())


# Same order as before: test labels, train, test.
for frame in (df, df_train, df_test):
    show_null_counts(frame)

+---+-----+------------+-------+------+------+-------------+
| id|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+---+-----+------------+-------+------+------+-------------+
|  0|    0|           0|      0|     0|     0|            0|
+---+-----+------------+-------+------+------+-------------+

153164
+---+------------+-----+------------+-------+------+------+-------------+
| id|comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+---+------------+-----+------------+-------+------+------+-------------+
|  0|           0|    0|           0|      0|     0|     0|            0|
+---+------------+-----+------------+-------+------+------+-------------+

159571
+---+------------+
| id|comment_text|
+---+------------+
|  0|           0|
+---+------------+

153164


# Modeling

Create a tokenizer:

In [12]:
# One tokenizer instance is applied to both frames so train and test are
# split the same way. It reads "comment_text" and adds a "words" column.
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")

transformed_data, test_transformed_data = (
    tokenizer.transform(frame) for frame in (df_train, df_test)
)

In [13]:
# Compare the raw comment text with the resulting token arrays,
# first for the training frame and then for the test frame.
transformed_data.select(["comment_text", "words"]).show()
test_transformed_data.select(["comment_text", "words"]).show()

+--------------------+--------------------+
|        comment_text|               words|
+--------------------+--------------------+
|Explanation\nWhy ...|[explanation, why...|
|D'aww! He matches...|[d'aww!, he, matc...|
|Hey man, I'm real...|[hey, man,, i'm, ...|
|"\nMore\nI can't ...|[", more, i, can'...|
|You, sir, are my ...|[you,, sir,, are,...|
|"\n\nCongratulati...|[", , congratulat...|
|COCKSUCKER BEFORE...|[cocksucker, befo...|
|Your vandalism to...|[your, vandalism,...|
|Sorry if the word...|[sorry, if, the, ...|
|alignment on this...|[alignment, on, t...|
|"\nFair use ratio...|[", fair, use, ra...|
|bbq \n\nbe a man ...|[bbq, , , be, a, ...|
|Hey... what is it...|[hey..., what, is...|
|Before you start ...|[before, you, sta...|
|Oh, and the girl ...|[oh,, and, the, g...|
|"\n\nJuelz Santan...|[", , juelz, sant...|
|Bye! \n\nDon't lo...|[bye!, , , don't,...|
|REDIRECT Talk:Voy...|[redirect, talk:v...|
|The Mitsurugi poi...|[the, mitsurugi, ...|
|Don't mean to bot...|[don't, me

In [14]:
# Hash the token arrays into term-frequency vectors stored in "features".
# The same transformer is used for train and test so feature indices agree.
hashingTF = HashingTF(inputCol="words", outputCol="features")

tf, test_tf = (
    hashingTF.transform(frame)
    for frame in (transformed_data, test_transformed_data)
)

In [15]:
# Inspect the training frame with the added "words" and "features" columns.
tf.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|               words|            features|
+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|[explanation, why...|(262144,[6240,722...|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|[d'aww!, he, matc...|(262144,[2195,471...|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|[hey, man,, i'm, ...|(262144,[18700,27...|
|0001b41b1c6bb37e|"\nMore\nI can't ...|    0|           0|      0|     0|     0|            0|[", more, i, can'...|(262144,[11104,16...|
|0001d958c54c6e35|You, sir, are my ...|  

In [16]:
# Inspect the test frame with the added "words" and "features" columns.
test_tf.show()

+----------------+--------------------+--------------------+--------------------+
|              id|        comment_text|               words|            features|
+----------------+--------------------+--------------------+--------------------+
|00001cee341fdb12|Yo bitch Ja Rule ...|[yo, bitch, ja, r...|(262144,[15716,18...|
|0000247867823ef7|== From RfC == \n...|[==, from, rfc, =...|(262144,[28328,30...|
|00013b17ad220c46|" \n\n == Sources...|[", , , , ==, sou...|(262144,[24695,38...|
|00017563c3f7919a|:If you have a lo...|[:if, you, have, ...|(262144,[19036,23...|
|00017695ad8997eb|I don't anonymous...|[i, don't, anonym...|(262144,[7221,828...|
|0001ea8717f6de06|Thank you for und...|[thank, you, for,...|(262144,[4522,190...|
|00024115d4cbde0f|Please do not add...|[please, do, not,...|(262144,[11853,27...|
|000247e83dcc1211|:Dear god this si...|[:dear, god, this...|(262144,[94851,10...|
|00025358d4737918|" \n Only a fool ...|[", , , only, a, ...|(262144,[1172,152...|
|00026d1092fe71c

## Building Logistic Regression to identify toxic comments

In [17]:
# Binary classifier for the "toxic" label, trained on the hashed features.
toxic_lr_params = {"featuresCol": "features", "labelCol": "toxic"}
lr = LogisticRegression(**toxic_lr_params)

# Fit on the training features, then score the test features.
lrModel = lr.fit(tf)
lrPrediction = lrModel.transform(test_tf)

In [18]:
# First 20 scored test rows, including the rawPrediction, probability
# and prediction columns added by the model.
lrPrediction.show(20)

+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|              id|        comment_text|               words|            features|       rawPrediction|         probability|prediction|
+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|00001cee341fdb12|Yo bitch Ja Rule ...|[yo, bitch, ja, r...|(262144,[15716,18...|[-57.914027262729...|[7.05111073244556...|       1.0|
|0000247867823ef7|== From RfC == \n...|[==, from, rfc, =...|(262144,[28328,30...|[4.98390268172049...|[0.99319927893242...|       0.0|
|00013b17ad220c46|" \n\n == Sources...|[", , , , ==, sou...|(262144,[24695,38...|[4.77661469909058...|[0.99164590867774...|       0.0|
|00017563c3f7919a|:If you have a lo...|[:if, you, have, ...|(262144,[19036,23...|[15.7158150926043...|[0.99999985047694...|       0.0|
|00017695ad8997eb|I don't anonymous...|[i, don't, anony

In [19]:
# Number of scored rows in the prediction frame.
lrPrediction.count()

153164

In [20]:
def _second_element_as_float(vector):
    """Return element 1 of ``vector`` converted to a Python float."""
    return float(vector[1])


# Column-level wrapper; the result is declared as FloatType, so values are
# stored in single precision.
extract_prob = F.udf(_second_element_as_float, T.FloatType())

In [21]:
# Sanity check: the object returned by transform() is a Spark DataFrame.
type(lrPrediction)

pyspark.sql.dataframe.DataFrame

In [22]:
# Show element 1 of the probability vector next to the predicted label.
proba_column = extract_prob(lrPrediction["probability"]).alias("proba")
lrPrediction.select(proba_column, "prediction").show()

+-------------+----------+
|        proba|prediction|
+-------------+----------+
|          1.0|       1.0|
|  0.006800721|       0.0|
|  0.008354091|       0.0|
| 1.4952306E-7|       0.0|
| 2.8329346E-8|       0.0|
| 1.8986604E-5|       0.0|
| 3.787608E-10|       0.0|
|   0.64238024|       1.0|
|          1.0|       1.0|
|2.9318725E-10|       0.0|
|    0.9993772|       1.0|
|          0.0|       0.0|
| 2.220446E-16|       0.0|
|          0.0|       0.0|
|  2.194918E-8|       0.0|
|          0.0|       0.0|
|  0.020682989|       0.0|
|2.2182256E-13|       0.0|
|          0.0|       0.0|
|    0.7826813|       1.0|
+-------------+----------+
only showing top 20 rows



In [23]:
# Start the combined result table: one row per test id, with the model's
# prediction stored under the label name "toxic".
toxic_prediction = lrPrediction["prediction"].alias("toxic")
result = lrPrediction.select("id", toxic_prediction)

## Building Logistic Regression models for the remaining labels

In [24]:
# Remaining label columns; "toxic" is already present in `result`.
criterias = ["severe_toxic", "obscene", "threat", "insult", "identity_hate"]

for label in criterias:
    # Loop-local names are used on purpose: rebinding lr / lrModel /
    # lrPrediction here would leave them pointing at the last label's model,
    # so re-running the earlier "toxic" cells would silently use the wrong
    # predictions.
    label_model = LogisticRegression(featuresCol="features", labelCol=label).fit(tf)

    # Keep only the id and the prediction, already named after the label,
    # so the join does not introduce a temporary "prediction" column.
    label_prediction = label_model.transform(test_tf).select(
        "id", F.col("prediction").alias(label)
    )

    # drop() is a no-op when the column is absent; it keeps this cell
    # re-runnable when `result` already holds the label from a previous run.
    result = result.drop(label).join(label_prediction, on="id")

    # Progress check: the table gains one label column per iteration.
    result.show(5)

+----------------+-----+------------+
|              id|toxic|severe_toxic|
+----------------+-----+------------+
|000968ce11f5ee34|  0.0|         0.0|
|00491682330fdd1d|  1.0|         0.0|
|008eb47c4684d190|  1.0|         0.0|
|00d251f47486b6d2|  0.0|         0.0|
|0114ae82c53101a9|  1.0|         1.0|
+----------------+-----+------------+
only showing top 5 rows

+----------------+-----+------------+-------+
|              id|toxic|severe_toxic|obscene|
+----------------+-----+------------+-------+
|000968ce11f5ee34|  0.0|         0.0|    0.0|
|00491682330fdd1d|  1.0|         0.0|    0.0|
|008eb47c4684d190|  1.0|         0.0|    0.0|
|00d251f47486b6d2|  0.0|         0.0|    0.0|
|0114ae82c53101a9|  1.0|         1.0|    1.0|
+----------------+-----+------------+-------+
only showing top 5 rows

+----------------+-----+------------+-------+------+
|              id|toxic|severe_toxic|obscene|threat|
+----------------+-----+------------+-------+------+
|000968ce11f5ee34|  0.0|         0.

In [25]:
# First 30 rows of the combined table: id plus one prediction per label.
result.show(30)

+----------------+-----+------------+-------+------+------+-------------+
|              id|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+-----+------------+-------+------+------+-------------+
|000968ce11f5ee34|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|00491682330fdd1d|  1.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|008eb47c4684d190|  1.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|00d251f47486b6d2|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|0114ae82c53101a9|  1.0|         1.0|    1.0|   0.0|   0.0|          0.0|
|012c7429c5a34290|  1.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|015017ec394a264e|  1.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|01d94c94a86a4327|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|020eb3a1af28453f|  1.0|         0.0|    1.0|   0.0|   1.0|          0.0|
|0216909e11cfeac0|  0.0|         0.0|    0.0|   0.0|   0.0|          1.0|
|026460a698a91698|  0.0|         0.0| 

In [26]:
# Write the predictions as a single CSV part file under the 'result' directory.
# coalesce(1) collapses the output to one partition, i.e. one part file.
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the write raises if 'result' already exists, which breaks
# Restart & Run All on every run after the first.
result.coalesce(1).write.mode("overwrite").csv('result', header=True)