# Предсказание стоимости жилья

Цель:
- предсказание медианной стоимости дома в жилом массиве

Задачи:
- загрузить и изучить данные
- проверить и заполнить пропущенные значения
- обучить модель линейной регрессии
    - на всех признаках
    - только на числовых
- оценить качество моделей по метрикам RMSE, MAE и R2


## Загрузка библиотек и данных

In [1]:
import pandas as pd
import pyspark
import re

from operator import and_
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window 

import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.pipeline import Pipeline

import seaborn as sns
import matplotlib.pyplot as plt


In [2]:
RANDOM_SEED = 7

spark = SparkSession.builder \
                    .master("local") \
                    .appName("California housing") \
                    .getOrCreate()

# Колонку индекса удаляем сразу на этапе импорта

df = spark.read.csv('../datasets/housing.csv', inferSchema = True, header=True).drop('_c0')


23/03/16 23:48:10 WARN Utils: Your hostname, MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.0.44 instead (on interface en0)
23/03/16 23:48:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/16 23:48:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

## Подготовка данных

In [3]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [4]:
df.describe().toPandas() 


                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


### Выделение признаков

Разделим признаки на числовые и категориальные, выделим целевой признак.

In [5]:
cat_features = ['ocean_proximity']
num_features = ['longitude',
                'latitude',
                'housing_median_age',
                'total_rooms',
                'total_bedrooms',
                'population',
                'households',
                'median_income'
               ]
target = 'median_house_value'

### Пропущенные значения

In [6]:
print(f"{'Пропущенные значения':<25} {'nan':>5} {'null':>5} {'zero values':>12} {'empty str':>10}")
for c in df.columns:
    print(f"  {c:<20} {df.filter(F.isnan(c)).count():>5} {df.filter(F.isnull(c)).count():>5} {df.filter(F.col(c)==0).count():>12} {df.filter(F.col(c)=='').count():>10}")


Пропущенные значения        nan  null  zero values  empty str
  longitude                0     0            0          0
  latitude                 0     0            0          0
  housing_median_age       0     0            0          0
  total_rooms              0     0            0          0
  total_bedrooms           0   207            0          0
  population               0     0            0          0
  households               0     0            0          0
  median_income            0     0            0          0
  median_house_value       0     0            0          0
  ocean_proximity          0     0            0          0


In [7]:
print('Корреляция total_bedrooms')
for c in num_features+[target]:
    print(f"  {c:<25} {df.corr('total_bedrooms', c):>5.2f}")


Корреляция total_bedrooms
  longitude                  0.07
  latitude                  -0.07
  housing_median_age        -0.32
  total_rooms                0.92
  total_bedrooms             1.00
  population                 0.87
  households                 0.97
  median_income             -0.01
  median_house_value         0.05


В колонке `total_bedrooms` есть пропущенные значения.

Наибольшая корреляция у этого признака с `households`. Сгруппируем датасет по `households`, получим медианное количество спален и заполним пропуски этим значением.

В некоторых случаях в `households` не хватило данных для заполнения, поэтому используем аналогично группировки и медиану по `total_rooms` и `population`.



In [8]:
for c in ['households','total_rooms','population']:
    # создаем окно по соответствующему признаку
    window = Window().partitionBy(c)
    # добавляем временный столбец с медианами по окнам
    df = df.withColumn('median_bedrooms',F.percentile_approx('total_bedrooms', 0.5).over(window))
    # если пустое значение, то берем его из оконной медианы
    df = df.withColumn('total_bedrooms', F.coalesce('total_bedrooms', 'median_bedrooms'))

# временный стобец удалем
df = df.drop('median_bedrooms')
print(f"Пропуски в total_bedrooms {df.filter(F.isnull('total_bedrooms')).count()}")


[Stage 130:>                                                        (0 + 1) / 1]

Пропуски в total_bedrooms 0


                                                                                

### Проверка категориальных значений

In [9]:
df.select('ocean_proximity').distinct().collect()


[Row(ocean_proximity='ISLAND'),
 Row(ocean_proximity='NEAR OCEAN'),
 Row(ocean_proximity='NEAR BAY'),
 Row(ocean_proximity='<1H OCEAN'),
 Row(ocean_proximity='INLAND')]

Неявных дубликатов, ошибок не наблюдается.

