In [1]:
# Import other modules not related to PySpark
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
import seaborn as sns
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline

In [2]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '4G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()

path = 'date/wiki/small/'

filename_data_item = path + 'item.csv'
filename_data_item_aliase = path + 'item_aliases.csv'
filename_data_page = path + 'page.csv'
filename_data_property_aliase = path + 'property_aliases.csv'
filename_data_property = path + 'property.csv'
filename_data_statements = path + 'statements.csv'

# Load the main data set into pyspark data frame 
df_item = spark.read.csv(filename_data_item, header=True, sep=',', inferSchema=True)
df_item_aliase = spark.read.csv(filename_data_item_aliase, header=True, sep=',', inferSchema=True)
df_page = spark.read.csv(filename_data_page, header=True, sep=',', inferSchema=True)
df_property_aliase = spark.read.csv(filename_data_property_aliase, header=True, sep=',', inferSchema=True)
df_property = spark.read.csv(filename_data_property, header=True, sep=',', inferSchema=True)
df_statements = spark.read.csv(filename_data_statements, header=True, sep=',', inferSchema=True)

print('Data frame type: ' + str(type(df_item)))

Data frame type: <class 'pyspark.sql.dataframe.DataFrame'>


# **Извлечение признаков. Подготовка датасета**

In [3]:
# Получение столбцов всех Датафреймов
df_item.columns
df_item_aliase.columns
df_page.columns
df_property_aliase.columns
df_property.columns
df_statements.columns

['item_id', 'en_label', 'en_description']

['item_id', 'en_alias']

['page_id', 'item_id', 'title', 'views']

['property_id', 'en_alias']

['property_id', 'en_label', 'en_description']

['source_item_id', 'edge_property_id', 'target_item_id']

In [4]:
# Извлечем количесвто псевдонимов для каждого элемента
df_item_with_aliase = df_item.join(df_item_aliase, 'item_id', 'left')
df_count_aliase = df_item_with_aliase.groupBy('item_id').count().sort(desc('count'))
df_item_with_aliase = df_item.join(df_count_aliase, 'item_id', 'left')
df_item_with_aliase = df_item_with_aliase.withColumnRenamed('count', 'count_item_aliase')
df_item_with_aliase.show(4)

+-------+--------+--------------------+-----------------+
|item_id|en_label|      en_description|count_item_aliase|
+-------+--------+--------------------+-----------------+
|      1|Universe|totality of space...|                4|
|      3|    life|matter capable of...|                1|
|      5|   human|common name of Ho...|                5|
|      4|   death|permanent cessati...|               20|
+-------+--------+--------------------+-----------------+
only showing top 4 rows



In [5]:
# Подсчет длинны описания элемента
df_item_with_aliase = df_item_with_aliase.withColumn("en_description_length", functions.length("en_description"))
df_item_with_aliase = df_item_with_aliase.drop(df_item_with_aliase.en_description)
df_item_with_aliase.sort('count_item_aliase')
df_item_with_aliase.show(4)


DataFrame[item_id: int, en_label: string, count_item_aliase: bigint, en_description_length: int]

+-------+--------+-----------------+---------------------+
|item_id|en_label|count_item_aliase|en_description_length|
+-------+--------+-----------------+---------------------+
|      1|Universe|                4|                   34|
|      3|    life|                1|                   72|
|      5|   human|                5|                   68|
|      4|   death|               20|                   38|
+-------+--------+-----------------+---------------------+
only showing top 4 rows



In [6]:
# Подсчет количества элементов и свойств на странице
from pyspark.sql.functions import sum,avg,max,count
df_page.groupBy('page_id').count().sort(desc('count')).show(4)

df_page_with_item = df_page.join(df_item_with_aliase, 'item_id', 'left')

df_page_with_item.show(4)

+-------+-----+
|page_id|count|
+-------+-----+
|   6466|    1|
|  12027|    1|
| 182678|    1|
|  67492|    1|
+-------+-----+
only showing top 4 rows

