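### HW5: Toxic Comment Classification with Spark

Goal: choose the `HashingTF` feature size for a TF-IDF + logistic-regression pipeline. For each candidate size, one binary classifier is trained per toxicity label, and the mean test ROC-AUC across the labels is recorded.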

In [1]:
# Dependency: this notebook requires pyspark (FMClassifier / vector_to_array need Spark 3.x).
# If it is not installed, uncomment the line below. Prefer `%pip` over `!pip` so the package
# is installed into the running kernel's environment, and pin a version for reproducibility.
#!pip install pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MultilabelClassificationEvaluator

Start a local Spark session

In [3]:
# Local Spark session: `local[32]` runs Spark in-process with 32 worker threads,
# and the Spark web UI is bound to port 4050.
spark = (
    SparkSession.builder
    .master('local[32]')
    .appName('HW5')
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Handle to the SparkContext backing the session.
sc = spark.sparkContext

Read the training data, and the test comments joined with their labels

In [4]:
# Shared CSV reader options. Comment text is quoted, may span multiple lines,
# and escapes embedded quotes by doubling them, hence quote='"' and escape='"'.
CSV_OPTIONS = dict(sep=',', quote='"', escape='"', multiLine=True, header=True, inferSchema=True)

LABEL_COLUMNS = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']

train = spark.read.csv('train.csv', **CSV_OPTIONS)
test_comments = spark.read.csv('test.csv', **CSV_OPTIONS)
test_labels = spark.read.csv('test_labels.csv', **CSV_OPTIONS)

# Per the Jigsaw Toxic Comment Kaggle data description, a label value of -1 in
# test_labels.csv marks a row that was not used for scoring (verify against your copy).
# Without this filter, BinaryClassificationEvaluator would count those rows as
# negatives and distort every AUC, so keep only rows where all labels are real 0/1 values.
scored_rows = ' AND '.join(f'{col} != -1' for col in LABEL_COLUMNS)
test = test_comments.join(test_labels, 'id').where(scored_rows)

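Train models: sweep the `HashingTF` size from 10 to 190 in steps of 10. For each size, fit one logistic-regression classifier per label and record the mean test ROC-AUC.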

In [None]:
# Label columns; one binary classifier is trained and evaluated per column.
targets = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']


def mean_auc_for_num_features(num_features, train_df, test_df, targets):
    """Fit TF-IDF + one logistic regression per target and return the mean test score.

    Parameters
    ----------
    num_features : int
        Size of the HashingTF output vector (number of hash buckets).
    train_df, test_df : DataFrame
        Must contain a ``comment_text`` column and every column named in ``targets``.
    targets : list of str
        Label columns to train and evaluate; must be non-empty.

    Returns
    -------
    float
        Unweighted mean over ``targets`` of BinaryClassificationEvaluator's score
        (its default metric is areaUnderROC).

    Raises
    ------
    ValueError
        If ``targets`` is empty (the mean would be undefined).
    """
    if not targets:
        raise ValueError('targets must be non-empty')

    tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
    # Use the sweep value. It was previously hard-coded to 20, so every
    # iteration of the sweep trained exactly the same model.
    hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="tf", numFeatures=num_features)
    idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")

    # The featurizer does not depend on the label, so fit it once per sweep value
    # and reuse the transformed frames for every target instead of refitting per label.
    featurizer = Pipeline(stages=[tokenizer, hashing_tf, idf]).fit(train_df)
    train_features = featurizer.transform(train_df).cache()
    test_features = featurizer.transform(test_df).cache()
    try:
        scores = []
        for target in targets:
            classifier = LogisticRegression(featuresCol=idf.getOutputCol(), labelCol=target,
                                            maxIter=10, regParam=0.3, elasticNetParam=0.8)
            evaluator = BinaryClassificationEvaluator(labelCol=classifier.getLabelCol(),
                                                      rawPredictionCol=classifier.getRawPredictionCol())
            classifier_model = classifier.fit(train_features)
            scores.append(evaluator.evaluate(classifier_model.transform(test_features)))
    finally:
        # Release cached partitions before the next sweep value is processed.
        train_features.unpersist()
        test_features.unpersist()
    return sum(scores) / len(scores)


# (numFeatures, mean test score over `targets`) for each candidate HashingTF size.
res = [
    (numFeatures, mean_auc_for_num_features(numFeatures, train, test, targets))
    for numFeatures in range(10, 200, 10)
]

In [None]:
# Each entry of `res` is (numFeatures, mean test ROC-AUC across the label columns).
for num_features, mean_auc in res:
    print(f'numFeatures={num_features:>3}  mean AUC={mean_auc:.4f}')

# Report the best-scoring HashingTF size (skipped when the sweep produced no results).
if res:
    best_num_features, best_auc = max(res, key=lambda pair: pair[1])
    print(f'best: numFeatures={best_num_features}  mean AUC={best_auc:.4f}')