## Машинное обучение на больших данных. Классификация
Используемый датасет: https://www.kaggle.com/datasets/hm-land-registry/uk-housing-prices-paid

В датасете содержатся данные о продаже недвижимости в Великобритании в период с 1995 г. по 2017 г.

Был использован датасет, получившийся в результате выполнений л/р №1.

## Импорты
Общие модули, которые будут использованы для работы с данными

In [1]:
# Import other modules not related to PySpark
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
import os
import sys
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
# This helps auto print out the items without explixitly using 'print'
InteractiveShell.ast_node_interactivity = "all" 
%matplotlib inline
pd.set_option("display.max_columns", None)

Модули библиотеки PySpark, служащие для взаимодействия с движком Spark. Настройка Spark и запуск сессии

In [2]:
# Import PySpark related modules
import pyspark
from pyspark.rdd import RDD

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from pyspark.sql.functions import lit, desc, col, size, array_contains\
, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Pyspark guide") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

spark = init_spark()

## Подготовка данных
Чтение подготовленного датасета из файла

In [3]:
df = spark.read.csv("data/prepered.csv", header=True, inferSchema=True)
df.limit(10).toPandas()

Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Town/City,District,County,additional_entry,freehold,new,property_type,terraced,semi_detached,detached,flats,other
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18,OLDHAM,OLDHAM,GREATER MANCHESTER,0,1,0,terraced,1,0,0,0,0
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09,GRAYS,THURROCK,THURROCK,0,1,0,semi_detached,0,1,0,0,0
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30,HIGHBRIDGE,SEDGEMOOR,SOMERSET,0,1,0,terraced,1,0,0,0,0
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,0,1,0,terraced,1,0,0,0,0
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23,WAKEFIELD,LEEDS,WEST YORKSHIRE,0,1,0,semi_detached,0,1,0,0,0
5,{895E4E63-203F-476A-9AA9-42389DD0AE5C},81750,1995-05-19,SALISBURY,SALISBURY,WILTSHIRE,0,1,0,semi_detached,0,1,0,0,0
6,{FB195C27-E790-45FD-847A-4238BC94546A},56000,1995-03-10,WITNEY,WEST OXFORDSHIRE,OXFORDSHIRE,0,1,0,semi_detached,0,1,0,0,0
7,{1D6B01EC-DC33-4147-8A21-4238BEB2D4C1},31000,1995-03-02,ST. AUSTELL,RESTORMEL,CORNWALL,0,1,0,semi_detached,0,1,0,0,0
8,{B8D0F817-4553-448D-A2C1-4238BF81C6FA},82000,1995-06-16,GREENFORD,EALING,GREATER LONDON,0,1,0,semi_detached,0,1,0,0,0
9,{6DD27423-CC39-4B31-A848-4238D58268D4},10000,1995-05-17,FERNDALE,RHONDDA,MID GLAMORGAN,0,1,0,terraced,1,0,0,0,0


Добавим в датасет числовые признаки, которые получим, упорядочив графства, районы и города по средней цене в них и проиндексировав их.

In [4]:
def translate(dictionary):
    def translate_(column):
        return dictionary.get(column)
    return udf(translate_, IntegerType())

def index_in_order(dataframe, input_col, output_col):
    names = dataframe.select(input_col, "Price").groupBy(input_col).avg().orderBy("avg(Price)").select(input_col).rdd.flatMap(lambda x: x).collect()
    mapping = {name: i for i, name in enumerate(names)}
    
    return dataframe.withColumn(output_col, translate(mapping)(input_col))

In [5]:
df = index_in_order(df, "County", "county_index")
df = index_in_order(df, "District", "district_index")
df = index_in_order(df, "Town/City", "city_index")
df.limit(10).toPandas()

