In [1]:
import numpy as np 
import pandas as pd 
import os

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
# Чтение данных
df = spark.read.csv('the-reddit-covid-dataset-comments.csv', inferSchema=True, header=True)

In [3]:
# Создайте выборку
df = df.sample(withReplacement=False, fraction=0.01, seed=42)

In [4]:
# Задайте новые имена столбцов
new_column_names = ['type', 'id', 'subreddit_id', 'subreddit_name', 'subreddit_nsfw', 'created_utc', 'permalink', 'body', 'sentiment', 'score'] 
# Создайте новый датасет с новыми именами столбцов, сохраняя типы данных
df = df.toDF(*new_column_names)

pd.DataFrame(df.dtypes, columns = ['Column Name','Data type'])

Unnamed: 0,Column Name,Data type
0,type,string
1,id,string
2,subreddit_id,string
3,subreddit_name,string
4,subreddit_nsfw,string
5,created_utc,string
6,permalink,string
7,body,string
8,sentiment,string
9,score,string


In [5]:
# Оставление только указанных столбцов
selected_columns = ['id', 'subreddit_name', 'subreddit_nsfw', 'sentiment', 'score']
df = df.select(selected_columns)


In [6]:
from pyspark.sql.functions import when, col

# Оригинальное количество строк
original_count = df.count()

# Удаление строк с отсутствующими значениями в score или sentiment
df = df.na.drop(subset=['subreddit_nsfw', 'score', 'sentiment'])

# Удаление строк в столбце subreddit_nsfw, где значения не являются 'true' или 'false'
valid_values = ['true', 'false']
df = df.filter(col('subreddit_nsfw').isin(valid_values))

# Количество удаленных строк
filtered_count = original_count - df.count()

# Вывод статистики
print(f"Исходное количество строк: {original_count}")
print(f"Количество удаленных строк: {filtered_count}")
print(f"Количество оставшихся строк: {df.count()}")

# Вывод первых двух строк оставшегося DataFrame
df.show(2)

Исходное количество строк: 484687
Количество удаленных строк: 400306
Количество оставшихся строк: 84381
+-------+--------------+--------------+---------+-----+
|     id|subreddit_name|subreddit_nsfw|sentiment|score|
+-------+--------------+--------------+---------+-----+
|hi1v4vl|        canada|         false|  -0.7269|    1|
|hi1satq|           nrl|         false|  -0.7506|   46|
+-------+--------------+--------------+---------+-----+
only showing top 2 rows



In [7]:
# Фильтрация строк с null значениями
rows_with_null = df.filter(col("subreddit_nsfw").isNull() | col("sentiment").isNull() | col("score").isNull())

# Отображение этих строк
rows_with_null.show()

+---+--------------+--------------+---------+-----+
| id|subreddit_name|subreddit_nsfw|sentiment|score|
+---+--------------+--------------+---------+-----+
+---+--------------+--------------+---------+-----+



In [8]:
# Преобразование столбца "sentiment" в числовой формат
df = df.withColumn("sentiment", col("sentiment").cast(FloatType()))

# Преобразование столбца "sentiment" в числовой формат
df = df.withColumn("score", col("score").cast(FloatType()))

In [9]:
# Фильтрация строк с null значениями
rows_with_null = df.filter(col("subreddit_nsfw").isNull() | col("sentiment").isNull() | col("score").isNull())

# Отображение этих строк
rows_with_null.show()

+-------+--------------------+--------------+---------+-------+
|     id|      subreddit_name|subreddit_nsfw|sentiment|  score|
+-------+--------------------+--------------+---------+-------+
|hi0nq3g|     hermancainaward|         false|     NULL|   NULL|
|hhzavaw|coronaviruscircle...|         false|     NULL|   NULL|
|hhyvh5d|            mntrolls|         false|     NULL|   NULL|
|hhwszps|       ultramarathon|         false|     NULL|   NULL|
|hhwnta9|     covid19positive|         false|     NULL| 0.9628|
|hhwake1|          conspiracy|         false|     NULL|   NULL|
|hhtfnj3|         aznidentity|         false|     NULL|   NULL|
|hhrv8t6|          conspiracy|         false|     NULL|-0.5612|
|hhrkdbb|     qanoncasualties|         false|     NULL|   NULL|
|hhresb4|           askreddit|         false|     NULL|   NULL|
|hhq5kim|               eesti|         false|     NULL|   NULL|
|hhq53ix|      debatevaccines|         false|     NULL|    0.0|
|hhpdv3c|        vaxxhappened|         f

