## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:
- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

# Подготовка данных

- Инициализируйте локальную Spark-сессию.
- Прочитайте содержимое файла /datasets/housing.csv.
- Выведите типы данных колонок датасета. Используйте методы pySpark.
- Выполните предобработку данных:
    - Исследуйте данные на наличие пропусков и заполните их, выбрав значения по своему усмотрению.
    - Преобразуйте колонку с категориальными значениями техникой One hot encoding.

Импортируем необходимые модули.

In [1]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
# from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

Создадими локальную Spark-сесссию и загрузим датасет.

In [2]:
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("California Housing") \
                    .getOrCreate()

file_path = "datasets/housing.csv"

housing = spark.read.load(file_path, format='csv', sep=',', inferSchema=True, header='true')
housing.show(5)
print('Number of rows:', housing.count())
print('Number of distinct rows:', housing.distinct().count())

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Выведем пропущенные значения в каждой колонке.

In [3]:
def nan_count(df):
    return df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

nan_count(housing)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Заменим пропущенные значения нулем. Возможно, что они образовались в результате их незаполнения по причине отсутствия ванной комнаты.

In [4]:
housing = housing.na.fill(0)

Создадим несколько новых столбцов с признаками:
- Отношение количества комнат total_rooms к количеству домовладений households (rooms_per_household).
- Отношение количества жителей population к количеству домовладений households (population_in_household).
- Отношение количества спален total_bedrooms к общему количеству комнат total_rooms (bedroom_index).

In [5]:
housing = housing.withColumn('rooms_per_household', F.col('total_rooms') / F.col('households')) \
                 .withColumn('population_in_household', F.col('population') / F.col('households')) \
                 .withColumn('bedroom_index', F.col('total_bedrooms') / F.col('total_rooms'))

housing.printSchema()

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)
 |-- rooms_per_household: double (nullable = true)
 |-- population_in_household: double (nullable = true)
 |-- bedroom_index: double (nullable = true)



Удалим столбцы с координатами домов и проверим наличие пропущенных значений после обработки.

In [6]:
housing = housing.drop('longitude', 'latitude')
housing.show(5)

nan_count(housing)

+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-----------------------+-------------------+
|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|rooms_per_household|population_in_household|      bedroom_index|
+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-----------------------+-------------------+
|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|  6.984126984126984|     2.5555555555555554|0.14659090909090908|
|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|  6.238137082601054|      2.109841827768014|0.15579659106916466|
|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|  

В результате подготовки данных заменены пропущенные значения и добавлены новые признаки.

# Обучение моделей

- Постройте две модели линейной регрессии на разных наборах данных:
    - используя все данные из файла;
    - используя только числовые переменные, исключив категориальные.
- Для построения модели используйте оценщик LinearRegression из библиотеки MLlib.

In [9]:
housing = housing.withColumnRenamed('median_house_value', 'label')
train, test = housing.randomSplit([0.8, 0.2], seed=515)

Создадим конвейер для трансформации признаков.

In [None]:
ocean_proximity_indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_idx')
ocean_proximity_encoder = OneHotEncoder(inputCol='ocean_proximity_idx', outputCol='ocean_proximity_ohe')

scaler...

vec_assembler_all_cols = VectorAssembler(inputCols=['ocean_proximity_ohe', ...], outputCol='features')
housing_pipe_all_cols = Pipeline(stages=[ocean_proximity_indexer, ocean_proximity_encoder, ..., vec_assembler_all_cols])
train_all_cols = housing_pipe.fit(train).transform(train)
test_all_cols = housing_pipe.transform(test)

vec_assembler_num_cols = VectorAssembler(inputCols=[...], outputCol='features')
housing_pipe_num_cols = Pipeline(stages=[..., vec_assembler_num_cols])
train_num_cols = housing_pipe.fit(train).transform(train)
test_num_cols = housing_pipe.transform(test)

In [None]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator,
                         numFolds=5,
                         seed=515)

# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

In [None]:
# отключим соединение со Spark
# spark.stop()

# Анализ результатов

- Сравните результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. 
- Сделайте выводы.