In [207]:
import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import SQLContext
from nltk.stem.snowball import SnowballStemmer
import pyspark.sql.functions as f
from pyspark.sql.functions import trim
from pyspark.sql.types import DoubleType,DateType

In [2]:
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('local[4]')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")



Just created a SparkContext


In [181]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dataset/train.csv')
df_test = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dataset/test.csv')
type(df)

pyspark.sql.dataframe.DataFrame

In [182]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: string (nullable = true)
 |-- severe_toxic: string (nullable = true)
 |-- obscene: string (nullable = true)
 |-- threat: string (nullable = true)
 |-- insult: string (nullable = true)
 |-- identity_hate: string (nullable = true)



In [183]:
df = df.dropna()
# df = df.select(trim("id"), 'comment_text', 'toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate')
df = df.withColumn("id", trim(df. id))
df.count()

124633

In [184]:
df_test = df_test.dropna()
df_test = df_test.withColumn("id", trim(df_test. id))
df_test.count()

253637

In [185]:
df.registerTempTable("df");
df = sqlContext.sql("SELECT * FROM df WHERE LENGTH(toxic) == 1 AND LENGTH(id) == 16")



In [186]:
df_test.registerTempTable("df_test");
df_new_test = sqlContext.sql("SELECT * FROM df_test WHERE LENGTH(id) == 16")

In [209]:
df = df.withColumn("toxic",df.toxic.cast(DoubleType()))
df = df.withColumn("severe_toxic",df.severe_toxic.cast(DoubleType()))
df = df.withColumn("obscene",df.obscene.cast(DoubleType()))
df = df.withColumn("threat",df.threat.cast(DoubleType()))
df = df.withColumn("insult",df.insult.cast(DoubleType()))
df = df.withColumn("identity_hate",df.identity_hate.cast(DoubleType()))

In [210]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: double (nullable = true)
 |-- severe_toxic: double (nullable = true)
 |-- obscene: double (nullable = true)
 |-- threat: double (nullable = true)
 |-- insult: double (nullable = true)
 |-- identity_hate: double (nullable = true)



In [211]:
df.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|000103f0d9cfb60f|D'aww! He matches...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|000113f07ec002fd|Hey man, I'm real...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|0001d958c54c6e35|You, sir, are my ...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|  1.0|         1.0|    1.0|   0.0|   1.0|          0.0|
|00031b1e95af7921|Your vandalism to...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|00037261f536c51d|Sorry if the word...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|00040093b2687caa|alignment on this...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|
|00070ef96486d6f9|Oh, and the girl ...|  0.0|     

In [212]:
df_new_test.show()