Unnamed: 0,Transaction unique identifier,Price,Date of Transfer,Town/City,District,County,additional_entry,freehold,new,property_type,terraced,semi_detached,detached,flats,other,county_index,district_index,city_index
0,{81B82214-7FBC-4129-9F6B-4956B4A663AD},25000,1995-08-18,OLDHAM,OLDHAM,GREATER MANCHESTER,0,1,0,terraced,1,0,0,0,0,37,87,72
1,{8046EC72-1466-42D6-A753-4956BF7CD8A2},42500,1995-08-09,GRAYS,THURROCK,THURROCK,0,1,0,semi_detached,0,1,0,0,0,81,261,574
2,{278D581A-5BF3-4FCE-AF62-4956D87691E6},45000,1995-06-30,HIGHBRIDGE,SEDGEMOOR,SOMERSET,0,1,0,terraced,1,0,0,0,0,85,250,537
3,{1D861C06-A416-4865-973C-4956DB12CD12},43150,1995-11-24,BEDFORD,NORTH BEDFORDSHIRE,BEDFORDSHIRE,0,1,0,terraced,1,0,0,0,0,67,55,675
4,{DD8645FD-A815-43A6-A7BA-4956E58F1874},18899,1995-06-23,WAKEFIELD,LEEDS,WEST YORKSHIRE,0,1,0,semi_detached,0,1,0,0,0,42,193,247
5,{895E4E63-203F-476A-9AA9-42389DD0AE5C},81750,1995-05-19,SALISBURY,SALISBURY,WILTSHIRE,0,1,0,semi_detached,0,1,0,0,0,104,296,855
6,{FB195C27-E790-45FD-847A-4238BC94546A},56000,1995-03-10,WITNEY,WEST OXFORDSHIRE,OXFORDSHIRE,0,1,0,semi_detached,0,1,0,0,0,116,391,948
7,{1D6B01EC-DC33-4147-8A21-4238BEB2D4C1},31000,1995-03-02,ST. AUSTELL,RESTORMEL,CORNWALL,0,1,0,semi_detached,0,1,0,0,0,88,191,273
8,{B8D0F817-4553-448D-A2C1-4238BF81C6FA},82000,1995-06-16,GREENFORD,EALING,GREATER LONDON,0,1,0,semi_detached,0,1,0,0,0,119,405,936
9,{6DD27423-CC39-4B31-A848-4238D58268D4},10000,1995-05-17,FERNDALE,RHONDDA,MID GLAMORGAN,0,1,0,terraced,1,0,0,0,0,1,0,2


Объявим специальный транформатор, который добавит в датасет колонку весов, соответствующую целевой колонке. Это нужно потому, что количество едениц и нулей в целевой колонке не сбалансированно. Проставление весов даст еденицам, которые в меньшинстве, более высокий вес, в результате чего они будут иметь большее влияние на модель

In [6]:
class WeightSetter(Transformer):
    _paramMap = {}
    _params = None 
    def __init__(self, inputCol, outputCol="weight"):
        self.inputCol = inputCol
        self.outputCol = outputCol
        
    def _transform(self, df):
        (col1, count1), (col2, count2) = df.groupBy(self.inputCol).count().collect()
        val1 = count2 / (count1 + count2)
        return df.withColumn(self.outputCol, f.when(col(self.inputCol)==col1, val1).otherwise(1 - val1))

Разделение датасета на тренировочную и тестовую часть

In [7]:
splits = df.randomSplit([0.95, 0.05])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 20060441  Testing Rows: 1054649


## Классификация

**Логистическая регрессия** - статистическая модель, используемая для прогнозирования вероятности возникновения некоторого события путём его сравнения с логистической кривой. Эта регрессия выдаёт ответ в виде вероятности бинарного события (1 или 0).

В PySpark ML задачу логистической регрессии решает класс **LogisticRegression**.

Построим пайплайн для осуществления обучения и предсказания. 

In [8]:
lr = LogisticRegression(featuresCol="features", labelCol="new", weightCol="weight", maxIter=10, regParam=0.3)

classification_pipeline = Pipeline(stages = [
    #VectorAssembler(inputCols = ["additional_entry", "freehold", "terraced", "semi_detached", "detached", "flats", "other"], outputCol="catFeatures"), 
    VectorAssembler(inputCols = ["Price", "county_index", "district_index", "city_index"], outputCol="numFeatures"),
    MinMaxScaler(inputCol = "numFeatures", outputCol="normFeatures"),
    VectorAssembler(inputCols=["additional_entry", "freehold", "terraced", "semi_detached", "detached", "flats", "other", "normFeatures"], outputCol="features"),
    WeightSetter(inputCol = "new", outputCol = "weight"),
    lr
])