+-------+-------+--------+-----+--------+-----------------+---------------------+
|item_id|page_id|   title|views|en_label|count_item_aliase|en_description_length|
+-------+-------+--------+-----+--------+-----------------+---------------------+
|      1|  31880|Universe|37815|Universe|                4|                   34|
|      3|  18393|    Life|21420|    life|                1|                   72|
|      5| 682482|   Human|60142|   human|                5|                   68|
|      4|   8221|   Death|25512|   death|               20|                   38|
+-------+-------+--------+-----+--------+-----------------+---------------------+
only showing top 4 rows



In [7]:
# Подсчет количества входящих ссылок к item
df_target_item = df_statements.groupBy('target_item_id').agg(count('source_item_id'))
df_target_item = df_target_item.withColumnRenamed('count(source_item_id)', 'count_in_links_to_item')

df_target_item = df_target_item.join(df_page_with_item, df_target_item.target_item_id == df_page_with_item.item_id, 'left')

df_target_item.show(4)

+--------------+----------------------+-------+-------+------------+------+--------------------+-----------------+---------------------+
|target_item_id|count_in_links_to_item|item_id|page_id|       title| views|            en_label|count_item_aliase|en_description_length|
+--------------+----------------------+-------+-------+------------+------+--------------------+-----------------+---------------------+
|           148|                   171|    148|   5405|       China|263504|People's Republic...|                8|                   18|
|           833|                    98|    833|3607937|    Malaysia|106863|            Malaysia|                6|                   49|
|         43527|                     1|  43527| 152833|Sitting Bull| 22312|        Sitting Bull|                3|                   41|
|        196615|                     1|   NULL|   NULL|        NULL|  NULL|                NULL|             NULL|                 NULL|
+--------------+----------------------+--

In [8]:
# Подсчет количества выходящих ссылок от item
df_source_item = df_statements.groupBy('source_item_id').agg(count('target_item_id'))
df_source_item = df_source_item.withColumnRenamed('count(target_item_id)', 'count_out_links_to_item')

df_target_item = df_target_item.join(df_source_item, df_source_item.source_item_id == df_target_item.item_id, 'left')

df_target_item = df_target_item.drop('target_item_id')
df_target_item = df_target_item.drop('source_item_id')

df_target_item.show(4)

+----------------------+-------+-------+------------+------+--------------------+-----------------+---------------------+-----------------------+
|count_in_links_to_item|item_id|page_id|       title| views|            en_label|count_item_aliase|en_description_length|count_out_links_to_item|
+----------------------+-------+-------+------------+------+--------------------+-----------------+---------------------+-----------------------+
|                   171|    148|   5405|       China|263504|People's Republic...|                8|                   18|                    609|
|                    98|    833|3607937|    Malaysia|106863|            Malaysia|                6|                   49|                    300|
|                     1|  43527| 152833|Sitting Bull| 22312|        Sitting Bull|                3|                   41|                   NULL|
|                     1|   NULL|   NULL|        NULL|  NULL|                NULL|             NULL|                 NULL|   

In [9]:
# Удаление столбцов, которые не будут использоваться в дальнейшем
df_target_item = df_target_item.drop('item_id')
df_target_item = df_target_item.drop('page_id')
df_target_item = df_target_item.drop('title')
df_target_item = df_target_item.drop('en_label')

In [10]:
from pyspark.sql.functions import col, count, when
# Добавления бинарного признака для каждого существующего признака, обозначающего наличие значения в соответствующем признаке
columns_with_missing_data = ['views', 'count_item_aliase', 'en_description_length', 'count_out_links_to_item']
for column in columns_with_missing_data:
    df_target_item = df_target_item.withColumn(column + "_was_is_missing", (when(col(column).isNull() | (col(column) == ""), 1).otherwise(0)) )
df_target_item.select('views', 'views_was_is_missing', 'count_item_aliase', 'count_item_aliase_was_is_missing').show(4)

