Work was conducted with [Spark 3.2.0](https://spark.apache.org/releases/spark-release-3-2-0.html) and [Zeppelin 0.10.1](https://hub.docker.com/layers/zeppelin/apache/zeppelin/0.10.1/images/sha256-9c1b5ddd6225ad45cedc327a853f6f13e45797ff419260d20bb9c14f5cbe3a87?context=explore)

### 1. Read Train Data

In [2]:
%spark
import org.apache.spark.sql.DataFrame

// Load the training CSV (first line is the header) and keep at most 20000 rows.
// "multiline" lets a quoted field span several lines.
// The escape character is the double quote. The previous version of this chain
// also set escape to "\\" first, but a later .option() call with the same key
// replaces the earlier value, so that setting never took effect and is dropped.
val train_raw: DataFrame = spark.read
    .option("multiline", "true")
    .option("quote", "\"")
    .option("header", "true")
    .option("escape", "\"")
    .csv("/notebook/data/train.csv")
    .limit(20000)

// Print the first 5 rows as a sanity check
train_raw.show(5)

In [3]:
%spark
// Discard every row containing a null/NaN value, then remove the "id" column,
// which is not used in any later step.
val train: DataFrame = train_raw
    .na.drop()
    .drop("id")

Let's view some toxic comments

In [5]:
%spark
// Fetch the text of two comments whose "toxic" label equals 1
train
    .where(train("toxic") === 1)
    .select("comment_text")
    .take(2)

### 2. Tokenize data


In [7]:
%spark
import org.apache.spark.ml.feature.Tokenizer

// Tokenizer that reads "comment_text" and writes the token list to
// "comment_text_tokenized". The same instance is reused for the test data.
val tokenizer: Tokenizer = new Tokenizer()
    .setOutputCol("comment_text_tokenized")
    .setInputCol("comment_text")

// Tokenize the training comments; the raw text column is dropped afterwards.
val train_tokenized: DataFrame = tokenizer
    .transform(train)
    .drop("comment_text")

train_tokenized.show(5)

### 3. Feature Extraction: HashingTF-IDF


In [9]:
%spark
import org.apache.spark.ml.feature.HashingTF

// Term-frequency hasher: token lists -> "raw_features" vectors of size 2^10 = 1024
val hashingTF: HashingTF = new HashingTF()
    .setNumFeatures(1 << 10)
    .setInputCol("comment_text_tokenized")
    .setOutputCol("raw_features")

// Apply the hasher to the training tokens and drop the token column
val train_tf: DataFrame = hashingTF
    .transform(train_tokenized)
    .drop("comment_text_tokenized")

train_tf.select("raw_features").take(2)

In [10]:
%spark
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.feature.IDFModel

// IDF estimator: rescales "raw_features" into the final "features" column
val idf: IDF = new IDF()
    .setOutputCol("features")
    .setInputCol("raw_features")

// Fit the IDF weights on the training term frequencies
val idf_model: IDFModel = idf.fit(train_tf)

// Apply the fitted weights and drop the intermediate term-frequency column
val train_tfidf: DataFrame = idf_model
    .transform(train_tf)
    .drop("raw_features")

train_tfidf.select("features").take(2)

### 4. Feature Extraction: Word2Vec


In [12]:
%spark
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.feature.Word2VecModel

// Word2Vec estimator: token lists -> "features" vectors (library default settings)
val word2vec: Word2Vec = new Word2Vec()
    .setOutputCol("features")
    .setInputCol("comment_text_tokenized")

// Fit the embedding model on the training tokens
val word2vec_model: Word2VecModel = word2vec.fit(train_tokenized)

// Embed the training comments and drop the token column
val train_w2v: DataFrame = word2vec_model
    .transform(train_tokenized)
    .drop("comment_text_tokenized")

train_w2v.select("features").take(2)

### 5. Train Loop


In [14]:
%spark

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

/** Trains and evaluates one logistic-regression classifier per label column.
  *
  * For each of the six target columns, a model is fitted on `train_df`, applied
  * to `test_df`, and the resulting ROC-AUC is printed to stdout.
  *
  * @param train_df training data; must contain a "features" column and every
  *                 target column
  * @param test_df  evaluation data with the same columns; rows whose target
  *                 value is -1 are left out of that target's evaluation
  */
def train_loop(train_df: DataFrame, test_df: DataFrame): Unit = {
    // Imported locally so the function does not rely on the notebook's implicit imports.
    import org.apache.spark.sql.functions.col

    val targets: List[String] = List("toxic", "severe_toxic", "obscene", "threat", "insult", "identity_hate")

    for (target <- targets) {
        val lr: LogisticRegression = new LogisticRegression()
            .setMaxIter(500)
            .setRegParam(0.01)
            .setElasticNetParam(0.08)
            .setFeaturesCol("features")
            .setLabelCol(target)

        // The label column is cast to int before fitting.
        val model_lr: LogisticRegressionModel = lr.fit(train_df
            .withColumn(target, col(target).cast("int")))

        // Filter on the freshly cast column and use `=!=`: the `!==` operator is
        // deprecated on Column because of its surprising operator precedence.
        val test_df_filtered: DataFrame = test_df
            .withColumn(target, col(target).cast("int"))
            .filter(col(target) =!= -1)
        val test_predictions: DataFrame = model_lr.transform(test_df_filtered)

        // The evaluator reads its scores from the model's "probability" output column.
        val evaluator: BinaryClassificationEvaluator = new BinaryClassificationEvaluator()
            .setLabelCol(target)
            .setRawPredictionCol("probability")
            .setMetricName("areaUnderROC")

        val roc_auc: Double = evaluator.evaluate(test_predictions)

        println(s"Target: $target")
        println(s"ROC-AUC: $roc_auc")
    }
}

### 6. Read Test Data


In [16]:
%spark
// Read the test comments (first line is the header).
// "multiline" lets a quoted field span several lines.
// The escape character is the double quote. The previous version of this chain
// also set escape to "\\" first, but a later .option() call with the same key
// replaces the earlier value, so that setting never took effect and is dropped.
val test_raw: DataFrame = spark.read
    .option("multiline", "true")
    .option("quote", "\"")
    .option("header", "true")
    .option("escape", "\"")
    .csv("/notebook/data/test.csv")

test_raw.show(5)

In [17]:
%spark
// Read the test labels (first line is the header).
// The escape character is the double quote. The previous version of this chain
// also set escape to "\\" first, but a later .option() call with the same key
// replaces the earlier value, so that setting never took effect and is dropped.
val test_labels: DataFrame = spark.read
    .option("multiline", "true")
    .option("quote", "\"")
    .option("header", "true")
    .option("escape", "\"")
    .csv("/notebook/data/test_labels.csv")

test_labels.show(5)

In [18]:
%spark
// Pair every test comment with its labels via an inner join on "id",
// discard rows containing null/NaN values, and remove the "id" column.
val test: DataFrame = test_raw
    .join(test_labels, Seq("id"), "inner")
    .na.drop()
    .drop("id")

test.show(5)

In [19]:
%spark
// Tokenize the test comments with the tokenizer already used for the training
// data, then drop the raw text column.
val test_tokenized: DataFrame = tokenizer
    .transform(test)
    .drop("comment_text")

test_tokenized.show(5)

### 7. Experiments: HashingTF-IDF

In [21]:
%spark
// Exponents of the hash-space sizes to try: numFeatures = 2^pow
val pows: List[Int] = List(9, 10, 11, 12, 13)

println("HashingTF-IDF")
println("------------------------")
pows.foreach { pow =>
    val num_features: Int = 1 << pow
    println(s"Num Features: $num_features")

    // Reconfigures the shared hashingTF instance for this run
    hashingTF.setNumFeatures(num_features)

    // Term frequencies for both splits with the current hash-space size
    val tf_train: DataFrame = hashingTF
        .transform(train_tokenized)
        .drop("comment_text_tokenized")
    val tf_test: DataFrame = hashingTF
        .transform(test_tokenized)
        .drop("comment_text_tokenized")

    // IDF weights are fitted on the training split only and reused for the test split
    val fitted_idf: IDFModel = idf.fit(tf_train)
    val tfidf_train = fitted_idf
        .transform(tf_train)
        .drop("raw_features")
    val tfidf_test: DataFrame = fitted_idf
        .transform(tf_test)
        .drop("raw_features")

    train_loop(tfidf_train, tfidf_test)
    println("------------------------")
}


***Conclusion***: increasing the numFeatures parameter has a positive impact on the metrics.


### 8. Experiments: Word2Vec

In [24]:
%spark
// Embed the TEST comments with the Word2Vec model fitted on the training data.
// This must use test_tokenized: transforming train_tokenized here would make
// train_loop score each classifier on the very rows it was trained on.
val test_w2v: DataFrame = word2vec_model.transform(test_tokenized)
    .drop("comment_text_tokenized")

println("Word2Vec")
println("------------------------")
train_loop(train_w2v, test_w2v)

***Conclusion***: Word2Vec showed better results than HashingTF-IDF, which is not surprising: in general TF-IDF performs worse than Word2Vec, so its hashed approximation would be expected to perform worse as well.
