In [41]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score


In [2]:
spark = (SparkSession.builder
                  .appName('MLBD Comment Classification')
                  .enableHiveSupport()
                  .config("spark.executor.memory", "6G")
                  .config("spark.driver.memory","18G")
                  .config("spark.executor.cores","7")
                  .config("spark.python.worker.memory","6G")
                  .config("spark.driver.maxResultSize","0")
                  .config("spark.sql.crossJoin.enabled", "true")
                  .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                  .config("spark.default.parallelism","2")
                  .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/11 14:42:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = pd.read_csv('./hw4/train.csv')
df.comment_text.fillna("", inplace=True)

In [49]:
train_pd = df.sample(frac=0.75,random_state=42)
test_pd = df.drop(train_pd.index)

In [50]:
train, test = spark.createDataFrame(train_pd), spark.createDataFrame(test_pd)

In [9]:
with open('./hw4/stop_words.txt') as file:
    stop_words = list(map(lambda x: x.replace('"', '').strip(), file.readline().split(",")))

In [51]:
tokenizer = RegexTokenizer(inputCol="comment_text", outputCol="words", pattern="\\W")

In [52]:
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stop_words)

22/12/11 14:59:23 WARN StopWordsRemover: Default locale set was [en_BY]; however, it was not found in available locales in JVM, falling back to en_US locale. Set param `locale` in order to respect another locale.


In [53]:
hashingTF = HashingTF(inputCol="filtered", outputCol="filtered_hashed")

In [54]:
idf = IDF(inputCol="filtered_hashed", outputCol="features")

In [55]:
pipeline = Pipeline(stages=[tokenizer, stop_words_remover, hashingTF, idf])
pipeline_fit = pipeline.fit(train)

dataset = pipeline_fit.transform(train)

22/12/11 14:59:24 WARN TaskSetManager: Stage 424 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [56]:
dataset.select("features").head(1)

22/12/11 14:59:29 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 14:59:29 WARN TaskSetManager: Stage 425 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 425:>                                                        (0 + 1) / 1]

22/12/11 14:59:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 425 (TID 746): Attempting to kill Python Worker


                                                                                

[Row(features=SparseVector(262144, {37521: 5.7213, 62312: 7.1493, 76792: 9.4953, 92032: 3.9228, 109753: 4.1504, 120328: 6.0296, 134257: 8.109, 142437: 8.0037, 161102: 4.5049, 168503: 15.958, 186022: 5.9921, 194979: 5.0593, 200010: 10.594, 214718: 7.4585, 224040: 4.3595, 229412: 9.0535, 229821: 6.8174, 231185: 8.6015, 231304: 6.6886, 246215: 6.8804}))]

In [57]:
regParam = 0.1

In [58]:
dataset_test = pipeline_fit.transform(test)

In [63]:
test_res = test.select('id')
test_res.head()

22/12/11 15:03:39 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:39 WARN TaskSetManager: Stage 868 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:40 WARN TaskSetManager: Stage 869 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.


[Stage 868:>                                                        (0 + 2) / 2]                                                                                

22/12/11 15:03:40 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:40 WARN TaskSetManager: Stage 870 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 869:>                                                        (0 + 1) / 1]                                                                                

22/12/11 15:03:41 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:41 WARN TaskSetManager: Stage 871 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:41 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:42 WARN TaskSetManager: Stage 872 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 869:>                                                        (0 + 1) / 1]                                                                                

22/12/11 15:03:42 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:42 WARN TaskSetManager: Stage 873 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 869:>                                                        (0 + 1) / 1]                                                                                

22/12/11 15:03:43 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:43 WARN TaskSetManager: Stage 874 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 869:>                                                        (0 + 1) / 1]                                                                                

22/12/11 15:03:44 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:44 WARN TaskSetManager: Stage 875 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:44 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 869 (TID 1551): Attempting to kill Python Worker


Row(id='00025465d4725e87')

22/12/11 15:03:44 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:44 WARN TaskSetManager: Stage 876 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:45 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:45 WARN TaskSetManager: Stage 877 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:46 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:46 WARN TaskSetManager: Stage 878 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.




22/12/11 15:03:46 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:47 WARN TaskSetManager: Stage 879 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.




22/12/11 15:03:47 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:47 WARN TaskSetManager: Stage 880 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:03:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:48 WARN TaskSetManager: Stage 881 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 881:>                                                        (0 + 2) / 2]                                                                                

22/12/11 15:03:49 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:49 WARN TaskSetManager: Stage 882 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 882:>                                                        (0 + 2) / 2]                                                                                