+------+--------------------+-----------------+--------------------------------+
| views|views_was_is_missing|count_item_aliase|count_item_aliase_was_is_missing|
+------+--------------------+-----------------+--------------------------------+
|263504|                   0|                8|                               0|
|106863|                   0|                6|                               0|
| 22312|                   0|                3|                               0|
|  NULL|                   1|             NULL|                               1|
+------+--------------------+-----------------+--------------------------------+
only showing top 4 rows



In [11]:
from pyspark.ml.feature import Imputer

# Initialize the Imputer
imputer = Imputer(
    inputCols= columns_with_missing_data, #specifying the input column names
    outputCols=columns_with_missing_data, #specifying the output column names
    strategy="mean"                  # or "median" if you want to use the median value
)
model = imputer.fit(df_target_item)

#Transform the dataset
imputed_df = model.transform(df_target_item)

imputed_df.select(columns_with_missing_data).show(4)

+------+-----------------+---------------------+-----------------------+
| views|count_item_aliase|en_description_length|count_out_links_to_item|
+------+-----------------+---------------------+-----------------------+
|263504|                8|                   18|                    609|
|106863|                6|                   49|                    300|
| 22312|                3|                   41|                     71|
| 10409|                2|                   33|                     71|
+------+-----------------+---------------------+-----------------------+
only showing top 4 rows



In [12]:
# Сформированные датафреймы
print('Data overview df_target_item')
df_target_item.printSchema()
print('Columns overview df_target_item')
pd.DataFrame(df_target_item.dtypes, columns = ['Column Name','Data type'])

Data overview df_target_item
root
 |-- count_in_links_to_item: long (nullable = false)
 |-- views: integer (nullable = true)
 |-- count_item_aliase: long (nullable = true)
 |-- en_description_length: integer (nullable = true)
 |-- count_out_links_to_item: long (nullable = true)
 |-- views_was_is_missing: integer (nullable = false)
 |-- count_item_aliase_was_is_missing: integer (nullable = false)
 |-- en_description_length_was_is_missing: integer (nullable = false)
 |-- count_out_links_to_item_was_is_missing: integer (nullable = false)

Columns overview df_target_item


Unnamed: 0,Column Name,Data type
0,count_in_links_to_item,bigint
1,views,int
2,count_item_aliase,bigint
3,en_description_length,int
4,count_out_links_to_item,bigint
5,views_was_is_missing,int
6,count_item_aliase_was_is_missing,int
7,en_description_length_was_is_missing,int
8,count_out_links_to_item_was_is_missing,int


In [13]:
df_target_item = df_target_item.withColumn("views",df_target_item.views.cast('int'))

# **Задача регрессии**
## **Линейная регрессии**

In [14]:
columns = ['count_out_links_to_item', 'count_in_links_to_item', 'views_was_is_missing', 'count_item_aliase', 'en_description_length']
df_target = imputed_df.select(columns)
df_target.show(4)

+-----------------------+----------------------+--------------------+-----------------+---------------------+
|count_out_links_to_item|count_in_links_to_item|views_was_is_missing|count_item_aliase|en_description_length|
+-----------------------+----------------------+--------------------+-----------------+---------------------+
|                    609|                   171|                   0|                8|                   18|
|                    300|                    98|                   0|                6|                   49|
|                     71|                     1|                   0|                3|                   41|
|                     71|                     1|                   1|                2|                   33|
+-----------------------+----------------------+--------------------+-----------------+---------------------+
only showing top 4 rows



In [15]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

selected_columns = ['count_in_links_to_item', 'views_was_is_missing', 'count_item_aliase', 'en_description_length']
data_subset = df_target.select(columns)
# Создадим столбец features, который объединяет все признаки в один вектор
assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")
 # = assembler.transform(df_target)
#final_data = data_subset.select("features", "count_out_links_to_item")

train_data, test_data = data_subset.randomSplit([0.7, 0.3], seed=42)