### Дополнительные признаки

Добавим дополнительные признаки:
- количество комнат в домохозяйстве
- количество проживающих в домохозяйстве
- отношение количества спален к общему количеству комнат

In [10]:
df = df.withColumn('rooms_per_household', F.col('total_rooms')/F.col('households'))
df = df.withColumn('population_in_household', F.col('population')/F.col('households'))
df = df.withColumn('bedroom_index', F.col('total_bedrooms')/F.col('total_rooms'))

# соберем новые признаки в дополнительный список
new_num_features = ['rooms_per_household','population_in_household','bedroom_index']


In [11]:
print('Корреляция median_house_value')
for c in num_features + new_num_features:
    print(f"  {c:<25} {df.corr('median_house_value', c):>5.2f}")


Корреляция median_house_value
  longitude                 -0.05
  latitude                  -0.14
  housing_median_age         0.11
  total_rooms                0.13
  total_bedrooms             0.05
  population                -0.02
  households                 0.07
  median_income              0.69
  rooms_per_household        0.15
  population_in_household   -0.02
  bedroom_index             -0.26


По предварительной оценке наиболее выражена связь стоимости жилья и медианного дохода.

Также можно отметить обратную корреляцию с `bedroom_index`, то есть в более дорогом жилье больше дополнительных комнат общего назначения (доля спальных комнат ниже).

### Выбросы

In [12]:
outliers = {}
total_count = df.count()
print(f"{'количество выбросов':>35}{'доля':>8}")
for c in num_features + new_num_features + [target]:
    Q1 = df.approxQuantile(c,[0.25],relativeError=0)[0]
    Q3 = df.approxQuantile(c,[0.75],relativeError=0)[0]
    IQR = Q3 - Q1
    less_Q1 =  Q1 - 1.5*IQR
    more_Q3 =  Q3 + 1.5*IQR
    outliers |= { c: { 'less_Q1': less_Q1,
                       'more_Q3': more_Q3
                     }
                }
    outliers_count = df.filter((df[c] > more_Q3) | (df[c] < less_Q1)).count()
    print(f"{c:<25}{outliers_count:>6}{outliers_count/total_count:>8.1%}")


                количество выбросов    доля
longitude                     0    0.0%
latitude                      0    0.0%
housing_median_age            0    0.0%
total_rooms                1286    6.2%
total_bedrooms             1278    6.2%
population                 1196    5.8%
households                 1220    5.9%
median_income               680    3.3%
rooms_per_household         511    2.5%
population_in_household     711    3.4%
bedroom_index               592    2.9%
median_house_value         1071    5.2%


In [13]:
def remove_outliers(data):
    data = data.where(
        reduce(and_, (
            (val['less_Q1'] < df[col]) & (df[col] < val['more_Q3'])
            for col, val in outliers.items()
        ))
    )
    return data


In [14]:
total_count = df.count()
count_wo_outliers = remove_outliers(df).count()
print(f"Количество записей в исходном датасете {total_count}")
print(f"Количество записей без выбросов {count_wo_outliers}")
print(f"• доля выбросов {1-count_wo_outliers/total_count:.1%}")


Количество записей в исходном датасете 20640
Количество записей без выбросов 16339
• доля выбросов 20.8%


В результате экспериментов получилось, что удаление выбросов снижает качество модели.
Поэтому для обучения буду использовать полный датасет. 

## Обучение моделей

### Разделение датасета на обучающую и тестовую выборки 

In [15]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)

print(f'''
    Обучающая выборка {train_data.count()} 
    тестовая выборка {test_data.count()}
    ''')



    Обучающая выборка 16481 
    тестовая выборка 4159
    


### Масштабирование и кодирование признаков

In [16]:
# категориальные – только один океан
indexer = StringIndexer(inputCols=['ocean_proximity'], 
                        outputCols=['ocean_proximity_num'])

encoder = OneHotEncoder(inputCols=['ocean_proximity_num'],
                        outputCols=['ocean_proximity_ohe'])

# числовые
num_assembler = VectorAssembler(inputCols=num_features + new_num_features,
                                outputCol='num_features')

scaler = StandardScaler(inputCol='num_features',
                        outputCol='num_features_ss')

### Подбор гиперпараметров и кроссвалидация

