## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

### Импорт библиотек

In [1]:
%%capture
!pip install missingno

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, OneHotEncoder, StandardScaler, VectorAssembler, StringIndexer

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

import warnings
warnings.filterwarnings('ignore')

RANDOM_SEED = 42

### Инциализация Spark-сессии

In [3]:
# инциализируем спарк-сессию, без ограничений по кол-ву используемых процессоров
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('House_price_predict_regressions') \
    .getOrCreate()

23/05/20 08:17:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Загрузка данных

In [7]:
data = (spark
        .read
        .options(inferSchema=True, header='true')
        .csv('/datasets/housing.csv') 
        .repartition(16)) # разделим данные на 16 частей, потому что у нас 16 ядер процессора

print('Предоставленные типы данных:')
data.printSchema()

# выведим кол-во данных  
print('Всего строк данных -', data.count())              

Предоставленные типы данных:
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

Всего строк данных - 20640


# Подготовка данных

In [8]:
data_train_to_preprocessing, data_test_to_preprocessings = (data
                                         .randomSplit([.8,.2], seed=RANDOM_SEED))

numeric_cols = data.columns[:-1]
numeric_cols.remove('median_house_value')

target = 'median_house_value'
categorical_cols = [data.columns[-1]]

# Создаем Imputer для заполнения пропущенных значений медианой
imputer = Imputer(
            inputCols=numeric_cols,
            outputCols=["{}_imputed".format(c) for c in numeric_cols]
          ).setStrategy("median")

# Создаем StringIndexer для преобразования категориальных признаков в числовые
indexer = StringIndexer(inputCol=categorical_cols[0],
                        outputCol=categorical_cols[0]+'_idx').setHandleInvalid("keep")

# Создаем VectorAssembler для объединения числовых признаков в один вектор
numerical_assembler = VectorAssembler(inputCols=["{}_imputed".format(c) for c in numeric_cols],
                                      outputCol="numerical_features")

# Создаем StandardScaler для стандартизации числовых признаков
scaler = StandardScaler(inputCol="numerical_features",
                        outputCol="scaled_numerical_features",
                        withStd=True, withMean=True)

# Создаем OneHotEncoder для кодирования категориальных признаков
encoder = OneHotEncoder(inputCols=[categorical_cols[0]+'_idx'],
                        outputCols=[categorical_cols[0]+'_ohe'])

# Векторизуем категориальные признаки
categorical_assembler = VectorAssembler(inputCols=[categorical_cols[0]+'_ohe'],
                                        outputCol="categorical_features")

# Создаем VectorAssembler для объединения закодированных категориальных признаков и стандартизированных числовых признаков в один вектор
final_assembler = VectorAssembler(inputCols=["categorical_features", "scaled_numerical_features"],
                                  outputCol="features")

# Объединяем все шаги в единый пайплайн
pipeline = Pipeline(stages=[imputer, indexer, numerical_assembler, scaler, encoder, categorical_assembler, final_assembler])

In [9]:
# обучим пайплайн на тренировочной выборке и приемним к тренировочной и тестовой
model = pipeline.fit(data_train_to_preprocessing)
data_train_preprossed = model.transform(data_train_to_preprocessing).select("scaled_numerical_features", "median_house_value", "features")
data_test_preprossed = model.transform(data_test_to_preprocessings).select("scaled_numerical_features", "median_house_value", "features")

                                                                                

In [10]:
# соединим тренировочку и тестовую выборки
data = data_train_preprossed.unionAll(data_test_preprossed)

In [11]:
# создадим объекты для обучения, включающие только числовые признаки
train_data_numeric, test_data_numeric = (data
                                         .select('scaled_numerical_features', 'median_house_value')
                                         .randomSplit([.8,.2], seed=RANDOM_SEED))

print(train_data_numeric.count(), test_data_numeric.count()) 



16537 4103


                                                                                

In [12]:
# создадим дополнительный объекты для обучения, включающие и числовые и категориальные признаки
train_data, test_data = (data
                         .select("features", 'median_house_value')
                         .randomSplit([.8,.2], seed=RANDOM_SEED))

print(train_data.count(), test_data.count()) 



16537 4103


                                                                                

# Обучение моделей

In [13]:
# Инициализация переменных для хранения лучших значений метрик и модели
best_rmse = 10**10
best_mae = 10**10
best_r2 = -10
best_model = None, None
type_trainig = None

# Используем два датасета для обучения модели: с использованием всех признаков и только числовых признаков
train_dfs = [train_data, train_data_numeric]
test_dfs = [test_data, test_data_numeric]