# Создание модели LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="count_out_links_to_item", predictionCol="predicted_count_out_links_to_item")

# Создание конвейера
pipeline = Pipeline(stages=[assembler, lr])

# Обучение модели на обучающей выборке
lr_model = pipeline.fit(train_data)

# Прогнозирование на тестовой выборке
predictions = lr_model.transform(test_data)

In [16]:
# Оценка качества модели
evaluator = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="predicted_count_out_links_to_item", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="predicted_count_out_links_to_item", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 15.401
R-squared (R2) on test data: 0.047


In [17]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Определение сетки параметров для кросс-валидации
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()


# Создание объекта CrossValidator
cv = CrossValidator(estimator=pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=evaluator,
                            numFolds=4) 
train_data.show(2)

# Обучение и подбор гиперпараметров
cv_model = cv.fit(train_data)

best_cv_model = cv_model.bestModel

# Оценка производительности на тестовом наборе
cv_prediction = best_cv_model.transform(test_data)

+-----------------------+----------------------+--------------------+-----------------+---------------------+
|count_out_links_to_item|count_in_links_to_item|views_was_is_missing|count_item_aliase|en_description_length|
+-----------------------+----------------------+--------------------+-----------------+---------------------+
|                      2|                     1|                   0|                1|                   20|
|                      4|                     1|                   0|                1|                   37|
+-----------------------+----------------------+--------------------+-----------------+---------------------+
only showing top 2 rows



In [18]:
# Оценка качества модели
evaluator = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="predicted_count_out_links_to_item", metricName="rmse")
rmse = evaluator.evaluate(cv_prediction)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="predicted_count_out_links_to_item", metricName="r2")
r2 = evaluator_r2.evaluate(cv_prediction)
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 15.400
R-squared (R2) on test data: 0.048


## **Градиентный бустинг**

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Выберем только нужные колонки
selected_columns = ['count_out_links_to_item', 'count_in_links_to_item', 'views_was_is_missing', 'count_item_aliase', 'en_description_length']
data_subset = df_target.select(selected_columns)

# Создадим столбец features, который объединяет все признаки в один вектор
assembler = VectorAssembler(inputCols=selected_columns[1:], outputCol="features")
data_subset = assembler.transform(data_subset)


# Разделим данные на обучающую и тестовую выборки
(training_data, test_data) = data_subset.randomSplit([0.8, 0.2], seed=42)

# Инициализация модели GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="count_out_links_to_item", seed=42)

# Создадим сетку параметров для подбора
param_grid = (ParamGridBuilder()
              .addGrid(gbt.maxDepth, [5, 5])
              .addGrid(gbt.maxIter, [10, 10])
              .build())

# Инициализация оценщика (evaluator)
evaluator_mse = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="prediction", metricName="mse")

# Инициализация кросс-валидации
cross_val = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=evaluator_mse, numFolds=3)

# Обучение модели на обучающей выборке с кросс-валидацией
cv_model = cross_val.fit(training_data)

# Прогнозирование на тестовой выборке
predictions = cv_model.transform(test_data)

# Вывод предсказаний и выбранных признаков
predictions.select("count_out_links_to_item", "prediction", *selected_columns[1:]).show()


+-----------------------+------------------+----------------------+--------------------+-----------------+---------------------+
|count_out_links_to_item|        prediction|count_in_links_to_item|views_was_is_missing|count_item_aliase|en_description_length|
+-----------------------+------------------+----------------------+--------------------+-----------------+---------------------+
|                      4| 68.63910991340485|                     1|                   0|                1|                   39|
|                      5| 67.76538811455913|                     1|                   0|                1|                   37|
|                      5| 65.35985189853736|                     2|                   0|                1|                   29|
|                      7| 69.18864115222257|                     1|                   0|                1|                   45|
|                      8| 64.03236002839013|                     4|                   0|         

In [21]:
# Оценка качества модели
evaluator = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(labelCol="count_out_links_to_item", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 17.904
R-squared (R2) on test data: 0.046