In [17]:
def print_metrics(pred_data,pred_col):

    metrics = { 'rmse' : None,
                'mae'  : None,
                'r2'   : None
              }

    evaluator = RegressionEvaluator(labelCol='median_house_value', 
                                    predictionCol=pred_col)

    for m in metrics.keys():
        metrics[m] = round(evaluator.setMetricName(m).evaluate(pred_data),2)
        print(f"{m.upper():<10} {metrics[m]}")

    return metrics


In [18]:
def cv_train(pipeline, train, test):
        
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
        .build()
    
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=RegressionEvaluator(labelCol='median_house_value'),
                              seed=RANDOM_SEED,
                              numFolds=3
                             )

    cvModel = crossval.fit(train)

    print('Тестовая выборка')
    print_metrics(cvModel.transform(test),'prediction')

    print('\nГиперпараметры лучшей модели')
    [print(f"  {re.sub(r'.+__', '',str(idx)):<20} {val}") 
     for idx,val in cvModel.bestModel.stages[-1].extractParamMap().items()];
    
    return cvModel


### Обучение на полном датасете

In [None]:
%%time
# финальная сборка и модель
fin_assembler = VectorAssembler(inputCols=['ocean_proximity_ohe', 'num_features_ss'],
                                outputCol='features_full') 

lr = LinearRegression(labelCol='median_house_value', featuresCol='features_full')

# все в пайплайн
pipe_cat = Pipeline(stages=[indexer, encoder])
pipe_num = Pipeline(stages=[num_assembler, scaler])
pipeline = Pipeline(stages=[pipe_cat, pipe_num, fin_assembler, lr])

model_full = cv_train(pipeline, train_data, test_data)


                                                                                

23/03/16 23:49:48 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/16 23:49:48 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/03/16 23:49:53 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

### Обучение только на числовых признаках, исключая категориальные

In [None]:
%%time
# берем пайплайн с числовыми масштабированными признаками
lr = LinearRegression(labelCol='median_house_value', featuresCol='num_features_ss')
pipeline = Pipeline(stages=[pipe_num, lr])

model_num = cv_train(pipeline, train_data, test_data)


### Сравнение с константными моделями

In [None]:
total_median_house_value = round(df.approxQuantile('median_house_value',[0.5],relativeError=0)[0],2)
total_mean_house_value = round(df.select(F.mean('median_house_value').alias('mean')).collect()[0]['mean'],2)

print('Метрики константных моделей')
print(f"\n{'• медиана':<10} {total_median_house_value}")
print_metrics(test_data.withColumn('pred_const', F.lit(total_median_house_value)),'pred_const')
print(f"\n{'• среднее':<10} {total_mean_house_value}")
print_metrics(test_data.withColumn('pred_const', F.lit(total_mean_house_value)),'pred_const');


Использование медианы и среднего для предсказания целевого признака показывает результат многократно хуже чем обученные модели.

In [None]:
spark.stop()

## Анализ результатов

**Для работы использован датасет по жилью в Калифорнии**
- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве;
- `ocean_proximity` — близость к океану.

**В ходе работы**
- загружен и изучен предоставленный датасет
- Заполнены пропущенные значения, проверена корректность текстовых данных.
- Созданы три дополнительных признака.
- Найдены выбросы, проведены эксперименты по обучению с удалением выбросов и без удаления.

**Обучены модели линейной регрессии**
- на полном датасете
- только на числовых признаках

**Сделаны предсказания и рассчитаны метрики**
- RMSE
- MAE
- R2

**Выводы**
- качество модели выше без удаления выбросов из обучающей выборки
- качество модели выше при обучении на всем комплексе признаков (числовые и категориальные)
- качество обученных моделей кратно выше по сравнению с константными моделями (медиана и среднее)

**Лучшая модель**
- линейная регрессия, обученная на полном датасете
- метрики модели
    - RMSE       68988.78
    - MAE        49412.64
    - R2         0.64
- гиперпараметры лучшей модели
    - aggregationDepth     2
    - elasticNetParam      0.5
    - epsilon              1.35
    - featuresCol          features_full
    - fitIntercept         True
    - labelCol             median_house_value
    - loss                 squaredError
    - maxBlockSizeInMB     0.0
    - maxIter              100
    - predictionCol        prediction
    - regParam             0.01
    - solver               auto
    - standardization      True
    - tol                  1e-06


