In [1]:
# Disable warnings from pandas that are caused by pyspark
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# Import other modules not related to PySpark
import numpy as np 
import pandas as pd 
import os

In [2]:
# Import PySpark related modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler

from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import StandardScaler, MinMaxScaler

spark = SparkSession.builder.master("local[*]").getOrCreate()

## Построение линейной регрессии ##

### Подготовка данных ###

Загрузим данные из файла.

In [3]:
df = spark.read.csv('./data/clean_data.csv', inferSchema=True, header=True)

In [4]:
df.show(10)

+---------+---------+---------+------+-------------+-----+------+-----+-----+------------+-----------+
|    price|  geo_lat|  geo_lon|region|building_type|level|levels|rooms| area|kitchen_area|object_type|
+---------+---------+---------+------+-------------+-----+------+-----+-----+------------+-----------+
|2500000.0|43.634693|39.727165|  2843|            2|    2|     8|    0| 25.0|         5.0|          1|
|1500000.0|  54.8887|38.605206|    81|            3|    2|     2|    2| 40.0|         6.0|          1|
|4235680.0|55.696846|37.907326|    81|            1|   25|    25|    1| 36.8|         9.3|         11|
|3690000.0| 55.57747|38.205257|    81|            2|    6|    10|    1| 43.0|        11.0|          1|
|2310000.0|55.016808| 83.00891|  9654|            1|    1|    10|    3|59.02|         8.5|          1|
|2300000.0| 54.94714|82.958595|  9654|            1|    4|    10|    2|63.13|       12.05|         11|
|2460000.0|55.053036| 82.89764|  9654|            3|    5|    19|    1| 4

Обозначаем Y и список X.

In [5]:
features = ['region', 'levels', 
            'area', 'kitchen_area']
target = 'price'
attributes = features + [target]
sample = df.select(attributes)

Векторизуем X.

In [6]:
assembler = VectorAssembler(inputCols=features,
                            outputCol='features')
output = assembler.transform(sample)

In [7]:
output.show()

