In [1]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://dmitriis-mbp:4040
SparkContext available as 'sc' (version = 3.3.1, master = local[*], app id = local-1670364470911)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6e86a6a3


In [2]:
val df = spark
.read
.option("header", "true")
.option("escape","\"")
.option("multiLine", "true")
.option("inferSchema", "true")
.csv("train.csv")

df.show()

+----------------+--------------------+-----+------------+-------+------+------+-------------+
|              id|        comment_text|toxic|severe_toxic|obscene|threat|insult|identity_hate|
+----------------+--------------------+-----+------------+-------+------+------+-------------+
|0000997932d777bf|Explanation\nWhy ...|    0|           0|      0|     0|     0|            0|
|000103f0d9cfb60f|D'aww! He matches...|    0|           0|      0|     0|     0|            0|
|000113f07ec002fd|Hey man, I'm real...|    0|           0|      0|     0|     0|            0|
|0001b41b1c6bb37e|"\nMore\nI can't ...|    0|           0|      0|     0|     0|            0|
|0001d958c54c6e35|You, sir, are my ...|    0|           0|      0|     0|     0|            0|
|00025465d4725e87|"\n\nCongratulati...|    0|           0|      0|     0|     0|            0|
|0002bcb3da6cb337|COCKSUCKER BEFORE...|    1|           1|      1|     0|     1|            0|
|00031b1e95af7921|Your vandalism to...|    0|     

df: org.apache.spark.sql.DataFrame = [id: string, comment_text: string ... 6 more fields]


In [3]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: integer (nullable = true)
 |-- severe_toxic: integer (nullable = true)
 |-- obscene: integer (nullable = true)
 |-- threat: integer (nullable = true)
 |-- insult: integer (nullable = true)
 |-- identity_hate: integer (nullable = true)



In [4]:
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer, Word2Vec}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{TrainValidationSplit, ParamGridBuilder}

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer, Word2Vec}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{TrainValidationSplit, ParamGridBuilder}


In [8]:
val targets = Array("toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate")
val num_features = Array(10, 100, 1000, 10000)

targets: Array[String] = Array(toxic, severe_toxic, obscene, threat, insult, identity_hate)
num_features: Array[Int] = Array(10, 100, 1000, 10000)


In [7]:
for (target <- targets) {
    val tokenizer = new Tokenizer()
        .setInputCol("comment_text")
        .setOutputCol("words")
    val hashingTF = new HashingTF()
        .setNumFeatures(1000)
        .setInputCol(tokenizer.getOutputCol)
        .setOutputCol("rawFeatures")
    val idf = new IDF()
        .setInputCol(hashingTF.getOutputCol)
        .setOutputCol("features")
    val lr = new LogisticRegression()
        .setLabelCol(target)
    val pipeline = new Pipeline()
        .setStages(
            Array(
                tokenizer,
                hashingTF,
                idf,
                lr
            )
        )
    val paramGrid = new ParamGridBuilder()
        .addGrid(hashingTF.numFeatures, num_features)
        .build()

    val evaluator = new BinaryClassificationEvaluator()
        .setLabelCol(target)

    val trainValidationSplit = new TrainValidationSplit()
        .setEstimator(pipeline)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.8)
        .setParallelism(4)
    val model = trainValidationSplit.fit(df)

    println(s"target: ${target}")
    for (i <- 0 until num_features.length) {
        println(s"numFeatures: ${num_features(i)} - ROC-AUC score: ${model.validationMetrics(i)}")
    }
    
}