+----------------+--------------------+
|              id|        comment_text|
+----------------+--------------------+
|00001cee341fdb12|Yo bitch Ja Rule ...|
|0000247867823ef7|     == From RfC == |
|00013b17ad220c46|                   "|
|00017563c3f7919a|:If you have a lo...|
|00017695ad8997eb|I don't anonymous...|
|0001ea8717f6de06|Thank you for und...|
|00024115d4cbde0f|Please do not add...|
|000247e83dcc1211|:Dear god this si...|
|00025358d4737918|                   "|
|00026d1092fe71cc|== Double Redirec...|
|0002eadc3b301559|I think its crap ...|
|0002f87b16116a7f|"""::: Somebody w...|
|0003806b11932181|, 25 February 201...|
|0003e1cccfd5a40a|                   "|
|00059ace3e3e9a53|                   "|
|000634272d0d44eb|==Current Positio...|
|000663aff0fffc80|this other one fr...|
|000689dd34e20979|== Reason for ban...|
|000834769115370c|:: Wallamoose was...|
|000844b52dee5f3f||blocked]] from e...|
+----------------+--------------------+
only showing top 20 rows



In [213]:
(train_set, val_set) = df_new.randomSplit([0.98, 0.02], seed = 2000)

In [214]:
train_y = train_set.drop('id').drop('comment_text')

In [215]:
train_y.show()

+-----+------------+-------+------+------+-------------+
|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+-----+------------+-------+------+------+-------------+
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    1|           1|      1|     0|     1|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|     0|     0|            0|
|    0|           0|      0|   

In [216]:
train_x = train_set.select('id','comment_text')

In [217]:
train_x.show()

+----------------+--------------------+
|              id|        comment_text|
+----------------+--------------------+
|000103f0d9cfb60f|D'aww! He matches...|
|000113f07ec002fd|Hey man, I'm real...|
|0001d958c54c6e35|You, sir, are my ...|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|
|00031b1e95af7921|Your vandalism to...|
|00037261f536c51d|Sorry if the word...|
|00040093b2687caa|alignment on this...|
|00070ef96486d6f9|Oh, and the girl ...|
|000897889268bc93|REDIRECT Talk:Voy...|
|0009801bd85e5806|The Mitsurugi poi...|
|000f35deef84dc4a|There's no need t...|
|000ffab30195c5e1|Yes, because the ...|
|0011cc71398479c4|How could I post ...|
|00128363e367d703|Not sure about a ...|
|0015f4aa35ebe9b5|pretty much every...|
|00169857adbc989b|Hi Explicit, can ...|
|00190820581d90ce|FUCK YOUR FILTHY ...|
|001c419c445b5a59|You had a point, ...|
|001c557175094f10|In other words, y...|
|001dc38a83d420cf|GET FUCKED UP. GE...|
+----------------+--------------------+
only showing top 20 rows



In [218]:
val_y = val_set.drop('id').drop('comment_text')

In [219]:
val_x = val_set.select('comment_text')

In [220]:
test_x = df_new_test.select('id','comment_text')

In [221]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
# label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf])

pipelineFit_X = pipeline.fit(df)
pipelineFit_VAL = pipeline.fit(val_x)
pipelineFit_X_test = pipeline.fit(test_x)
train_X = pipelineFit_X.transform(df)
val_X = pipelineFit_VAL.transform(val_x)
test_X = pipelineFit_X.transform(test_x)
train_X.show(100)

+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|               words|                  tf|            features|
+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|000103f0d9cfb60f|D'aww! He matches...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|[d'aww!, he, matc...|(65536,[2195,4714...|(65536,[2195,4714...|
|000113f07ec002fd|Hey man, I'm real...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|[hey, man,, i'm, ...|(65536,[6589,1001...|(65536,[6589,1001...|
|0001d958c54c6e35|You, sir, are my ...|  0.0|         0.0|    0.0|   0.0|   0.0|          0.0|[you,, sir,, are,...|(65536,[389,2762,...|(65536,[389,2762,...|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|  1.0|       

In [224]:
train_X.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: double (nullable = true)
 |-- severe_toxic: double (nullable = true)
 |-- obscene: double (nullable = true)
 |-- threat: double (nullable = true)
 |-- insult: double (nullable = true)
 |-- identity_hate: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)



In [177]:
val_X.show()

+--------------------+--------------------+--------------------+--------------------+
|        comment_text|               words|                  tf|            features|
+--------------------+--------------------+--------------------+--------------------+
|For your informat...|[for, your, infor...|(65536,[649,1009,...|(65536,[649,1009,...|
|Same for File:Sea...|[same, for, file:...|(65536,[11104,233...|(65536,[11104,233...|
|the episode list ...|[the, episode, li...|(65536,[9389,1437...|(65536,[9389,1437...|
|The real personal...|[the, real, perso...|(65536,[3085,1863...|(65536,[3085,1863...|
|Vince - FYI 'Yell...|[vince, -, fyi, '...|(65536,[568,711,1...|(65536,[568,711,1...|
|Hahaha, you dont ...|[hahaha,, you, do...|(65536,[1903,2026...|(65536,[1903,2026...|
|Helpme is not to ...|[helpme, is, not,...|(65536,[438,989,6...|(65536,[438,989,6...|
|I'm not going to ...|[i'm, not, going,...|(65536,[7791,1001...|(65536,[7791,1001...|
|I know what you'r...|[i, know, what, y...|(65536,[835

In [192]:
train_toxic = train_X.select('features','toxic')

In [205]:
train_X.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|               words|                  tf|            features|
+----------------+--------------------+-----+------------+-------+------+------+-------------+--------------------+--------------------+--------------------+
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|[d'aww!, he, matc...|(65536,[2195,4714...|(65536,[2195,4714...|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|[hey, man,, i'm, ...|(65536,[6589,1001...|(65536,[6589,1001...|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|[you,, sir,, are,...|(65536,[389,2762,...|(65536,[389,2762,...|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|    1|       

In [223]:
from pyspark.ml.classification import LogisticRegression

#Apply the logistic regression model
log_reg=LogisticRegression(featuresCol='features', labelCol='toxic').fit(train_X)

#Training Results
train_results=log_reg.evaluate(training_df).predictions
train_results.filter(train_results['toxic']==1).filter(train_results['prediction']==1).select(['toxic','prediction','probability']).show(10,False)

Py4JJavaError: An error occurred while calling o4177.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 159.0 failed 1 times, most recent failure: Lost task 1.0 in stage 159.0 (TID 328) (DESKTOP-ITN2I8Q executor driver): scala.MatchError: [null,1.0,(65536,[10790],[0.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1174)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1168)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1267)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1228)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1214)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1214)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:233)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:512)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:496)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:286)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: scala.MatchError: [null,1.0,(65536,[10790],[0.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1236)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1237)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
