## **Импорты**
Общие модули, которые будут использованы для работы с данными

In [23]:
# Import other modules not related to PySpark
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all"
%matplotlib inline
pd.set_option("display.max_columns", None)

Модули библиотеки PySpark, служащие для взаимодействия с движком Spark. Настройка Spark и запуск сессии

In [24]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct, when
from pyspark.sql.types import *

MAX_MEMORY = '15G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()

## Подготовка данных
Чтение подготовленного датасета из файла

In [25]:
df = spark.read.csv("data/iranian_tweets_csv_hashed.csv", header=True, inferSchema=True)
df.limit(5).toPandas()

Unnamed: 0,tweetid,userid,user_display_name,user_screen_name,user_reported_location,user_profile_description,user_profile_url,follower_count,following_count,account_creation_date,account_language,tweet_language,tweet_text,tweet_time,tweet_client_name,in_reply_to_tweetid,in_reply_to_userid,quoted_tweet_tweetid,is_retweet,retweet_userid,retweet_tweetid,latitude,longitude,quote_count,reply_count,like_count,retweet_count,hashtags,urls,user_mentions,poll_choices
0,533622371429543936,299148448,Maria Luis,marialuis91,"Nantes, France",journaliste indépendante/un vrai journaliste e...,,8012,1450,2011-05-15,en,fr,@bellisarobz Ces photos illustrent parfaitemen...,2014-11-15 14:07,Twitter Web Client,533621573400670208,574356455,,False,,,,,0.0,0.0,0.0,0.0,,[http://fr.awdnews.com/divertissements/5757-ce...,[574356455],
1,527205814906654721,299148448,Maria Luis,marialuis91,"Nantes, France",journaliste indépendante/un vrai journaliste e...,,8012,1450,2011-05-15,en,en,@ParkerLampe An inquiry by congress confirms t...,2014-10-28 21:10,Twitter Web Client,527205439554207744,438542436,,False,,,,,0.0,0.0,0.0,0.0,,[http://www.awdnews.com/top-news/10080-an-inqu...,[438542436],
2,545166827350134784,299148448,Maria Luis,marialuis91,"Nantes, France",journaliste indépendante/un vrai journaliste e...,,8012,1450,2011-05-15,en,en,@hadeelhmaidi @wordpressdotcom CIA predict thi...,2014-12-17 10:41,Twitter Web Client,545166512714051584,256625456,,False,,,,,0.0,0.0,0.0,0.0,,[http://www.awdnews.com/political/10404-cia-pr...,"[823905, 256625456]",
3,538045437316321280,299148448,Maria Luis,marialuis91,"Nantes, France",journaliste indépendante/un vrai journaliste e...,,8012,1450,2011-05-15,en,fr,@MartinYannis l'avis bizarre de marcel tonton ...,2014-11-27 19:03,Twitter Web Client,538045077609013248,919513172,,False,,,,,0.0,0.0,0.0,0.0,,[http://fr.awdnews.com/divertissements/5796-to...,[919513172],
4,530053681668841472,299148448,Maria Luis,marialuis91,"Nantes, France",journaliste indépendante/un vrai journaliste e...,,8012,1450,2011-05-15,en,fr,@courrierinter Les laboratoires US de guerre b...,2014-11-05 17:47,Twitter Web Client,530044405785579520,83864876,,False,,,,,0.0,0.0,1.0,2.0,,[http://fr.awdnews.com/soci%C3%A9t%C3%A9/5723-...,[83864876],


Берём из датасета подходящии столбцы и избаляемся от строк с пропущенными значениями

In [26]:
df = df['follower_count', 'following_count', 'user_reported_location', 'account_language', 'tweet_language']
df = df.dropna()

Меняем тип данных столбцов follower_count и following_count на double и создадаём бинарный признак label со значениями 1 или 0. В качестве метки выбран столбец following_count, если following_count больше 3200, то значение label будет 1, а если меньше или равно, то 0

In [27]:
df = df.withColumn('follower_count', col('follower_count').cast('double'))
df = df.withColumn('following_count', col('following_count').cast('double'))
df = df.withColumn('label', when(col('following_count') > 3200, 1).otherwise(0))

Создём строковые индексы для столбцов account_language, tweet_language и  user_reported_location

In [28]:
def str_to_index(df, coll_name):
  # StringIndexer для преобразования столбцов в числовые индексы
  indexer = StringIndexer(inputCol=coll_name, outputCol=coll_name+'_index')
  df = indexer.fit(df).transform(df)
  df = df.drop(coll_name)
  return df

df = str_to_index(df, 'account_language')
df = str_to_index(df, 'tweet_language')
df = str_to_index(df, 'user_reported_location')
pd.DataFrame(df.dtypes, columns = ['Column Name','Data type'])

Unnamed: 0,Column Name,Data type
0,follower_count,double
1,following_count,double
2,label,int
3,account_language_index,double
4,tweet_language_index,double
5,user_reported_location_index,double


Разделение датасета на тренировочную и тестовую часть

In [29]:
train, test = df.randomSplit([0.8, 0.2])
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 447355  Testing Rows: 112003


# **Регрессия**

**Градиентный бустинг** — метод машинного обучения, который создает решающую модель прогнозирования в виде ансамбля слабых моделей прогнозирования, обычно деревьев решений. Он строит модель поэтапно, позволяя оптимизировать произвольную дифференцируемую функцию потерь.

В PySpark ML присутсвует класс **GBTRegressor**(Gradient Boosting Tree Regressor), осуществляющий обучение с использованием градиентного бустинга на основе анасамбля деревьев решений.

Построим пайплайн для осуществления обучения и предсказания.

In [30]:
# категориальные признаки
cat_columns = ['label']

# числовые признаки
num_columns = ['user_reported_location_index', 'account_language_index', 'tweet_language_index']

cat_vector = VectorAssembler(inputCols = cat_columns, outputCol="catFeatures")
cat_indexer = VectorIndexer(inputCol = "catFeatures", outputCol = "idxCatFeatures")
num_vector = VectorAssembler(inputCols = num_columns, outputCol="numFeatures")
min_max = MinMaxScaler(inputCol = "numFeatures", outputCol="normFeatures")
feat_vect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

# модель регрессии
gbt_regressor = GBTRegressor(featuresCol="features", labelCol = "following_count")

regression_pipeline = Pipeline(stages = [
    cat_vector,
    cat_indexer,
    num_vector,
    min_max,
    feat_vect,
    gbt_regressor
])


Проведем обучение модели при помощи пайплайна



In [31]:
regression_pipline_model = regression_pipeline.fit(train)

Получим предсказания обученой модели



In [32]:
regression_prediction = regression_pipline_model.transform(test)


Посмотрим на первые строки получившегося датасета



In [33]:
regression_prediction.select("prediction", col("following_count").alias("expected")).limit(10).toPandas()


Unnamed: 0,prediction,expected
0,714.930773,0.0
1,563.546624,1.0
2,206.05783,1.0
3,206.05783,1.0
4,206.05783,1.0
5,206.05783,1.0
6,206.05783,1.0
7,206.05783,1.0
8,206.05783,1.0
9,206.05783,1.0


Проведем оценку полученной модели используя метрику

In [34]:
regression_evaluator = RegressionEvaluator(labelCol="following_count", predictionCol="prediction", metricName="r2")
regression_evaluator.evaluate(regression_prediction)

0.7822282608993073

## **Классификация**

**Логистическая регрессия** - статистическая модель, используемая для прогнозирования вероятности возникновения некоторого события путём его сравнения с логистической кривой. Эта регрессия выдаёт ответ в виде вероятности бинарного события (1 или 0).

В PySpark ML задачу логистической регрессии решает класс **LogisticRegression**.

Построим пайплайн для осуществления обучения и предсказания.

In [35]:
# необходимые столбцы
selected_columns = ['follower_count', 'user_reported_location_index', 'account_language_index', 'tweet_language_index']

# модель логистической регрессии
logistic_regression = LogisticRegression(featuresCol='features', labelCol='label')

# пайплайн для объединения этапов
classification_pipeline = Pipeline(stages = [
    VectorAssembler(inputCols=selected_columns, outputCol='features'),
    logistic_regression
])

Проведем обучение модели при помощи пайплайна



In [36]:
classification_model = classification_pipeline.fit(train)

Получим предсказания на тестовых данных

In [37]:
classification_prediction = classification_model.transform(test)
classification_prediction.select("rawPrediction", "prediction", col('label').alias("expected")).limit(10).toPandas()

Unnamed: 0,rawPrediction,prediction,expected
0,"[0.7509870953830244, -0.7509870953830244]",0.0,0
1,"[3.4969394528468203, -3.4969394528468203]",0.0,0
2,"[3.3014185558909435, -3.3014185558909435]",0.0,0
3,"[3.3014185558909435, -3.3014185558909435]",0.0,0
4,"[3.3014185558909435, -3.3014185558909435]",0.0,0
5,"[3.3014185558909435, -3.3014185558909435]",0.0,0
6,"[3.3014185558909435, -3.3014185558909435]",0.0,0
7,"[3.3014185558909435, -3.3014185558909435]",0.0,0
8,"[3.3014185558909435, -3.3014185558909435]",0.0,0
9,"[3.3014185558909435, -3.3014185558909435]",0.0,0


Проведем оценку полученной модели используя метрику ROC AUC

In [38]:
classification_evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol="rawPrediction", metricName="areaUnderROC")
classification_evaluator.evaluate(classification_prediction)

0.904474740954655

Определим функцию для построения матрицы несоответствий

In [39]:
def confusion_matrix(df, predictionCol, labelCol):
    tp = float(df.filter((col(predictionCol)==1.0) & (col(labelCol)==1)).count())
    fp = float(df.filter((col(predictionCol)==1.0) & (col(labelCol)==0)).count())
    tn = float(df.filter((col(predictionCol)==0.0) & (col(labelCol)==0)).count())
    fn = float(df.filter((col(predictionCol)==0.0) & (col(labelCol)==1)).count())
    pr = tp / (tp + fp)
    re = tp / (tp + fn)
    f1 = 2*pr*re/(re+pr)
    metrics = spark.createDataFrame([
     ("TP", tp),
     ("FP", fp),
     ("TN", tn),
     ("FN", fn),
     ("Precision", pr),
     ("Recall", re),
     ("F1", f1)],["metric", "value"])
    return metrics.toPandas()

И построим её для полученных предсказаний

In [40]:
confusion_matrix(classification_prediction, "prediction", "label")

Unnamed: 0,metric,value
0,TP,15694.0
1,FP,1865.0
2,TN,86729.0
3,FN,7715.0
4,Precision,0.893787
5,Recall,0.670426
6,F1,0.766159


# **Подбор гиперпараметров**

Воспользуемся методом кросс-валидации для подбора оптимальных параметров регрессии

In [41]:
paramGrid = ParamGridBuilder().addGrid(logistic_regression.regParam, [0.3, 0.1]).addGrid(logistic_regression.maxIter, [10, 5]).addGrid(logistic_regression.threshold,  [0.4, 0.3]).build()
cv = CrossValidator(estimator=classification_pipeline, evaluator=BinaryClassificationEvaluator(labelCol="label"), estimatorParamMaps=paramGrid, numFolds=2)

Обучим модель

In [42]:
model = cv.fit(train)

И получим новые предсказания

In [43]:
newPrediction = model.transform(test)

Проведём оценку полученных предсказаний

In [44]:
classification_evaluator.evaluate(newPrediction)

0.8947698903646487

Можем видеть, что точность предсказаний практически не изменилась.

Построим матрицу несоответствий

In [45]:
confusion_matrix(newPrediction, "prediction", "label")

Unnamed: 0,metric,value
0,TP,15617.0
1,FP,2075.0
2,TN,86519.0
3,FN,7792.0
4,Precision,0.882715
5,Recall,0.667137
6,F1,0.759933