# Список метрик, которые будем использовать для оценки модели
metrics = ["rmse", "mae", "r2"]

# Проходимся по каждому датасету для обучения модели
for i, (train, test) in enumerate(zip(train_dfs, test_dfs)):
    # Обучение модели с использованием всех признаков
    if i == 0:
        lr = LinearRegression(labelCol=target, featuresCol="features")

    # Обучение модели только на числовых признаках
    if i == 1:
        lr = LinearRegression(labelCol=target, featuresCol="scaled_numerical_features")
    
    # Определяем сетку гиперпараметров для перебора
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.5, 1.0]) \
        .build()

    # Инициализация TrainValidationSplit для кросс-валидации и выбора лучших гиперпараметров
    tvs = TrainValidationSplit(estimator=lr,
                               estimatorParamMaps=paramGrid,
                               evaluator=RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse"),
                               trainRatio=0.8)

    # Обучение модели и выбор наилучшего набора гиперпараметров
    model = tvs.fit(train)

    # Получение предсказаний на тестовых данных
    predictions = model.transform(test)

    # Получение метрик на тестовых данных
    metrics_dict = {}
    for metric in metrics:
        evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName=metric)
        metric_value = evaluator.evaluate(predictions)
        metrics_dict[metric] = metric_value

    # Сохраняем лучшую модель и метрики
    if metrics_dict["rmse"] < best_rmse and metrics_dict["mae"] < best_mae and metrics_dict["r2"] > best_r2:
        best_rmse = metrics_dict["rmse"]
        best_mae = metrics_dict["mae"]
        best_r2 = metrics_dict["r2"]
        best_model = lr, model
        type_trainig = 'модель была обучена на всех признаков' if i == 0 else 'модель была обучена только на числовых признаков'

    print(f"Results for {'all features' if i == 0 else 'only numerical features'}:")
    print(f"RMSE: {metrics_dict['rmse']}")
    print(f"MAE: {metrics_dict['mae']}")
    print(f"R^2: {metrics_dict['r2']}")

spark.stop()

23/05/20 08:19:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/20 08:19:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/05/20 08:19:07 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/05/20 08:19:07 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Results for all features:
RMSE: 68621.40670562946
MAE: 49863.62712955019
R^2: 0.6308764834999265


                                                                                

Results for only numerical features:
RMSE: 72285.74027976584
MAE: 52447.10177271138
R^2: 0.6191282157677731


# Анализ результатов

In [11]:
print(f'''
Процесс выполнения проекта состоял из нескольких частей:
    1) предобработка данных:
        1.1. Индексация категориальных переменных;
        1.2. Очистка от пропусков путём подставления медианного значения для числовых переменных;
        1.3. Векторизация числовых переменных;
        1.4. Станадртизация числовых, векторизованных переменных;
        1.5. Кодировка категориальных переменных - OHE;
        1.6. Векторизация категориальных переменных;
        1.7. Сведение категориальных и числовых переменных в единый фрейм.
    
    2) разделение признаков на 2 обучающие и тестовые группы:
        1.1. Содержащая все признаки;
        1.2. Содержащая только числовые признаки.
        
    3) Обучение модели и выбор лучшей модели.
        1.1. Модели были обучены на 2-х фреймах, из пункта 2.
        1.2. Был реализован подбор лучший параметров и кросс-валидация, с целью - избежать переобучения модели
    
    В результате обучения - лучшей моделью стала модель {best_model}
    У этой модели, с такими результатами предпобработки получились следующие метрики качества::
        rmse - {round(best_rmse)}
        mae - {round(best_mae)}
        best_r2 - {round(best_r2, 2)}
    При этом {type_trainig}.
''')


Процесс выполнения проекта состоял из нескольких частей:
    1) предобработка данных:
        1.1. Индексация категориальных переменных;
        1.2. Очистка от пропусков путём подставления медианного значения для числовых переменных;
        1.3. Векторизация числовых переменных;
        1.4. Станадртизация числовых, векторизованных переменных;
        1.5. Кодировка категориальных переменных - OHE;
        1.6. Векторизация категориальных переменных;
        1.7. Сведение категориальных и числовых переменных в единый фрейм.
    
    2) разделение признаков на 2 обучающие и тестовые группы:
        1.1. Содержащая все признаки;
        1.2. Содержащая только числовые признаки.
        
    3) Обучение модели и выбор лучшей модели.
        1.1. Модели были обучены на 2-х фреймах, из пункта 2.
        1.2. Был реализован подбор лучший параметров и кросс-валидация, с целью - избежать переобучения модели
    
    В результате обучения - лучшей моделью стала модель (LinearRegression_f9123