Проведем обучение модели при помощи пайплайна

In [9]:
classification_model = classification_pipeline.fit(train)

Получим предсказания обученой модели

In [10]:
classification_prediction = classification_model.transform(test)

In [11]:
classification_prediction.select("rawPrediction", "prediction", col("new").alias("expected")).limit(10).toPandas()

Unnamed: 0,rawPrediction,prediction,expected
0,"[-0.26526547841855375, 0.26526547841855375]",1.0,0
1,"[-0.25988564095865635, 0.25988564095865635]",1.0,0
2,"[0.29705441239014363, -0.29705441239014363]",0.0,0
3,"[0.6113011185691646, -0.6113011185691646]",0.0,0
4,"[0.4072801588351568, -0.4072801588351568]",0.0,0
5,"[0.3642178912376316, -0.3642178912376316]",0.0,0
6,"[0.3001789226104967, -0.3001789226104967]",0.0,0
7,"[0.3028967580730879, -0.3028967580730879]",0.0,0
8,"[0.31798500348773295, -0.31798500348773295]",0.0,0
9,"[0.343235628851267, -0.343235628851267]",0.0,0


Проведем оценку полученной модели используя метрику ROC AUC

In [12]:
classification_evaluator = BinaryClassificationEvaluator(labelCol="new", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
classification_evaluator.evaluate(classification_prediction)

0.6739207592935774

Определим функцию для построения матрицы несоответствий

In [13]:
def confusion_matrix(df, predictionCol, labelCol):
    tp = float(df.filter((col(predictionCol)==1.0) & (col(labelCol)==1)).count())
    fp = float(df.filter((col(predictionCol)==1.0) & (col(labelCol)==0)).count())
    tn = float(df.filter((col(predictionCol)==0.0) & (col(labelCol)==0)).count())
    fn = float(df.filter((col(predictionCol)==0.0) & (col(labelCol)==1)).count())
    pr = tp / (tp + fp)
    re = tp / (tp + fn)
    f1 = 2*pr*re/(re+pr)
    metrics = spark.createDataFrame([
     ("TP", tp),
     ("FP", fp),
     ("TN", tn),
     ("FN", fn),
     ("Precision", pr),
     ("Recall", re),
     ("F1", f1)],["metric", "value"])
    return metrics.toPandas()

И построим её для полученных предсказаний

In [14]:
confusion_matrix(classification_prediction, "prediction", "new")

Unnamed: 0,metric,value
0,TP,70772.0
1,FP,350550.0
2,TN,596194.0
3,FN,37133.0
4,Precision,0.167976
5,Recall,0.655873
6,F1,0.267454


## Подбор параметров

Воспользуемся методом кросс-валидации для подбора оптимальных параметров регрессии 

In [15]:
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.3, 0.1]).addGrid(lr.maxIter, [10, 5]).addGrid(lr.threshold,  [0.4, 0.3]).build() 
cv = CrossValidator(estimator=classification_pipeline, evaluator=BinaryClassificationEvaluator(labelCol="new"), estimatorParamMaps=paramGrid, numFolds=2)

Обучим модель

In [16]:
model = cv.fit(train)

И получим новые предсказания

In [17]:
newPrediction = model.transform(test)

Проведём оценку полученных предсказаний

In [18]:
classification_evaluator.evaluate(newPrediction)

0.6749729684813583

Можем видеть, что точность предсказаний практически не изменилась.

Построим матрицу несоответствий

In [19]:
confusion_matrix(newPrediction, "prediction", "new")

Unnamed: 0,metric,value
0,TP,85379.0
1,FP,531925.0
2,TN,414819.0
3,FN,22526.0
4,Precision,0.138309
5,Recall,0.791242
6,F1,0.23546


По низким значениям TN и FN можно понять, что модель практически не предсказывает значения, равные 0, а значит не подходит для адекватной классификации, и исходный параметр лучше подходили для этой цели.