In [3]:
# Environment setup (run once in a shell, not from this notebook):
# conda install openjdk
# conda install pyspark
# conda install -c conda-forge findspark

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler, Word2Vec, StopWordsRemover, VectorAssembler, StringIndexer, HashingTF, IDF, Tokenizer
import pyspark.sql.functions as f
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



# Local Spark session for the whole notebook.
# NOTE(review): with master "local[*]" everything runs inside the driver JVM,
# so spark.executor.memory presumably has no separate effect; the memory sizes
# below are machine-specific and should be adjusted to the host.
spark = (
    SparkSession.builder
    .appName("HTFIDF")
    .master("local[*]")
    .config("spark.executor.memory", "50g")
    .config("spark.driver.memory", "30g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "8g")
    .getOrCreate()
)

# Handle to the underlying SparkContext (not referenced in the cells shown).
sc = spark.sparkContext

# CSV reader: first row is the header; the quote/escape/multiline options let
# quoted comment fields contain doubled quotes and span several lines.
csv_reader = (
    spark.read
    .option("header", True)
    .option("quote", '"')
    .option("escape", '"')
    .option("multiline", True)
)
df = csv_reader.csv("data/train.csv")

df.printSchema()



root
 |-- id: string (nullable = true)
 |-- comment_text: string (nullable = true)
 |-- toxic: string (nullable = true)
 |-- severe_toxic: string (nullable = true)
 |-- obscene: string (nullable = true)
 |-- threat: string (nullable = true)
 |-- insult: string (nullable = true)
 |-- identity_hate: string (nullable = true)



In [2]:
# Tokenize the comments and remove stop words.
# Tokenizer lower-cases its input before splitting on whitespace, so a
# separate f.lower() pass is unnecessary.
tokenizer = Tokenizer(inputCol='comment_text', outputCol='words')
remover = StopWordsRemover(inputCol='words', outputCol='filtered')
filtered_df = remover.transform(tokenizer.transform(df))


# Pipeline: hashed term frequencies -> IDF weighting -> label indexing -> logistic regression.
# NUM_FEATURES is deliberately tiny here; the next cell grid-searches it.
NUM_FEATURES = 10

hashing = HashingTF(inputCol='filtered', outputCol='raw_features', numFeatures=NUM_FEATURES)
idf = IDF(inputCol='raw_features', outputCol='features')
# The target column is string-typed. Alphabetical ordering pins "0" -> 0.0 and
# "1" -> 1.0; the default 'frequencyDesc' order would make the meaning of
# label/prediction 1.0 depend on which class happens to be more frequent.
label_stringIdx = StringIndexer(inputCol="toxic", outputCol="label",
                                stringOrderType="alphabetAsc")
lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages=[hashing, idf, label_stringIdx, lr])
model = pipeline.fit(filtered_df)

# NOTE: these predictions are in-sample (scored on the data the model was fitted
# on); the cross-validation cell gives the out-of-sample estimate.
rescale_df = model.transform(filtered_df)
rescale_df.select('label', 'prediction').show(truncate=False)

+-----+----------+
|label|prediction|
+-----+----------+
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|1.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|1.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|1.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
|0.0  |0.0       |
+-----+----------+
only showing top 20 rows



In [3]:
# Cross-validate the pipeline defined in the previous cell over several
# hashed-vocabulary sizes; BinaryClassificationEvaluator scores each setting
# by its default metric (ROC AUC).
hashed_sizes = [10, 20, 50, 100, 200, 500, 1000]
param_grid = (
    ParamGridBuilder()
    .addGrid(hashing.numFeatures, hashed_sizes)
    .build()
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(),
    seed=42,
)
model = cv.fit(filtered_df)

# Flatten every ParamMap to {param name: value}, then pair it with its fold-averaged metric.
params = []
for param_map in model.getEstimatorParamMaps():
    params.append({param.name: value for param, value in param_map.items()})

metric_name = model.getEvaluator().getMetricName()
cv_summary = pd.DataFrame.from_dict(
    [{metric_name: avg_metric, **grid_point}
     for grid_point, avg_metric in zip(params, model.avgMetrics)]
)
cv_summary

Unnamed: 0,areaUnderROC,numFeatures
0,0.637126,10
1,0.652118,20
2,0.700205,50
3,0.739779,100
4,0.770147,200
5,0.804789,500
6,0.828474,1000


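## TF-IDF

Cross-validated ROC AUC of a HashingTF → IDF → logistic-regression pipeline for each target label, over a grid of hashed-vocabulary sizes (`numFeatures`).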

In [4]:
def tf_idf_cv(target_name, df = df, num_feature_list = (10, 20, 50, 100, 200, 500, 1000), num_folds = 3):
    """Cross-validate a HashingTF -> IDF -> logistic-regression pipeline for one label.

    ``comment_text`` is tokenized (lower-cased, split on whitespace), stop
    words are removed, and ``HashingTF.numFeatures`` is grid-searched with
    ``CrossValidator`` scored by ``BinaryClassificationEvaluator``
    (ROC AUC by default).

    Parameters
    ----------
    target_name : str
        Name of the string-typed label column in ``df`` (e.g. ``'toxic'``);
        it is turned into a numeric ``label`` column by ``StringIndexer``.
    df : pyspark.sql.DataFrame, optional
        Data with a ``comment_text`` column and the ``target_name`` column.
        Defaults to the module-level ``df`` as bound when this function was
        defined (later reassignments of ``df`` are NOT picked up).
    num_feature_list : iterable of int, optional
        ``numFeatures`` values to try; must be non-empty.
    num_folds : int, optional
        Number of cross-validation folds (3 = the ``CrossValidator`` default).

    Returns
    -------
    pandas.DataFrame
        One row per grid point: the fold-averaged metric (column named after
        the evaluator metric, e.g. ``areaUnderROC``) and ``numFeatures``.

    Raises
    ------
    ValueError
        If ``num_feature_list`` is empty.
    """
    # Copy into a list: ParamGridBuilder gets a concrete list, and an empty
    # grid is rejected up front instead of failing inside CrossValidator.
    feature_sizes = list(num_feature_list)
    if not feature_sizes:
        raise ValueError("num_feature_list must contain at least one value")

    # Tokenizer lower-cases before splitting, so no separate f.lower() pass.
    words_df = Tokenizer(inputCol='comment_text', outputCol='words').transform(df)
    filtered_df = StopWordsRemover(inputCol='words', outputCol='filtered').transform(words_df)

    # numFeatures here is only an initial value; every grid point overrides it.
    hashing = HashingTF(inputCol='filtered', outputCol='raw_features', numFeatures=feature_sizes[0])
    idf = IDF(inputCol='raw_features', outputCol='features')
    # Alphabetical order pins "0" -> 0.0 and "1" -> 1.0 for '0'/'1' string labels;
    # the default frequency order is re-fit per CV fold and depends on class counts.
    label_stringIdx = StringIndexer(inputCol=target_name, outputCol="label",
                                    stringOrderType="alphabetAsc")
    lr = LogisticRegression(maxIter=10)
    pipeline = Pipeline(stages=[hashing, idf, label_stringIdx, lr])

    param_grid = ParamGridBuilder().addGrid(hashing.numFeatures, feature_sizes).build()
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid,
                        evaluator=BinaryClassificationEvaluator(),
                        numFolds=num_folds, seed=42)
    model = cv.fit(filtered_df)

    # One record per grid point: metric first, then the parameter values tried.
    metric_name = model.getEvaluator().getMetricName()
    rows = []
    for param_map, avg_metric in zip(model.getEstimatorParamMaps(), model.avgMetrics):
        row = {metric_name: avg_metric}
        row.update({param.name: value for param, value in param_map.items()})
        rows.append(row)
    return pd.DataFrame(rows)


In [5]:
# Run the TF-IDF grid search once per target label, in a fixed order,
# reporting progress after each one.
# NOTE(review): 'severe_toxic' is in the data schema but is not evaluated here.
tfidf_results = {}
for label_name in ['toxic', 'obscene', 'threat', 'insult', 'identity_hate']:
    tfidf_results[label_name] = tf_idf_cv(label_name)
    print(label_name + ' done')

toxic = tfidf_results['toxic']
obscene = tfidf_results['obscene']
threat = tfidf_results['threat']
insult = tfidf_results['insult']
identity = tfidf_results['identity_hate']


toxic done
obscene done
threat done
insult done
identity_hate done


In [6]:
# Tag each per-label result frame (in place) with its target name, then lay the
# ROC AUC values out as a numFeatures x target table.
tfidf_frames = {
    "toxic": toxic,
    "obscene": obscene,
    "threat": threat,
    "insult": insult,
    "identity_hate": identity,
}
for target_label, result_frame in tfidf_frames.items():
    result_frame['target'] = target_label

stacked_results = pd.concat(list(tfidf_frames.values()))
pd.pivot_table(stacked_results, values='areaUnderROC', index='numFeatures',
               columns=['target']).reset_index()

target,numFeatures,identity_hate,insult,obscene,threat,toxic
0,10,0.614105,0.650869,0.657162,0.647158,0.637126
1,20,0.632986,0.666278,0.672023,0.646438,0.652118
2,50,0.678879,0.73124,0.745945,0.709965,0.700205
3,100,0.713338,0.770487,0.783732,0.75541,0.739779
4,200,0.740816,0.793148,0.810831,0.811542,0.770147
5,500,0.784982,0.826758,0.836866,0.827022,0.804789
6,1000,0.799305,0.849031,0.855401,0.845876,0.828474


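## Word2Vec

Cross-validated ROC AUC of a Word2Vec → logistic-regression pipeline for each target label, using each comment's averaged word vector as its feature vector.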


In [7]:
def w2v_cv(target_name, df = df, VECTORSIZE = 3, seed = 42):
    """Cross-validate a Word2Vec -> logistic-regression pipeline for one label.

    ``comment_text`` is tokenized (lower-cased, split on whitespace), stop
    words are removed, and each comment is embedded with Word2Vec (Spark's
    Word2VecModel averages the vectors of a document's words). The result is
    scored with ``CrossValidator`` and ``BinaryClassificationEvaluator``
    (ROC AUC by default).

    Parameters
    ----------
    target_name : str
        Name of the string-typed label column in ``df`` (e.g. ``'toxic'``);
        it is turned into a numeric ``label`` column by ``StringIndexer``.
    df : pyspark.sql.DataFrame, optional
        Data with a ``comment_text`` column and the ``target_name`` column.
        Defaults to the module-level ``df`` as bound when this function was
        defined (later reassignments of ``df`` are NOT picked up).
    VECTORSIZE : int or iterable of int, optional
        Embedding size. Pass several sizes to grid-search them.
    seed : int, optional
        Seed for Word2Vec and for the CV fold split.

    Returns
    -------
    pandas.DataFrame
        One row per grid point: the fold-averaged metric (column named after
        the evaluator metric, e.g. ``areaUnderROC``) and ``vectorSize``.

    Raises
    ------
    ValueError
        If ``VECTORSIZE`` is an empty iterable.
    """
    import numbers

    # Accept a single size (original behaviour) or a collection of sizes.
    if isinstance(VECTORSIZE, numbers.Integral):
        vector_sizes = [int(VECTORSIZE)]
    else:
        vector_sizes = [int(size) for size in VECTORSIZE]
    if not vector_sizes:
        raise ValueError("VECTORSIZE must be an int or a non-empty iterable of ints")

    # Tokenizer lower-cases before splitting, so no separate f.lower() pass.
    words_df = Tokenizer(inputCol='comment_text', outputCol='words').transform(df)
    filtered_df = StopWordsRemover(inputCol='words', outputCol='filtered').transform(words_df)

    # Explicit seed: PySpark's default Word2Vec seed is derived from Python's
    # hash() of the class name, which is randomized per interpreter process
    # unless PYTHONHASHSEED is fixed, so results would vary between kernel runs.
    word2Vec = Word2Vec(vectorSize=vector_sizes[0], minCount=0, seed=seed,
                        inputCol='filtered', outputCol='features')
    # Alphabetical order pins "0" -> 0.0 and "1" -> 1.0 for '0'/'1' string labels;
    # the default frequency order is re-fit per CV fold and depends on class counts.
    label_stringIdx = StringIndexer(inputCol=target_name, outputCol="label",
                                    stringOrderType="alphabetAsc")
    lr = LogisticRegression(maxIter=10)
    pipeline = Pipeline(stages=[word2Vec, label_stringIdx, lr])

    param_grid = ParamGridBuilder().addGrid(word2Vec.vectorSize, vector_sizes).build()
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid,
                        evaluator=BinaryClassificationEvaluator(), seed=seed)
    model = cv.fit(filtered_df)

    # One record per grid point: metric first, then the parameter values tried.
    metric_name = model.getEvaluator().getMetricName()
    rows = []
    for param_map, avg_metric in zip(model.getEstimatorParamMaps(), model.avgMetrics):
        row = {metric_name: avg_metric}
        row.update({param.name: value for param, value in param_map.items()})
        rows.append(row)
    return pd.DataFrame(rows)

In [8]:
# Run the Word2Vec evaluation once per target label, in a fixed order,
# reporting progress after each one.
# NOTE(review): 'severe_toxic' is in the data schema but is not evaluated here.
w2v_results = {}
for label_name in ('toxic', 'obscene', 'threat', 'insult', 'identity_hate'):
    w2v_results[label_name] = w2v_cv(label_name)
    print(label_name + '_w2v done')

toxic_w2v = w2v_results['toxic']
obscene_w2v = w2v_results['obscene']
threat_w2v = w2v_results['threat']
insult_w2v = w2v_results['insult']
identity_hate_w2v = w2v_results['identity_hate']


toxic_w2v done
obscene_w2v done
threat_w2v done
insult_w2v done
identity_hate_w2v done


In [9]:
# Tag each per-label Word2Vec result frame (in place) with its target name and
# stack them into one table, one row per label.
w2v_frames = {
    "toxic": toxic_w2v,
    "obscene": obscene_w2v,
    "threat": threat_w2v,
    "insult": insult_w2v,
    "identity_hate": identity_hate_w2v,
}
for target_label, result_frame in w2v_frames.items():
    result_frame['target'] = target_label

pd.concat(list(w2v_frames.values()))

Unnamed: 0,areaUnderROC,vectorSize,target
0,0.810609,3,toxic
1,0.852763,3,obscene
2,0.843421,3,threat
3,0.834328,3,insult
4,0.888761,3,identity_hate
