In [1]:
import pyspark

In [2]:
import pandas as pd
import numpy as np
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, Word2Vec
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [3]:
spark = SparkSession.builder\
        .master('local[1]')\
        .appName('ToxicComment')\
        .getOrCreate()

sc = spark.sparkContext

22/12/17 23:51:42 WARN Utils: Your hostname, MAINFRAME resolves to a loopback address: 127.0.1.1; using 192.168.1.68 instead (on interface wlp3s0)
22/12/17 23:51:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/17 23:51:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read.csv('train.csv', sep=',', quote='\"', escape='\"', multiLine=True, header=True, inferSchema=True)

In [5]:
labels = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
train_df, val_df = df.randomSplit(weights=[0.8, 0.2], seed=0)

In [6]:
def tf_idf_result(num_features):
    tokenizer = Tokenizer(inputCol="comment_text", outputCol="splitted")
    tf  = HashingTF(inputCol="splitted", outputCol="tf_features", numFeatures=num_features)
    idf = IDF(inputCol="tf_features", outputCol="idf_features")
    pipeline = Pipeline(stages=[tokenizer, tf, idf])
    model = pipeline.fit(train_df)
    train_data = model.transform(train_df)
    val_data = model.transform(val_df)

    res = {}

    for label in labels:
        classifier = LogisticRegression(featuresCol='idf_features', labelCol=label).fit(train_data)
        preds = classifier.transform(val_data).select(col(label).alias("label"), "prediction")
        metrics = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', 
                                                metricName='areaUnderROC')
        res[label] = metrics.evaluate(preds)
    return res

In [7]:
for numFeatures in [100, 150, 200]:
    res = tf_idf_result(numFeatures)
    print("TF/IDF numFeatures:", numFeatures, res)

                                                                                

TF/IDF numFeatures: 100 {'toxic': 0.5259407229136341, 'severe_toxic': 0.5266930305469325, 'obscene': 0.5190325188943047, 'threat': 0.5048839383081477, 'insult': 0.5204514316053923, 'identity_hate': 0.5099313542152994}


                                                                                

TF/IDF numFeatures: 150 {'toxic': 0.5441220980779745, 'severe_toxic': 0.5370152345810655, 'obscene': 0.5411554030814869, 'threat': 0.5149945474372956, 'insult': 0.5320253657584176, 'identity_hate': 0.5116518943152326}


[Stage 1765:>                                                       (0 + 1) / 1]

TF/IDF numFeatures: 200 {'toxic': 0.5588773195006544, 'severe_toxic': 0.556261273205821, 'obscene': 0.5643436669819449, 'threat': 0.5200965882536221, 'insult': 0.5428172687182542, 'identity_hate': 0.5114637888507688}


                                                                                

Для небольшого числа фичей (100) значения AUROC получились небольшими. По мере увеличения параметра значение метрики растет. Целесообразно обучать модель на существенно большем числе фичей (от 1000).

In [8]:
tokenizer = Tokenizer(inputCol="comment_text", outputCol="splitted")
word2Vec = Word2Vec(vectorSize=40, seed=17, inputCol="splitted", outputCol="features", windowSize=10)
pipeline = Pipeline(stages=[tokenizer, word2Vec])
model = pipeline.fit(train_df)
train_data = model.transform(train_df)
val_data = model.transform(val_df)

res_w2v = {}

for label in labels:
    classifier = LogisticRegression(featuresCol='features', labelCol=label).fit(train_data)
    preds = classifier.transform(val_data).select(col(label).alias("label"), "prediction")
    metrics = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', 
                                            metricName='areaUnderROC')
    res_w2v[label] = metrics.evaluate(preds)

print("Word2Vec metrics:", res_w2v)

[Stage 1776:>                                                       (0 + 1) / 1]

22/12/18 00:00:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/12/18 00:00:25 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


[Stage 2087:>                                                       (0 + 1) / 1]

Word2Vec metrics: {'toxic': 0.7430592490521165, 'severe_toxic': 0.5853900658050697, 'obscene': 0.7349591492950127, 'threat': 0.5100015578750584, 'insult': 0.6809162912041663, 'identity_hate': 0.5331567655689046}


                                                                                