In [10]:
# Определение квартилей для каждого столбца
quantiles = df.stat.approxQuantile(["sentiment", "score"], [0.25, 0.75], 0.05)

# Рассчет межквартильного размаха (IQR)
IQR_sentiment = quantiles[0][1] - quantiles[0][0]
IQR_score = quantiles[1][1] - quantiles[1][0]

# Определение границ выбросов
lower_bound_sentiment = quantiles[0][0] - 1.5 * IQR_sentiment
upper_bound_sentiment = quantiles[0][1] + 1.5 * IQR_sentiment

lower_bound_score = quantiles[1][0] - 1.5 * IQR_score
upper_bound_score = quantiles[1][1] + 1.5 * IQR_score

# Фильтрация данных для удаления выбросов
df_filtered = df.filter((col("sentiment").between(lower_bound_sentiment, upper_bound_sentiment)) &
                        (col("score").between(lower_bound_score, upper_bound_score)))

# Показать результат
df_filtered.show(3)

+-------+--------------+--------------+---------+-----+
|     id|subreddit_name|subreddit_nsfw|sentiment|score|
+-------+--------------+--------------+---------+-----+
|hi1v4vl|        canada|         false|  -0.7269|  1.0|
|hi1q4qb|toiletpaperusa|         false|   0.4815|  3.0|
|hi1pi1o|    ukpolitics|         false|  -0.9432|  1.0|
+-------+--------------+--------------+---------+-----+
only showing top 3 rows



In [11]:
# Фильтрация строк с null значениями
rows_with_null = df_filtered.filter(col("subreddit_nsfw").isNull() | col("sentiment").isNull() | col("score").isNull())

# Отображение этих строк
rows_with_null.show()

+---+--------------+--------------+---------+-----+
| id|subreddit_name|subreddit_nsfw|sentiment|score|
+---+--------------+--------------+---------+-----+
+---+--------------+--------------+---------+-----+



In [12]:
from pyspark.sql.functions import when, col
# Замените значения true и false на 1 и 0
df_filtered = df_filtered.withColumn("subreddit_nsfw", when(col("subreddit_nsfw") == "true", 0).when(col("subreddit_nsfw") == "false", 1))

# Покажите результат
df_filtered.show(5)

+-------+--------------+--------------+---------+-----+
|     id|subreddit_name|subreddit_nsfw|sentiment|score|
+-------+--------------+--------------+---------+-----+
|hi1v4vl|        canada|             1|  -0.7269|  1.0|
|hi1q4qb|toiletpaperusa|             1|   0.4815|  3.0|
|hi1pi1o|    ukpolitics|             1|  -0.9432|  1.0|
|hi1p77f|     askreddit|             1|   0.2111|  4.0|
|hi1nawj|      antiwork|             1|     0.61| 11.0|
+-------+--------------+--------------+---------+-----+
only showing top 5 rows



In [13]:
df_filtered.printSchema()

root
 |-- id: string (nullable = true)
 |-- subreddit_name: string (nullable = true)
 |-- subreddit_nsfw: integer (nullable = true)
 |-- sentiment: float (nullable = true)
 |-- score: float (nullable = true)



In [14]:
df_filtered = df_filtered.na.fill(0)

In [24]:
data1 = df_filtered
data2 = df_filtered


In [25]:
# Фильтрация строк с null значениями
rows_with_null = data1.filter(col("subreddit_nsfw").isNull() | col("sentiment").isNull() | col("score").isNull())

# Отображение этих строк
rows_with_null.show()

