In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 57.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=deed904bfbfbe5bbc0c1ae7e9cdcf52bb6f7bba53cb116947f171e439c2bd64a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()
sqlContext = SQLContext(sc)



In [4]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', multiline = True, escape = '\"').load('train.csv')
data.show(5)

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|"\nMore\nI can't ...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
only showing top 5 rows



In [5]:
labels = [column for column in data.columns if column not in ["comment_text",  "id"]]
labels

['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

regexTokenizer = RegexTokenizer(inputCol="comment_text", outputCol="words", pattern="\\W")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
lr_models_tfidf = {}
for label in labels:
  label_stringIdx = StringIndexer(inputCol = label, outputCol = "label")
  pipeline = Pipeline(stages=[regexTokenizer, hashingTF, idf, label_stringIdx])


  pipelineFit = pipeline.fit(data)
  dataset = pipelineFit.transform(data)

  (trainingData, testData) = dataset.randomSplit([0.7, 0.3])
  lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
  lrModel = lr.fit(trainingData)
  lr_models_tfidf[label] = lrModel

  predictions = lrModel.transform(testData)
  evaluator = BinaryClassificationEvaluator()
  print("evaluation on tf&idf on "+ label + " " + str(evaluator.evaluate(predictions)))

evaluation on tf&idf on toxic 0.8989244537273711
evaluation on tf&idf on severe_toxic 0.9310365596798398
evaluation on tf&idf on obscene 0.8925484864942721
evaluation on tf&idf on threat 0.9394891486801633
evaluation on tf&idf on insult 0.9036237458216981
evaluation on tf&idf on identity_hate 0.901659678079029


In [None]:
label_stringIdx = StringIndexer(inputCol = "toxic", outputCol = "label")
for n in range(1000, 20000, 1000):
  hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=n)
  pipeline = Pipeline(stages=[regexTokenizer, hashingTF, idf, label_stringIdx])
  pipelineFit = pipeline.fit(data)
  dataset = pipelineFit.transform(data)

  (trainingData, testData) = dataset.randomSplit([0.7, 0.3])
  lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
  lrModel = lr.fit(trainingData)
  predictions = lrModel.transform(testData)
  evaluator = BinaryClassificationEvaluator()
  print("evaluation hashingTF numFeatures = "+ str(n) + " :" + str(evaluator.evaluate(predictions)))


evaluation hashingTF numFeatures = 1000 :0.8430641536054826
evaluation hashingTF numFeatures = 2000 :0.851802063302561
evaluation hashingTF numFeatures = 3000 :0.8789732832559787
evaluation hashingTF numFeatures = 4000 :0.8772863304328277
evaluation hashingTF numFeatures = 5000 :0.8878163331608615
evaluation hashingTF numFeatures = 6000 :0.889256669267699
evaluation hashingTF numFeatures = 7000 :0.8888300433317279
evaluation hashingTF numFeatures = 8000 :0.8933010610529228
evaluation hashingTF numFeatures = 9000 :0.8953694094029437
evaluation hashingTF numFeatures = 10000 :0.8923546111029904
evaluation hashingTF numFeatures = 11000 :0.8938495476296622
evaluation hashingTF numFeatures = 12000 :0.8967176467706912
evaluation hashingTF numFeatures = 13000 :0.8923978756964882
evaluation hashingTF numFeatures = 14000 :0.8960668265949326
evaluation hashingTF numFeatures = 15000 :0.8994409984922082
evaluation hashingTF numFeatures = 16000 :0.9017589726976493
evaluation hashingTF numFeatures = 

In [8]:
from pyspark.ml.feature import Word2Vec
w2v = Word2Vec(vectorSize=100, minCount=5, inputCol = "words", outputCol="features")
lr_models_w2v = {}
for label in labels:
  label_stringIdx = StringIndexer(inputCol = label, outputCol = "label")
  pipeline = Pipeline(stages=[label_stringIdx, regexTokenizer, w2v])


  pipelineFit = pipeline.fit(data)
  dataset = pipelineFit.transform(data)

  (trainingData, testData) = dataset.randomSplit([0.7, 0.3])
  lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
  lrModel = lr.fit(trainingData)
  lr_models_w2v[label] = lrModel

  predictions = lrModel.transform(testData)
  evaluator = BinaryClassificationEvaluator()
  print("evaluation w2v on "+ label + " " + str(evaluator.evaluate(predictions)))

evaluation w2v on toxic 0.9446731069921979
evaluation w2v on severe_toxic 0.9802581683406942
evaluation w2v on obscene 0.9642604423735611
evaluation w2v on threat 0.9517305950128626
evaluation w2v on insult 0.9569818441459753
evaluation w2v on identity_hate 0.952240737141214