22/12/06 22:53:30 WARN BlockManager: Asked to remove block broadcast_5593_piece0, which does not exist
target: toxic
numFeatures: 10 - ROC-AUC score: 0.6395966506739984
numFeatures: 100 - ROC-AUC score: 0.7808456528214043
numFeatures: 1000 - ROC-AUC score: 0.8742418602472807
numFeatures: 10000 - ROC-AUC score: 0.8720380596670918
target: severe_toxic
numFeatures: 10 - ROC-AUC score: 0.7150827845741294
numFeatures: 100 - ROC-AUC score: 0.7959638071490243
numFeatures: 1000 - ROC-AUC score: 0.8921820181480886
numFeatures: 10000 - ROC-AUC score: 0.829040210962328
target: obscene
numFeatures: 10 - ROC-AUC score: 0.6659464636868121
numFeatures: 100 - ROC-AUC score: 0.7917703491161217
numFeatures: 1000 - ROC-AUC score: 0.8929038256643931
numFeatures: 10000 - ROC-AUC score: 0.8502915385051808
target: threat
numFeatures: 10 - ROC-AUC score: 0.6764621165453284
numFeatures: 100 - ROC-AUC score: 0.8060826214327531
numFeatures: 1000 - ROC-AUC score: 0.868782423416597
numFeatures: 10000 - ROC-AUC sco

**the more numFeatures the better the result, but starting from a certain point, our model starts to overfit due to the large complexity caused by numFeatures**

In [6]:
val tokenizer = new Tokenizer()
    .setInputCol("comment_text")
    .setOutputCol("words")
val word2Vec = new Word2Vec()
    .setInputCol("words")
    .setOutputCol("features")

val w2v_pipeline = new Pipeline()
    .setStages(
        Array(
            tokenizer,
            word2Vec,
        )
    )

val w2v_transormation = w2v_pipeline.fit(df)

val w2v_df = w2v_transormation.transform(df)

tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_8d49520d6eb2
word2Vec: org.apache.spark.ml.feature.Word2Vec = w2v_05495579dd52
w2v_pipeline: org.apache.spark.ml.Pipeline = pipeline_05812102f229
w2v_transormation: org.apache.spark.ml.PipelineModel = pipeline_05812102f229
w2v_df: org.apache.spark.sql.DataFrame = [id: string, comment_text: string ... 8 more fields]


In [9]:
for (target <- targets) {
    val lr = new LogisticRegression()
        .setLabelCol(target)
    val paramGrid = new ParamGridBuilder()
        .build()
    val evaluator = new BinaryClassificationEvaluator()
        .setLabelCol(target)
    val trainValidationSplit = new TrainValidationSplit()
        .setEstimator(lr)
        .setEvaluator(evaluator)
        .setEstimatorParamMaps(paramGrid)
        .setTrainRatio(0.8)
    val model = trainValidationSplit.fit(w2v_df)

    println(s"target: ${target}")
    println(s"W2V - ROC-AUC score: ${model.validationMetrics(0)}")

}

22/12/06 23:16:02 WARN MemoryStore: Not enough space to cache rdd_39_0 in memory! (computed 258.1 MiB so far)
22/12/06 23:16:02 WARN BlockManager: Persisting block rdd_39_0 to disk instead.
22/12/06 23:16:10 WARN MemoryStore: Not enough space to cache rdd_125_0 in memory! (computed 67.5 MiB so far)
22/12/06 23:16:10 WARN BlockManager: Persisting block rdd_125_0 to disk instead.
target: toxic
W2V - ROC-AUC score: 0.9407561243442133
22/12/06 23:16:26 WARN MemoryStore: Not enough space to cache rdd_226_0 in memory! (computed 67.5 MiB so far)
22/12/06 23:16:26 WARN BlockManager: Persisting block rdd_226_0 to disk instead.
22/12/06 23:16:35 WARN MemoryStore: Not enough space to cache rdd_327_0 in memory! (computed 67.5 MiB so far)
22/12/06 23:16:35 WARN BlockManager: Persisting block rdd_327_0 to disk instead.
target: severe_toxic
W2V - ROC-AUC score: 0.9697990140702598
22/12/06 23:16:50 WARN MemoryStore: Not enough space to cache rdd_449_0 in memory! (computed 67.5 MiB so far)
22/12/06 23:

**LogisticRegression with W2V works much better than with TFIDF**