### Label value counts of the dataset

In [1]:
import pandas as pd

# Balanced hate-speech dataset: free text in 'Content', binary class (0/1) in 'Label'.
RAW_DATASET_CSV = '../data/HateSpeechDatasetBalanced.csv'
df = pd.read_csv(RAW_DATASET_CSV)

In [2]:
# Rows per class in the raw dataset; the per-class slices taken when building the
# splits (up to row 200000 of each class) require both classes to be this large.
label_counts = df['Label'].value_counts()
label_counts

Label
1    364525
0    361594
Name: count, dtype: int64

### Create the test split and two training splits for model training and validation

In [3]:
# Split the balanced dataset per class so that every split contains both classes:
#   * test:    rows [0, TEST_ROWS_PER_LABEL) of each class
#   * train 1: rows [TEST_ROWS_PER_LABEL, TRAIN1_END_PER_LABEL) of each class (a subset of train 2)
#   * train 2: every remaining row of each class
# Both training splits start after the test rows, so neither overlaps the test set.
TEST_ROWS_PER_LABEL = 50_000
TRAIN1_END_PER_LABEL = 200_000
SPLIT_SEED = 42
LABELS = (0, 1)

# Filter the frame once per class instead of once per class *per split*.
rows_by_label = {label: df[df['Label'] == label] for label in LABELS}


def take_per_label(frames_by_label, start, stop=None, seed=SPLIT_SEED):
    """Return rows [start:stop) of every class, concatenated in label order, then shuffled.

    Args:
        frames_by_label: mapping label -> DataFrame holding only that label's rows.
        start: first positional row taken from each class.
        stop: end (exclusive) of the positional slice; ``None`` means "to the end".
        seed: ``random_state`` for the shuffle, so the split is reproducible.

    Returns:
        A new DataFrame with a fresh RangeIndex.
    """
    # Concatenating in LABELS order before the seeded shuffle keeps the result
    # identical to concatenating the class-0 slice followed by the class-1 slice.
    per_label = [frames_by_label[label].iloc[start:stop] for label in LABELS]
    return pd.concat(per_label).sample(frac=1, random_state=seed).reset_index(drop=True)


df_test = take_per_label(rows_by_label, 0, TEST_ROWS_PER_LABEL)
df_train1 = take_per_label(rows_by_label, TEST_ROWS_PER_LABEL, TRAIN1_END_PER_LABEL)
df_train2 = take_per_label(rows_by_label, TEST_ROWS_PER_LABEL)

# Test and train 1 must be exactly balanced (each class has well over 200k rows).
for split_name, split in (("test", df_test), ("train 1", df_train1)):
    split_counts = split['Label'].value_counts()
    assert len(split_counts) == len(LABELS) and split_counts.nunique() == 1, \
        f"{split_name} split is not balanced: {split_counts.to_dict()}"

df_test.to_csv('../data/test_dataset.csv', index=False)
df_train1.to_csv('../data/train_dataset_1.csv', index=False)
df_train2.to_csv('../data/train_dataset_2.csv', index=False)

### Preprocess the data, then train and evaluate a PySpark model

In [None]:
!pip install findspark

In [None]:
import findspark


# Locate the Spark installation, hand that exact path to findspark so the pyspark
# imports in the next cell resolve, and echo the path as the cell output.
spark_home = findspark.find()
findspark.init(spark_home)
spark_home

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml import Transformer
from pyspark.sql import DataFrame


# --- Spark session --------------------------------------------------------------
# NOTE: the app name mentions lemmatization, but the pipeline below has no
# lemmatization stage (tokenize -> stop words -> TF-IDF -> random forest).
# `spark.storage.memoryFraction` is intentionally not set: it only applied to Spark's
# legacy memory manager (removed in Spark 3.0); `spark.memory.fraction` is the
# setting the unified memory manager uses.
spark = SparkSession.builder \
    .appName("Text Preprocessing with Lemmatization in PySpark") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "2") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()

# --- Hyper-parameters ------------------------------------------------------------
NUM_HASH_FEATURES = 20000   # number of HashingTF buckets
NUM_TREES = 100
MAX_TREE_DEPTH = 15
SEED = 42                   # pins both the train/test split and the random forest

# Step 1: load the data.
# escape='"' and multiLine=True match pandas `to_csv` quoting (embedded quotes are
# doubled, newlines stay inside quoted fields). Spark's defaults (escape='\\', one
# record per line) would shift or split such rows.
# Assumes the S3 object is the pandas-written train_dataset_1.csv — TODO confirm.
data_path = "s3a://amamylov-mlops/hate_speech_detection/train_dataset_1.csv"
data = spark.read.csv(data_path, header=True, inferSchema=True, multiLine=True, escape='"')

# Step 2: keep only the text and target columns; drop rows containing nulls.
data = data.select("Content", "Label").na.drop()

# Step 3: pipeline stages.
tokenizer = Tokenizer(inputCol="Content", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# TF-IDF features.
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=NUM_HASH_FEATURES)
idf = IDF(inputCol="raw_features", outputCol="features")

