# Лабораторная работа №2 по дисциплине "Системы обработки больших данных"
## Подключаем библиотеки и отфильтрованный датасет из ЛР № 1
### Задача регрессии - Gradient-boosted tree regression:

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()
filename_data = 'processed_data/iran_201906_2_tweets_csv_hashed.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv.show(10)
csv.printSchema()

+-------------------+-------------------+----------------------+--------------+---------------+---------------------+--------------+----------+-----------+-----------+----------+-------------+
|            tweetid|             userid|user_reported_location|follower_count|following_count|account_creation_date|tweet_language|is_retweet|quote_count|reply_count|like_count|retweet_count|
+-------------------+-------------------+----------------------+--------------+---------------+---------------------+--------------+----------+-----------+-----------+----------+-------------+
|1088195741615501312|1032591398657314817|             Palestine|       10238.0|        10118.0|           2018-08-23|            fa|     false|       90.0|       90.0|      90.0|         90.0|
|1100751010597912576|1032591398657314817|             Palestine|       10238.0|        10118.0|           2018-08-23|            fa|     false|       68.0|       68.0|      68.0|         68.0|
| 931250203084115968| 7706617090736

#### Будем использовать только числовые признаки:

In [2]:
data = csv.select("follower_count", "following_count", "quote_count", "reply_count", "like_count", "retweet_count")
data.show(20)

+--------------+---------------+-----------+-----------+----------+-------------+
|follower_count|following_count|quote_count|reply_count|like_count|retweet_count|
+--------------+---------------+-----------+-----------+----------+-------------+
|       10238.0|        10118.0|       90.0|       90.0|      90.0|         90.0|
|       10238.0|        10118.0|       68.0|       68.0|      68.0|         68.0|
|       20256.0|         3842.0|       97.0|       97.0|      97.0|         97.0|
|       20256.0|         3842.0|       62.0|       62.0|      62.0|         62.0|
|       10805.0|         1916.0|       97.0|       97.0|      97.0|         97.0|
|       18099.0|         9861.0|       11.0|       11.0|      11.0|         11.0|
|       18099.0|         9861.0|       12.0|       12.0|      12.0|         12.0|
|       18099.0|         9861.0|       16.0|       16.0|      16.0|         16.0|
|       15259.0|         1172.0|      349.0|      349.0|     349.0|        349.0|
|       18099.0|

#### Разделим данные на обучающую и тестовую выборку:
##### Будем использовать 70% данных для обучения и 30% для тестирования.

In [3]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 93158  Testing Rows: 39952


#### Определим конвейер:
##### Конвейер состоит из серии этапов преобразования и оценки, которые обычно подготавливают фрейм данных для моделирования, а затем обучают прогнозирующую модель. В этом случае вы создадите конвейер с тремя этапами:
1. <b>VectorAssembler:</b> Создает вектор непрерывных числовых признаков.

2. <b>MinMaxScaler:</b> Нормализует непрерывные числовые характеристики.

3. <b>Gradient-boosted tree regression:</b> Обучает модель регрессии.

In [4]:
numVect = VectorAssembler(inputCols = ["follower_count", "following_count", "quote_count", "reply_count", "like_count", "retweet_count"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="features")

gbtr = GBTRegressor(featuresCol = 'features', labelCol='quote_count', maxIter = 5, maxDepth=2)

pipeline = Pipeline(stages=[numVect, minMax, gbtr])

#### Запускаем конвейер:

In [5]:
piplineModel = pipeline.fit(train)

#### Генерируем предсказываемые значения:

In [6]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "quote_count")
predicted.show(20, truncate=False)

+-----------------------------+------------------+-----------+
|features                     |prediction        |quote_count|
+-----------------------------+------------------+-----------+
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0        |
|(6,[1],[0.05688723835246456])|0.2758940726795321|0.0  

#### Выводим метрики для оценки модели:

In [7]:
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="quote_count", metricName="rmse")
# RMSE
rmse = regressionEvaluator.evaluate(prediction)
print(f"The RMSE for the Gradient-boosted tree regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(prediction)
print(f"The MSE for the Gradient-boosted tree regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(prediction)
print(f"The R2 for the Gradient-boosted tree regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(prediction)
print(f"The MAE for the Gradient-boosted tree regression model is {mae:0.2f}")

The RMSE for the Gradient-boosted tree regression model is 32.32
The MSE for the Gradient-boosted tree regression model is 1044.42
The R2 for the Gradient-boosted tree regression model is 0.53
The MAE for the Gradient-boosted tree regression model is 3.90


#### Создадим параметрическую сетку для настройки модели:

In [8]:
param_grid = (ParamGridBuilder()
    .addGrid(gbtr.maxDepth, [2, 4, 6])
    .addGrid(gbtr.maxBins, [20, 60])
    .addGrid(gbtr.maxIter, [10, 20])
    .build())

#### Устанавливаем оптимальные параметры для модели с помощью кросс-валидации:

In [9]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="quote_count", \
                                metricName="rmse"), \
                    numFolds=2)

#### Запускаем конвейер с оптимальными параметрами:

In [10]:
cv_model = cv.fit(train)

#### Предсказываем результаты на тестовом наборе данных:

In [11]:
newPrediction = cv_model.transform(test)

#### Выведем метрики для оценки модели после кросс-валидации:

In [12]:
# RMSE
rmse = regressionEvaluator.evaluate(newPrediction)
print(f"The RMSE for the Gradient-boosted tree regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(newPrediction)
print(f"The MSE for the Gradient-boosted tree regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(newPrediction)
print(f"The R2 for the Gradient-boosted tree regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(newPrediction)
print(f"The MAE for the Gradient-boosted tree regression model is {mae:0.2f}")

The RMSE for the Gradient-boosted tree regression model is 2.28
The MSE for the Gradient-boosted tree regression model is 749.18
The R2 for the Gradient-boosted tree regression model is 0.66
The MAE for the Gradient-boosted tree regression model is 2.28


### Задача классификации - LogisticRegression:
#### Используем уже имеющийся бинарный признак - "is_retweet"

In [13]:
csv = csv.withColumn("label", when(col("is_retweet") == True, 1).otherwise(0))
csv.show(10)
csv.printSchema()

+-------------------+-------------------+----------------------+--------------+---------------+---------------------+--------------+----------+-----------+-----------+----------+-------------+-----+
|            tweetid|             userid|user_reported_location|follower_count|following_count|account_creation_date|tweet_language|is_retweet|quote_count|reply_count|like_count|retweet_count|label|
+-------------------+-------------------+----------------------+--------------+---------------+---------------------+--------------+----------+-----------+-----------+----------+-------------+-----+
|1088195741615501312|1032591398657314817|             Palestine|       10238.0|        10118.0|           2018-08-23|            fa|     false|       90.0|       90.0|      90.0|         90.0|    0|
|1100751010597912576|1032591398657314817|             Palestine|       10238.0|        10118.0|           2018-08-23|            fa|     false|       68.0|       68.0|      68.0|         68.0|    0|
| 931

#### Будем использовать только числовые признаки и бинарный:

In [14]:
data = csv.select("follower_count", "following_count", "quote_count", "reply_count", "like_count", "retweet_count", "label")
data.show(20)

+--------------+---------------+-----------+-----------+----------+-------------+-----+
|follower_count|following_count|quote_count|reply_count|like_count|retweet_count|label|
+--------------+---------------+-----------+-----------+----------+-------------+-----+
|       10238.0|        10118.0|       90.0|       90.0|      90.0|         90.0|    0|
|       10238.0|        10118.0|       68.0|       68.0|      68.0|         68.0|    0|
|       20256.0|         3842.0|       97.0|       97.0|      97.0|         97.0|    0|
|       20256.0|         3842.0|       62.0|       62.0|      62.0|         62.0|    0|
|       10805.0|         1916.0|       97.0|       97.0|      97.0|         97.0|    0|
|       18099.0|         9861.0|       11.0|       11.0|      11.0|         11.0|    0|
|       18099.0|         9861.0|       12.0|       12.0|      12.0|         12.0|    0|
|       18099.0|         9861.0|       16.0|       16.0|      16.0|         16.0|    0|
|       15259.0|         1172.0|

#### Разделим данные на обучающую и тестовую выборку:
##### Будем использовать 70% данных для обучения и 30% для тестирования.

In [15]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 93147  Testing Rows: 39963


#### Определим конвейер:
##### Конвейер состоит из серии этапов преобразования и оценки, которые обычно подготавливают фрейм данных для моделирования, а затем обучают прогнозирующую модель. В этом случае вы создадите конвейер с тремя этапами:
1. <b>VectorAssembler:</b> Создает вектор непрерывных числовых признаков.

2. <b>MinMaxScaler:</b> Нормализует непрерывные числовые характеристики.

3. <b>Logistic Regression:</b> Обучает модель классификации.

In [16]:
numVect = VectorAssembler(inputCols = ["follower_count", "following_count", "quote_count", "reply_count", "like_count", "retweet_count"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="features")

lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=10,
                        regParam=0.3)

pipeline = Pipeline(stages=[numVect, minMax, lr])

#### Запускаем конвейер:

In [17]:
piplineModel = pipeline.fit(train)

#### Генерируем предсказываемые значения:

In [18]:
prediction = piplineModel.transform(test)
predicted = prediction.select("features", "prediction", "label")
predicted.show(20, truncate=False)

+-----------------------------+----------+-----+
|features                     |prediction|label|
+-----------------------------+----------+-----+
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.05688723835246456])|0.0       |0    |
|(6,[1],[0.056887238

#### Оценка модели классификации:

In [19]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           17527.0|
|       FP|            5786.0|
|       TN|           13846.0|
|       FN|            2804.0|
|Precision|0.7518122935701111|
|   Recall|0.8620825340612858|
|       F1| 0.803180276784896|
+---------+------------------+



#### Оценка качества модели бинарной классификации с использованием метрики AUR:

In [20]:
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.7933823807068797


#### Настройку модели классификации с использованием кросс-валидации:

In [21]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold, 
                                                                                            [0.4, 0.3]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

#### Запускаем конвейер:

In [22]:
cv_model = cv.fit(train)

#### Предсказываем результаты на тестовом наборе данных:

In [23]:
newPrediction = cv_model.transform(test)

#### Выведем метрики для оценки модели классификации после кросс-валидации:

In [24]:
tp2 = float(newPrediction.filter("prediction == 1.0 AND label == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND label == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND label == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND label == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           17530.0|
|       FP|            6122.0|
|       TN|           13510.0|
|       FN|            2801.0|
|Precision|0.7411635379671909|
|   Recall|0.8622300919777679|
|       F1|0.7971261623809198|
+---------+------------------+



#### Оценка качества модели бинарной классификации с использованием метрики AUR после кросс-валидации:

In [25]:
evaluator2 = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.7751961380834235