22/12/11 15:03:50 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 883:>                                                        (0 + 0) / 2]

22/12/11 15:03:51 WARN TaskSetManager: Stage 883 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/11 15:03:52 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:52 WARN TaskSetManager: Stage 884 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 884:>                                                        (0 + 2) / 2]                                                                                

22/12/11 15:03:53 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:03:53 WARN TaskSetManager: Stage 885 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


[Stage 885:>                                                        (0 + 2) / 2]                                                                                

In [68]:
c

['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

In [70]:
for col in cols_to_predict:
    logreg = LogisticRegression(featuresCol='features', labelCol=col, regParam=regParam)
    model = logreg.fit(dataset)
    res = model.transform(dataset_test)
    test_res = test_res.join(res.select('id', 'prediction'), on="id")
    test_res = test_res.withColumnRenamed("prediction", col)
    test_res.show(5)

22/12/11 15:10:55 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:10:55 WARN TaskSetManager: Stage 1216 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/11 15:10:59 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:10:59 WARN TaskSetManager: Stage 1217 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

22/12/11 15:11:03 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:03 WARN TaskSetManager: Stage 1218 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:04 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:04 WARN TaskSetManager: Stage 1219 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:04 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:04 WARN TaskSetManager: Stage 1220 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:04 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:04 WARN TaskSetManager: Stage 1221 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:04 WARN DAGScheduler: Broadcasting large task binary with siz

22/12/11 15:11:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:11 WARN TaskSetManager: Stage 1254 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:11 WARN TaskSetManager: Stage 1255 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:11 WARN TaskSetManager: Stage 1256 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
22/12/11 15:11:11 WARN TaskSetManager: Stage 1257 contains a task of very large size (25197 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:11 WARN DAGScheduler: Broadcasting large task binary with siz

[Stage 1269:> (0 + 2) / 2][Stage 1270:> (0 + 2) / 2][Stage 1271:> (0 + 2) / 2]2]

22/12/11 15:11:18 WARN TaskSetManager: Stage 1273 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:11:18 WARN TaskSetManager: Stage 1274 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+----------------+-----+------------+-------+------+------+-------------+
|              id|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+-----+------------+-------+------+------+-------------+
|002264ea4d5f2887|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|0310c62027c1cc81|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|03effbaf048d353d|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|03f1f91ce9efe2c4|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|04d0cce9eb0667a8|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
+----------------+-----+------------+-------+------+------+-------------+
only showing top 5 rows



In [71]:
test_res_pd = test_res.toPandas()

22/12/11 15:12:10 WARN TaskSetManager: Stage 1289 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:12:10 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
22/12/11 15:12:10 WARN TaskSetManager: Stage 1290 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:12:10 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
22/12/11 15:12:10 WARN TaskSetManager: Stage 1291 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:12:10 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
22/12/11 15:12:10 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
22/12/11 15:12:10 WARN TaskSetManager: Stage 1292 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:12:11 WARN DAGScheduler: Broadcasting large task binary with size 5.

[Stage 1290:> (0 + 2) / 2][Stage 1291:> (0 + 2) / 2][Stage 1292:> (0 + 2) / 2]

22/12/11 15:12:14 WARN TaskSetManager: Stage 1294 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.
22/12/11 15:12:14 WARN TaskSetManager: Stage 1295 contains a task of very large size (8395 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [72]:
test_res_pd

Unnamed: 0,id,toxic,severe_toxic,obscene,threat,insult,identity_hate
0,002264ea4d5f2887,0.0,0.0,0.0,0.0,0.0,0.0
1,0310c62027c1cc81,0.0,0.0,0.0,0.0,0.0,0.0
2,03effbaf048d353d,0.0,0.0,0.0,0.0,0.0,0.0
3,03f1f91ce9efe2c4,0.0,0.0,0.0,0.0,0.0,0.0
4,04d0cce9eb0667a8,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...
39888,ae4d2effbdb0f8fc,0.0,0.0,0.0,0.0,0.0,0.0
39889,b437244bf9f0ffb5,1.0,0.0,0.0,0.0,0.0,0.0
39890,d43a518f105302fe,0.0,0.0,0.0,0.0,0.0,0.0
39891,fb57e4559f38e03b,0.0,0.0,0.0,0.0,0.0,0.0


In [73]:
test_res_pd = test_res_pd.sort_values(by=['id'])

In [74]:
c

In [75]:
test_res_pd.toxic = test_res_pd.toxic.astype(int)
test_res_pd.severe_toxic = test_res_pd.severe_toxic.astype(int)
test_res_pd.obscene = test_res_pd.obscene.astype(int)
test_res_pd.threat = test_res_pd.threat.astype(int)
test_res_pd.insult = test_res_pd.insult.astype(int)
test_res_pd.identity_hate = test_res_pd.identity_hate.astype(int)

In [76]:
test_res_pd

Unnamed: 0,id,toxic,severe_toxic,obscene,threat,insult,identity_hate
36098,00013fa6fb6ef643,0,0,0,0,0,0
11656,00025465d4725e87,0,0,0,0,0,0
35243,0002eeaf4c0cdf35,0,0,0,0,0,0
18272,00040093b2687caa,0,0,0,0,0,0
3678,0005300084f90edc,0,0,0,0,0,0
...,...,...,...,...,...,...,...
27058,fff5d08a356a9fde,0,0,0,0,0,0
29825,fff880e2b149dc13,0,0,0,0,0,0
21592,fff90f6920245ab8,0,0,0,0,0,0
20232,ffff3a700c54e047,0,0,0,0,0,0


In [77]:
test_res_real

Unnamed: 0,id,comment_text,toxic,severe_toxic,obscene,threat,insult,identity_hate
95737,00013fa6fb6ef643,"Wehwalt, FTR, I'm not objecting to text about ...",0,0,0,0,0,0
5,00025465d4725e87,"""\n\nCongratulations from me as well, use the ...",0,0,0,0,0,0
95739,0002eeaf4c0cdf35,But isnt it against the rules to edit if you a...,0,0,0,0,0,0
9,00040093b2687caa,alignment on this subject and which are contra...,0,0,0,0,0,0
10,0005300084f90edc,"""\nFair use rationale for Image:Wonju.jpg\n\nT...",0,0,0,0,0,0
...,...,...,...,...,...,...,...,...
95719,fff5d08a356a9fde,"Expansion \n\nI can handle the expansion, give...",0,0,0,0,0,0
95725,fff880e2b149dc13,February 2006 (UTC))\n\nWell if you would just...,0,0,0,0,0,0
143604,fff90f6920245ab8,How about edit the broken links of Incident at...,0,0,0,0,0,0
143606,ffff3a700c54e047,"""\n\nI am glad I can amuse you. Perhaps your t...",0,0,0,0,0,0


In [78]:
print(f1_score(test_res_pd.toxic, test_res_real.toxic))
print(recall_score(test_res_pd.toxic, test_res_real.toxic))
print(precision_score(test_res_pd.toxic, test_res_real.toxic))
print(accuracy_score(test_res_pd.toxic, test_res_real.toxic))

0.2571669477234402
0.7151230949589683
0.15677203803649448
0.9116637003985661


In [79]:
print(f1_score(test_res_pd.severe_toxic, test_res_real.severe_toxic))
print(recall_score(test_res_pd.severe_toxic, test_res_real.severe_toxic))
print(precision_score(test_res_pd.severe_toxic, test_res_real.severe_toxic))
print(accuracy_score(test_res_pd.severe_toxic, test_res_real.severe_toxic))

0.1032258064516129
0.43636363636363634
0.05853658536585366
0.9895470383275261


In [80]:
print(f1_score(test_res_pd.obscene, test_res_real.obscene))
print(recall_score(test_res_pd.obscene, test_res_real.obscene))
print(precision_score(test_res_pd.obscene, test_res_real.obscene))
print(accuracy_score(test_res_pd.obscene, test_res_real.obscene))

0.17730496453900707
0.596816976127321
0.10411846367422489
0.9476599904745193


In [81]:
print(f1_score(test_res_pd.threat, test_res_real.threat))
print(recall_score(test_res_pd.threat, test_res_real.threat))
print(precision_score(test_res_pd.threat, test_res_real.threat))
print(accuracy_score(test_res_pd.threat, test_res_real.threat))

0.08108108108108109
0.3157894736842105
0.046511627906976744
0.9965908806056201


In [82]:
print(f1_score(test_res_pd.identity_hate, test_res_real.identity_hate))
print(recall_score(test_res_pd.identity_hate, test_res_real.identity_hate))
print(precision_score(test_res_pd.identity_hate, test_res_real.identity_hate))
print(accuracy_score(test_res_pd.identity_hate, test_res_real.identity_hate))

0.019753086419753086
0.09523809523809523
0.011019283746556474
0.990048379414935