# Label indexing. alphabetAsc maps "0" -> 0.0 and "1" -> 1.0, so indexedLabel and
# prediction equal the raw label. The default order (frequencyDesc) ranks classes by
# count, and on this exactly balanced file the random split decides which class wins.
indexer = StringIndexer(inputCol="Label", outputCol="indexedLabel", stringOrderType="alphabetAsc")

# Random forest classifier; an explicit seed makes re-runs build the same forest.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features",
                            numTrees=NUM_TREES, maxDepth=MAX_TREE_DEPTH, seed=SEED)

# Step 4: assemble and fit the pipeline.
pipeline = Pipeline(stages=[indexer, tokenizer, remover, hashing_tf, idf, rf])

# Hold-out split. It is cached so the estimators' passes over the training data do
# not re-read the CSV from S3 each time.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=SEED)
train_data.cache()

pipeline_model = pipeline.fit(train_data)

# Step 5: predict on the hold-out split. It is cached because every evaluator below
# (and any later inspection of `predictions`) would otherwise recompute the pipeline.
predictions = pipeline_model.transform(test_data).cache()

# Step 6: metrics. For MulticlassClassificationEvaluator, "f1" is the weighted F1,
# matching the weighted precision/recall below.
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)

evaluator_precision = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedPrecision")
precision_score = evaluator_precision.evaluate(predictions)

evaluator_recall = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="weightedRecall")
recall_score = evaluator_recall.evaluate(predictions)

# ROC AUC computed from the forest's raw prediction column.
evaluator_auc = BinaryClassificationEvaluator(labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc_score = evaluator_auc.evaluate(predictions)

# Report the metrics.
print(f"F1 Score: {f1_score}")
print(f"Precision: {precision_score}")
print(f"Recall: {recall_score}")
print(f"ROC AUC: {roc_auc_score}")

# Step 7: save the whole fitted pipeline. overwrite() keeps a re-run from failing
# because the target path already exists.
model_path = "s3a://amamylov-mlops/hate_speech_detection/model"
pipeline_model.write().overwrite().save(model_path)

print(f"Pipeline сохранен в {model_path}")

In [None]:
# Show hold-out comments the model classified as toxic (raw Label 1).
# Which index raw label 1 received is decided by the fitted StringIndexer (the
# pipeline's first stage). Its default ordering is by class frequency, so the index
# is read from the model rather than assumed to be 1.
label_indexer = pipeline_model.stages[0]
indexed_labels = (
    label_indexer.labelsArray[0] if hasattr(label_indexer, "labelsArray") else label_indexer.labels
)
toxic_label_index = next(
    (float(i) for i, label in enumerate(indexed_labels) if label in ("1", "1.0")), None
)
assert toxic_label_index is not None, f"raw label 1 not found among indexed labels {indexed_labels}"

# The raw Label column is included so the indexed values can be checked at a glance.
results = predictions.select("Content", "Label", "indexedLabel", "prediction")
toxic_comments = results.filter(col("prediction") == toxic_label_index)
toxic_comments.show(truncate=False)

In [None]:
from pyspark.sql import Row
from pyspark.ml import PipelineModel

# Path to the saved pipeline.
model_path = "s3a://amamylov-mlops/hate_speech_detection/model"

# Load the saved, already-fitted PipelineModel.
pipeline_model = PipelineModel.load(model_path)

# Example comment to classify.
new_comment = "This is a good comment!"

# Step 1: single-row DataFrame holding only the text column (no Label).
# NOTE(review): this relies on the fitted StringIndexer stage skipping its missing
# input column at transform time — confirm on the deployed Spark version.
# A separate name is used so the training DataFrame `data` is not overwritten.
new_rows = [Row(Content=new_comment)]
new_data_df = spark.createDataFrame(new_rows)

# Step 2: run the full pipeline.
predictions = pipeline_model.transform(new_data_df)

# Step 3: fetch the prediction and class probabilities in a single Spark job.
result_row = predictions.select("prediction", "probability").first()
predicted_label = result_row["prediction"]
# Probability entries are ordered by *indexed* label, i.e. by indexed_labels below.
probability = result_row["probability"]

# Resolve which index the fitted StringIndexer (first pipeline stage) assigned to
# raw label 1 (toxic). With its default frequency ordering this is not necessarily 1.0.
label_indexer = pipeline_model.stages[0]
indexed_labels = (
    label_indexer.labelsArray[0] if hasattr(label_indexer, "labelsArray") else label_indexer.labels
)
toxic_index = next(
    (float(i) for i, label in enumerate(indexed_labels) if label in ("1", "1.0")), None
)
assert toxic_index is not None, f"raw label 1 not found among indexed labels {indexed_labels}"

# Display the result.
print(f"Комментарий: '{new_comment}'")
print(f"Классифицирован как: {'негативный' if predicted_label == toxic_index else 'позитивный'}")
print(f"Вероятности классов: {probability}")

In [None]:
# Stop the Spark session created earlier in the notebook and release its resources.
# Run this last: any later Spark work would need a new session.
spark.stop()