+------+------+-----+------------+---------+--------------------+
|region|levels| area|kitchen_area|    price|            features|
+------+------+-----+------------+---------+--------------------+
|  2843|     8| 25.0|         5.0|2500000.0|[2843.0,8.0,25.0,...|
|    81|     2| 40.0|         6.0|1500000.0| [81.0,2.0,40.0,6.0]|
|    81|    25| 36.8|         9.3|4235680.0|[81.0,25.0,36.8,9.3]|
|    81|    10| 43.0|        11.0|3690000.0|[81.0,10.0,43.0,1...|
|  9654|    10|59.02|         8.5|2310000.0|[9654.0,10.0,59.0...|
|  9654|    10|63.13|       12.05|2300000.0|[9654.0,10.0,63.1...|
|  9654|    19| 41.0|        12.0|2460000.0|[9654.0,19.0,41.0...|
|  9654|    10| 58.0|         8.0|2160000.0|[9654.0,10.0,58.0...|
|  9654|    10| 59.0|        19.0|3100000.0|[9654.0,10.0,59.0...|
|  9654|    18| 74.0|         7.0|2050000.0|[9654.0,18.0,74.0...|
|  2922|    23| 38.0|        12.1|2280000.0|[2922.0,23.0,38.0...|
|    81|     4| 39.8|        10.0|3200000.0|[81.0,4.0,39.8,10.0]|
|  2604|  

### Построение простейшей модели ###

Делим векторизованную выборку на train и test.

In [8]:
train, test = output.randomSplit([0.95, 0.05])

Создаем объект для модели линейной регрессии.

In [9]:
simple_lr = LinearRegression(featuresCol='features', labelCol='price')

Обучаем модель.

In [10]:
simple_model = simple_lr.fit(train)

Оценка коэффициента детерминации модели.

In [11]:
simple_sum = simple_model.summary
print("RMSE: %f" % simple_sum.rootMeanSquaredError)
print("r2: %f" % simple_sum.r2)

RMSE: 1505014.274937
r2: 0.370568


Оценка модели на тестовой выборке.

In [12]:
simle_test = simple_model.transform(test)
simle_test.select("prediction","price","features").show(5)
simple_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % simple_evaluator.evaluate(simle_test))

+------------------+---------+-------------------+
|        prediction|    price|           features|
+------------------+---------+-------------------+
|1194491.6765482042|9000000.0|  [3.0,3.0,6.0,2.0]|
| 1772500.319194454|1900000.0| [3.0,3.0,18.0,3.0]|
| 2167462.618557431|2200000.0| [3.0,3.0,26.0,4.0]|
|2844272.6366917593|4500000.0|[3.0,3.0,36.5,10.8]|
| 3036256.399812459|5800000.0|[3.0,3.0,41.2,10.0]|
+------------------+---------+-------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.373421


Вывод: ...

### Нормализация данных ###

Проведем нормализацию значений к значениям диапозона [0, 1].

In [13]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(output)
scaledData = scalerModel.transform(output)

In [14]:
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show(truncate=False)

Features scaled to range: [0.000000, 1.000000]
+-------------------------+-----------------------------------------------------------------------------------+
|features                 |scaledFeatures                                                                     |
+-------------------------+-----------------------------------------------------------------------------------+
|[2843.0,8.0,25.0,5.0]    |[0.04589157307909833,0.18421052631578946,0.22438346648141716,0.18217710095882686]  |
|[81.0,2.0,40.0,6.0]      |[0.0012604023592146724,0.02631578947368421,0.3980548801667246,0.2385786802030457]  |
|[81.0,25.0,36.8,9.3]     |[0.0012604023592146724,0.631578947368421,0.3610049785805256,0.42470389170896794]   |
|[81.0,10.0,43.0,11.0]    |[0.0012604023592146724,0.23684210526315788,0.4327891629037861,0.52058657642414]    |
|[9654.0,10.0,59.02,8.5]  |[0.15595055344590775,0.23684210526315788,0.6182702327196944,0.3795826283135928]    |
|[9654.0,10.0,63.13,12.05]|[0.15595055344590775,0.2368421

Делим векторизованную и нормализованную выборку на train и test.

In [15]:
train_norm, test_norm = scaledData.randomSplit([0.95, 0.05])

### Построение модели на нормализованных данных ###

Создаем новый объект для модели линейной регрессии.

In [16]:
norm_lr = LinearRegression(featuresCol='scaledFeatures', labelCol='price')

Создаем объект модели.

In [17]:
norm_model = norm_lr.fit(train_norm)

Оценка коэффициента детерминации.

In [18]:
norm_sum = norm_model.summary
print("RMSE: %f" % norm_sum.rootMeanSquaredError)
print("r2: %f" % norm_sum.r2)

RMSE: 1504966.513911
r2: 0.370636


Оценка модели на тестовой выборке.

In [19]:
norm_test = norm_model.transform(test_norm)
norm_test.select("prediction","price","scaledFeatures").show(5)
norm_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price",metricName="r2")
print("R Squared (R2) on test data = %g" % norm_evaluator.evaluate(norm_test))

+------------------+---------+--------------------+
|        prediction|    price|      scaledFeatures|
+------------------+---------+--------------------+
|3552295.9880668204|4000000.0|[0.0,0.0,0.590251...|
| 3205773.533298606|2350000.0|[0.0,0.0263157894...|
|1903098.8862979573|6589900.0|[0.0,0.0526315789...|
|2166975.1838930096|2200000.0|[0.0,0.0526315789...|
| 2574231.009407202|5800000.0|[0.0,0.0526315789...|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.372141


Вывод: Результат примерно тот же. Теперь попробуем изменить гиперпараметры.

### Подбор гиперпараметров ###

Создаем объект линейной регрессии.

In [20]:
par_lr = LinearRegression(featuresCol='scaledFeatures', labelCol='price')

Оперделяем наборы параметров, на которых будет происходить поиск наиболее удачной модели.

In [21]:
grid_search = ParamGridBuilder().addGrid(par_lr.regParam, [0.0, 0.01, 0.1, 0.25, 0.5, 1.0]).addGrid(par_lr.elasticNetParam, [0.5, 1.0]).build()

### Построение модели на нормализованных данных с гиперпараметрами ###

Определение метрики качества.

In [22]:
par_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='price', metricName="r2")

Запуск кроссвалидации, которая осуществляет подбор наилучших параметров модели.

In [23]:
cv = CrossValidator(estimator=par_lr,
                    estimatorParamMaps=grid_search,
                    evaluator=par_evaluator)
cv_model = cv.fit(train_norm)

Оценка качества модели.

In [24]:
par_sum = cv_model.bestModel.summary
print("RMSE: %f" % par_sum.rootMeanSquaredError)
print("r2: %f" % par_sum.r2)

RMSE: 1504966.513911
r2: 0.370636


Оценка коэффициента детерминации на тестовой выборке.

In [25]:
par_test = cv_model.bestModel.transform(test_norm)
par_test.select("prediction","price","features").show(5)
print("R Squared (R2) on test data = %g" % par_evaluator.evaluate(par_test))

+------------------+---------+------------------+
|        prediction|    price|          features|
+------------------+---------+------------------+
| 3552296.119136994|4000000.0|[3.0,1.0,56.6,9.8]|
|3205773.6624871437|2350000.0|[3.0,2.0,48.8,7.0]|
|1903099.0026139005|6589900.0|[3.0,3.0,21.5,2.0]|
| 2166975.292768276|2200000.0|[3.0,3.0,26.0,4.0]|
|2574231.1074509313|5800000.0|[3.0,3.0,33.0,7.0]|
+------------------+---------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.372141


## Построение модели классификации ##

### Подготовка данных. ###

Данные (датасет) уже был загружен в df на этапе построения регрессии, поэтому подготовка данных 
в виде загрузки датасета из файла не требуется.

Для начала необходимо определить среднюю площадь жилья. Среднее значение площади будет выступать разделителем
2-ух категорий.

In [26]:
df.describe().show()

+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+
|summary|             price|          geo_lat|          geo_lon|            region|     building_type|            level|            levels|             rooms|             area|      kitchen_area|       object_type|
+-------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+
|  count|           3770519|          3770519|          3770519|           3770519|           3770519|          3770519|           3770519|           3770519|          3770519|           3770519|           3770519|
|   mean|3387280.3516600765|53.84268174798873|50.23187692441584|3719.4153221346983|2.0523766091617626|6.034259474624051|11.027972011280145|1

In [27]:
mean_area = df.select(mean('area')).collect()[0][0]
mean_area

48.16346755977078

In [28]:
print(df.where(df['area']> 48.5).count()/df.count())

0.4240161632921091


Наша модель классификации должна определять к какой категории по площади относится жилье, обозначенное в сделке. 
Если площадь жилья > 48.16346755977078, то жилье относится к 1 категории, иначе ко 2 категории.

Добавим бинарный признак в датафрейм.

In [29]:
new_df = df.select("*", ((col("area") > mean_area).cast("Int").alias("area_type")))
new_df.show(20)

+---------+---------+---------+------+-------------+-----+------+-----+-----+------------+-----------+---------+
|    price|  geo_lat|  geo_lon|region|building_type|level|levels|rooms| area|kitchen_area|object_type|area_type|
+---------+---------+---------+------+-------------+-----+------+-----+-----+------------+-----------+---------+
|2500000.0|43.634693|39.727165|  2843|            2|    2|     8|    0| 25.0|         5.0|          1|        0|
|1500000.0|  54.8887|38.605206|    81|            3|    2|     2|    2| 40.0|         6.0|          1|        0|
|4235680.0|55.696846|37.907326|    81|            1|   25|    25|    1| 36.8|         9.3|         11|        0|
|3690000.0| 55.57747|38.205257|    81|            2|    6|    10|    1| 43.0|        11.0|          1|        0|
|2310000.0|55.016808| 83.00891|  9654|            1|    1|    10|    3|59.02|         8.5|          1|        1|
|2300000.0| 54.94714|82.958595|  9654|            1|    4|    10|    2|63.13|       12.05|      

Далее определим наиболее значимые (с большей корреляцией) признаки, которые необходимо включить в список 
признаков для обучения модели.

Согласно таблицы корреляции можно выделить следующие параметры: price, kitchen_area, rooms.

In [30]:
features = ['rooms', 'price', 'kitchen_area']
target = 'area_type'
attributes = features + [target]
sample = new_df.select(attributes)

Векторизуем features.

In [31]:
assembler = VectorAssembler(inputCols=features,
                            outputCol='features')
output = assembler.transform(sample)

In [32]:
output.show()

+-----+---------+------------+---------+--------------------+
|rooms|    price|kitchen_area|area_type|            features|
+-----+---------+------------+---------+--------------------+
|    0|2500000.0|         5.0|        0| [0.0,2500000.0,5.0]|
|    2|1500000.0|         6.0|        0| [2.0,1500000.0,6.0]|
|    1|4235680.0|         9.3|        0| [1.0,4235680.0,9.3]|
|    1|3690000.0|        11.0|        0|[1.0,3690000.0,11.0]|
|    3|2310000.0|         8.5|        1| [3.0,2310000.0,8.5]|
|    2|2300000.0|       12.05|        1|[2.0,2300000.0,12...|
|    1|2460000.0|        12.0|        0|[1.0,2460000.0,12.0]|
|    2|2160000.0|         8.0|        1| [2.0,2160000.0,8.0]|
|    2|3100000.0|        19.0|        1|[2.0,3100000.0,19.0]|
|    3|2050000.0|         7.0|        1| [3.0,2050000.0,7.0]|
|    1|2280000.0|        12.1|        0|[1.0,2280000.0,12.1]|
|    1|3200000.0|        10.0|        0|[1.0,3200000.0,10.0]|
|    2|4760301.0|        14.1|        1|[2.0,4760301.0,14.1]|
|    1|2

Нормализуем данные.

In [33]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(output)
scaledData = scalerModel.transform(output)

In [34]:
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show(truncate=False)

Features scaled to range: [0.000000, 1.000000]
+---------------------+------------------------------------------------------------+
|features             |scaledFeatures                                              |
+---------------------+------------------------------------------------------------+
|[0.0,2500000.0,5.0]  |[0.0,0.24707855347860863,0.18217710095882686]               |
|[2.0,1500000.0,6.0]  |[0.2222222222222222,0.14412915619585503,0.2385786802030457] |
|[1.0,4235680.0,9.3]  |[0.1111111111111111,0.4257657633543384,0.42470389170896794] |
|[1.0,3690000.0,11.0] |[0.1111111111111111,0.3695883362450854,0.52058657642414]    |
|[3.0,2310000.0,8.5]  |[0.3333333333333333,0.22751816799488547,0.3795826283135928] |
|[2.0,2300000.0,12.05]|[0.2222222222222222,0.2264886740220579,0.5798082346305697]  |
|[1.0,2460000.0,12.0] |[0.1111111111111111,0.2429605775872985,0.5769881556683587]  |
|[2.0,2160000.0,8.0]  |[0.2222222222222222,0.21207575840247242,0.3513818386914834] |
|[2.0,3100000.0,19

Разделение данных на тестувую и обучающую выборки.

In [35]:
train_norm, test_norm = scaledData.randomSplit([0.95, 0.05])

### Построение модели с дефолтными параметрами ###

Создадим объект модели.

In [52]:
norm_rf = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'area_type')

Обучаем модель.

In [53]:
norm_model = norm_rf.fit(train_norm)

In [55]:
norm_test = norm_model.transform(test_norm)
norm_test.select("prediction","area_type","scaledFeatures").show(5)
summ = norm_model.summary
print(summ.accuracy)
print(summ.areaUnderROC)

+----------+---------+--------------------+
|prediction|area_type|      scaledFeatures|
+----------+---------+--------------------+
|       0.0|        1|[0.0,0.0102949397...|
|       0.0|        0|[0.0,0.0133834216...|
|       0.0|        0|[0.0,0.0360322890...|
|       0.0|        1|[0.0,0.0411797589...|
|       0.0|        0|[0.0,0.0463272287...|
+----------+---------+--------------------+
only showing top 5 rows

0.9240119561236613
0.9655635199368947


In [39]:
preds_and_labels = norm_test.select(['prediction','area_type']).withColumn('area_type', F.col('area_type').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','area_type'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

[[99844.  7790.]
 [ 6569. 73884.]]


In [40]:
#DEBUG
#norm_test.select("prediction","area_type","scaledFeatures").where(norm_test["prediction"]==1.0).show(20)
#DEBUG

### Подбор гиперпараметров для модели классификации ###

Гиперпараметры модели клафиссикации "Рандомный лес": 'numTrees', 'maxDepth'.

Создаем объект модели.

In [41]:
par_rf = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'area_type')

Определение наборов параметров.

In [45]:
grid_search = ParamGridBuilder().addGrid(par_rf.numTrees, [1, 3, 5, 10]).addGrid(par_rf.maxDepth, [1, 3, 5]).build()

### Построение модели классификации на нормализованных данных с гиперпараметрами ###

Определение метрики качества.

In [46]:
par_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area_type', metricName="r2")

Запуск кроссвалидации, которая осуществляет подбор наилучших параметров модели.

In [47]:
cv = CrossValidator(estimator=par_rf,
                    estimatorParamMaps=grid_search,
                    evaluator=par_evaluator)
cv_model = cv.fit(train_norm)

Оценка качества модели.

In [51]:
par_sum = cv_model.bestModel.summary
print(par_sum.accuracy)
print(par_sum.areaUnderROC)

0.9248091799090674
0.9593072406287176


In [49]:
predictions = cv_model.bestModel.transform(test_norm)
predictions.select('prediction', 'probability', 'area_type').show(25)

+----------+--------------------+---------+
|prediction|         probability|area_type|
+----------+--------------------+---------+
|       0.0|[0.73264907135874...|        1|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        1|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...|        0|
|       0.0|[0.97538867952839...

In [50]:
preds_and_labels = predictions.select(['prediction','area_type']).withColumn('area_type', F.col('area_type').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','area_type'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

[[100354.   7280.]
 [  6895.  73558.]]