+---+--------------+--------------+---------+-----+
| id|subreddit_name|subreddit_nsfw|sentiment|score|
+---+--------------+--------------+---------+-----+
+---+--------------+--------------+---------+-----+



In [26]:
# Фильтрация строк с null значениями
rows_with_null = data2.filter(col("subreddit_nsfw").isNull() | col("sentiment").isNull() | col("score").isNull())

# Отображение этих строк
rows_with_null.show()

+---+--------------+--------------+---------+-----+
| id|subreddit_name|subreddit_nsfw|sentiment|score|
+---+--------------+--------------+---------+-----+
+---+--------------+--------------+---------+-----+



In [27]:
# Создание вектора признаков
feature_columns = ["subreddit_nsfw", "sentiment"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")
df_regression = assembler.transform(data1)
df_regression = df_regression.na.fill(0)  # Заменить null на 0

In [28]:
# Разделение на обучающую и тестовую выборки
(train_data_regression, test_data_regression) = df_regression.randomSplit([0.8, 0.2], seed=123)

In [29]:
# RandomForestRegressor
rf_regression = RandomForestRegressor(featuresCol="features", labelCol="score")

In [30]:
# Подбор гиперпараметров
param_grid_rf_regression = ParamGridBuilder() \
    .addGrid(rf_regression.numTrees, [10, 20, 30]) \
    .addGrid(rf_regression.maxDepth, [5, 10, 15]) \
    .build()

In [31]:
# Кросс-валидация
evaluator_rf_regression = RegressionEvaluator(labelCol="score", predictionCol="prediction", metricName="rmse")
cv_rf_regression = CrossValidator(estimator=rf_regression, estimatorParamMaps=param_grid_rf_regression, evaluator=evaluator_rf_regression, numFolds=3)

In [32]:
# Обучение модели
model_rf_regression = cv_rf_regression.fit(train_data_regression)
predictions_rf_regression = model_rf_regression.transform(test_data_regression)

In [33]:
# Оценка качества модели
rmse_rf_regression = evaluator_rf_regression.evaluate(predictions_rf_regression)
print("RandomForestRegressor RMSE:", rmse_rf_regression)

RandomForestRegressor RMSE: 2.503135601859707


In [34]:
# Создание вектора признаков
feature_columns_classification = ["subreddit_nsfw", "sentiment"]
assembler_classification = VectorAssembler(inputCols=feature_columns_classification, outputCol="features")
df_classification = assembler_classification.transform(data2)

In [35]:
# Добавление бинарного признака (пример: score > 0.5)
threshold_score_classification = 1
df_classification = df_classification.withColumn("binary_feature", when(df_classification["score"] > threshold_score_classification, 1).otherwise(0))

In [36]:
# Разделение на обучающую и тестовую выборки
(train_data_classification, test_data_classification) = df_classification.randomSplit([0.8, 0.2], seed=123)

In [37]:
# Исключение столбца score
df_classification = df_classification.drop("score")

In [38]:
# LogisticRegression
lr_classification = LogisticRegression(featuresCol="features", labelCol="binary_feature")

In [39]:
# Подбор гиперпараметров
param_grid_lr_classification = ParamGridBuilder() \
    .addGrid(lr_classification.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr_classification.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

In [40]:
# Кросс-валидация
evaluator_lr_classification = BinaryClassificationEvaluator(labelCol="binary_feature", metricName="areaUnderROC")
cv_lr_classification = CrossValidator(estimator=lr_classification, estimatorParamMaps=param_grid_lr_classification, evaluator=evaluator_lr_classification, numFolds=3)

In [41]:
# Обучение модели
model_lr_classification = cv_lr_classification.fit(train_data_classification)
predictions_lr_classification = model_lr_classification.transform(test_data_classification)

In [42]:
# Оценка качества модели
area_under_roc_lr_classification = evaluator_lr_classification.evaluate(predictions_lr_classification)
print("LogisticRegression Area under ROC:", area_under_roc_lr_classification)

LogisticRegression Area under ROC: 0.5
