In [30]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover, StringIndexer, Word2Vec
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
import pandas as pd

In [31]:
ss = (SparkSession.builder
        .appName('Toxic Comment Classification')
        .config("spark.executor.memory", "1G")
        .config("spark.driver.memory","2G")
        .config("spark.executor.cores","7")
        .config("spark.python.worker.memory","1G")
        .config("spark.default.parallelism","4")
        .getOrCreate())
ss.sparkContext.setLogLevel('ERROR')

In [32]:
def prep_data(file):
    df = pd.read_csv(f'data/{file}.csv')
    return ss.createDataFrame(df)

In [None]:
def hashingTF_idf(data):
    tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
    wordsData = tokenizer.transform(data)

    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
    tf = hashingTF.transform(wordsData)
    hashingTF.setNumFeatures(1024)

    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idf_model = idf.fit(tf) 
    return idf_model.transform(tf)

In [33]:
def word2vec(data):
    tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
    words_data = tokenizer.transform(data)

    w2v = Word2Vec(vectorSize=100, inputCol="words", outputCol="features")
    w2v_model = w2v.fit(words_data) 
    return w2v_model.transform(words_data) 

In [34]:
def _get_evaluation(data, cols):
    eval_dict = {}
    for col in cols: 
        evaluation = BinaryClassificationEvaluator(rawPredictionCol=f"{col}_prediction", labelCol=col, metricName='areaUnderROC')
        eval_dict[col] = evaluation.evaluate(data)
    return eval_dict

In [35]:
def learn_model(train,test,marks):
    test_res = test.select('id')
    out_cols = [i for i in train.columns if i not in ["id", "comment_text", "words", "rawFeatures", "features"]]
    extract_prob = F.udf(lambda x: float(x[1]), T.FloatType())

    test_probs = []
    for col in out_cols:
        lr = LogisticRegression(featuresCol="features", labelCol=col, regParam=0.05)
        paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.05, 0.1, 0.3]).build())
        cv = CrossValidator(estimator=lr,
                            estimatorParamMaps=paramGrid,
                            evaluator=BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol=col, metricName='areaUnderROC'),
                            numFolds=2)
        cl_lr_model = cv.fit(train)
        res = cl_lr_model.transform(test)
        test_res = test_res.join(res.select('id', 'prediction'), on="id")
        test_res = test_res.withColumnRenamed('prediction', f"{col}_prediction")

    res_for_test = test_res.join(marks, on="id")
    print(_get_evaluation(res_for_test, out_cols))

In [26]:
train = prep_data('train')
train = train.withColumn("toxic",train.toxic.cast('int'))
train = train.withColumn("severe_toxic",train.toxic.cast('int'))
train = train.withColumn("obscene",train.toxic.cast('int'))
train = train.withColumn("threat",train.toxic.cast('int'))
train = train.withColumn("insult",train.toxic.cast('int'))
train = train.withColumn("identity_hate",train.toxic.cast('int'))

test = prep_data('test')
marks = prep_data('marks')

train_data = hashingTF_idf(train)
test_data = hashingTF_idf(test)
learn_model(train_data, test_data, marks) # numFeatures=1048576

                                                                                

{'toxic': 0.6266565141060169, 'severe_toxic': 0.7326115809402718, 'obscene': 0.6604303087240774, 'threat': 0.6624006455250596, 'insult': 0.6737980158686908, 'identity_hate': 0.6929353462580896}


In [28]:
train_data = hashingTF_idf(train)
test_data = hashingTF_idf(test)
learn_model(train_data, test_data, marks) # numFeatures=1024

                                                                                

{'toxic': 0.6266565141060169, 'severe_toxic': 0.7326115809402718, 'obscene': 0.6604303087240774, 'threat': 0.6624006455250596, 'insult': 0.6737980158686908, 'identity_hate': 0.6929353462580896}


In [36]:
train_data = word2vec(train)
test_data = word2vec(test)
learn_model(train_data, test_data, marks)

                                                                                

{'toxic': 0.5717122309532192, 'severe_toxic': 0.6977031348897644, 'obscene': 0.6112809898418207, 'threat': 0.635977357018018, 'insult': 0.5998972935547053, 'identity_hate': 0.5892826276342719}


### Выводы

По умолчанию значение NumFeatures 1048576, попробовал установить 1024(так как почитал, что рекомендуется использовать степени двойки), но не получил никакой разницы.

HashingTF_idf лучше справляется с постаеленной задачей чем word2vec.