## Вступление

**Описание** 

В проекте необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. 
В колонках датасета содержатся следующие данные:

- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

**Цель** 

На основе данных предсказать медианную стоимость дома в жилом массиве — median_house_value. 

**Задачи** 

1. Обучить модель и сделайть предсказания на тестовой выборке.
2. Для оценки качества модели использовать метрики RMSE, MAE и R2.

**План действий:**

1. Инициализировать локальную Spark-сессию.
2. Прочитать содержимое файла housing.csv.
3. Вывести типы данных колонок датасета, используя методы pySpark.
4. Выполнить предобработку данных. 
       * Исследовать данные на наличие пропусков и заполнить их, выбрав значения по своему усмотрению. 
       * Преобразовать колонку с категориальными значениями техникой One hot encoding.
5. Построить две модели линейной регрессии на разных наборах данных:
       * используя все данные из файла;
       * используя только числовые переменные, исключив категориальные.
6. Для построения модели использовать оценщик LinearRegression из библиотеки MLlib.
7. Сравнить результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.
8. Сделайтб выводы.

## 1. Инициализация локальной Spark-сессии.

In [219]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2023

In [220]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

Спарк-сессия инициализировалась корректно, можно приступать к работе

## 2. Чтение содержимого файла housing.csv.

In [221]:
df = spark.read.load('housing.csv', 
                             format="csv", sep=",", 
                             inferSchema=True, header="true")
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Файл прочитался тоже корректно, не очень удобно все выглядит конечно, но отображено все корреткно

## 3. Вывод типов данных колонок датасета и описательных статистик

In [222]:
df.printSchema() 

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [223]:
pd.DataFrame(df.dtypes, columns=['column', 'type'])

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


У нас 10 колонок, 9 из них количественные (вещественные), и 1 признак категориальный (строковый)

In [224]:
df.describe().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


На глаз уже видно, что есть прпоуски в одном признаке. В целом по полученным описательным статистикам трудно сказать, все ли впорядке. Смущает больше всего минимальное значение в 1 год в признаке housing_median_age (медианный возраст жителей жилого массива). То есть есть в нашем датасете жилмассивы где медианный возраст это годовалые дети. Ясли-детский дом как будто какой-то =(
Максимальный медианный доход жилмассива - 15к долларов; максимальная медианная стоимость жилмассива это полмиллиона долларов.

## 4. Предобработка данных

In [225]:
for column in df.columns:
    check_col = F.col(column).cast('float').isNull()
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 20640


Определили, что у нас есть пропуски в признаке total_bedrooms. Значение 20640 у признака ocean_proximity ложно, так как категориальный формат у нас не прошел, как "вещественный" и соотвественно дал значение True в методе isNull(). Но так как у нас ровно столько же значений - 20640, то можно считать, что пропусков в категориальном признаке ocean_proximity тоже нет.

Заполним пропуски в колонке total_bedrooms медианным значением

In [226]:
median = df.approxQuantile("total_bedrooms", [0.5], 0.01)[0]

df = df.na.fill(median, subset=["total_bedrooms"])


In [227]:
# for column in df.columns:
#     check_col = F.col(column).cast('float').isNull()
#     print(column, df.filter(check_col).count())

Успешно заполнили пропуски. Идем дальше.

### Разграничим категораильные и количественные признаки.

In [228]:
nums = df.drop('ocean_proximity', 'median_house_value').columns
cats = ['ocean_proximity']

df_nums = df.drop('ocean_proximity')

### Разделение на выборки

На этом этапе сразу начали готовить 2 датасета. 1 - со всеми признаками. 2 - только с количественными признаками.

In [229]:
# Перемешаем данные
df = df.orderBy(F.rand(RANDOM_SEED))
df_nums = df_nums.orderBy(F.rand(RANDOM_SEED))

In [230]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

train_data_nums, test_data_nums = df_nums.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data_nums.count(), test_data_nums.count())

16519 4121
16519 4121


Успешно разделили оба датасета на обучающую и тестовые выборки в соотношении 80/20

### Масштабируем все количественные признаки

In [231]:
numerical_assembler = VectorAssembler(inputCols=nums, outputCol="nums_f")

train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

train_data_nums = numerical_assembler.transform(train_data_nums)
test_data_nums = numerical_assembler.transform(test_data_nums)

In [232]:
nums

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [233]:
standardScaler = StandardScaler(inputCol='nums_f', outputCol="nums_f_scaled")
standardScaler_model = standardScaler.fit(train_data)

train_data = standardScaler_model.transform(train_data)
test_data = standardScaler_model.transform(test_data)

standardScaler_model_nums = standardScaler.fit(train_data_nums)

train_data_nums = standardScaler_model_nums.transform(train_data_nums)
test_data_nums = standardScaler_model_nums.transform(test_data_nums)


### Кодируем категориальные признаки

In [234]:
indexer = StringIndexer(inputCols=cats, outputCols=[c+'_idx' for c in cats]) 
indexer_model = indexer.fit(train_data)

train_data = indexer_model.transform(train_data)
test_data = indexer_model.transform(test_data)

In [235]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in cats], outputCols=[c+'_ohe' for c in cats])
encoder_model = encoder.fit(train_data)

train_data = encoder_model.transform(train_data)
test_data = encoder_model.transform(test_data)

In [236]:
cat_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in cats], outputCol="cats_f")
train_data = cat_assembler.transform(train_data)
test_data = cat_assembler.transform(test_data)

In [237]:
train_data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'nums_f',
 'nums_f_scaled',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'cats_f']

Все значения успешно перевели в векторный формат

### Склеиваем все признаки

In [238]:
all_features = ['cats_f','nums_f_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features") 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3)

+-------------+--------------------+
|       cats_f|       nums_f_scaled|
+-------------+--------------------+
|(4,[2],[1.0])|[-62.029642565664...|
|(4,[2],[1.0])|[-62.004701012562...|
|(4,[2],[1.0])|[-62.004701012562...|
+-------------+--------------------+
only showing top 3 rows



In [239]:
# only_nums_features = ['nums_f_scaled']

# final_assembler_nums = VectorAssembler(inputCols=only_nums_features, outputCol="nums_features") 
# df_nums = final_assembler_nums.transform(df)

# df_nums.select(only_nums_features).show(3)

## 5. Построение моделей

### Модель со всеми признаками

In [240]:
# создаем модель
lr = LinearRegression(featuresCol="features", labelCol="median_house_value", regParam=0.01)

In [241]:
# обучаем
lr_model = lr.fit(train_data)

In [242]:
# предсказываем значения
predictions = lr_model.transform(test_data)

In [243]:
# определяем метрики RMSE, MAE, R2

evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)

evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)

print("RMSE:", rmse)
print("MAE:", mae)
print("R2:", r2)


RMSE: 69492.99122412229
MAE: 49769.85420928313
R2: 0.6435510492870997


RMSE, MAE - чем ближе к нулю, тем более точные предсказания дает модель. Значение R2 близкое к 1 указывает на то, что модель очень хорошо объясняет данные, почти вся вариация в целевой переменной объясняется моделью. В нашем случае у нас слишком высокие показатели RMSE, MAE и R2 далек от единицы. Но и он больше 0.5, что в целом немного утешает.

### Модель только с количественными признаками

In [244]:
# создаем модель
lr_nums = LinearRegression(featuresCol="nums_f_scaled", labelCol="median_house_value", regParam=0.01)

In [245]:
# обучаем
lr_nums_model = lr_nums.fit(train_data_nums)

In [246]:
# предсказываем значения
predictions_nums = lr_nums_model.transform(test_data_nums)

In [247]:
# определяем метрики RMSE, MAE, R2

evaluator_nums = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")
rmse_nums = evaluator_nums.evaluate(predictions_nums)

evaluator_nums = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="mae")
mae_nums = evaluator_nums.evaluate(predictions_nums)

evaluator_nums = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="r2")
r2_nums = evaluator_nums.evaluate(predictions_nums)

print("RMSE:", rmse_nums)
print("MAE:", mae_nums)
print("R2:", r2_nums)


RMSE: 70610.79420878772
MAE: 50974.94052752422
R2: 0.6319917778044624


Здесь происходит такая же ситуация, что и с предыдущей моделью. Но на вид они чуть хуже.

## 6. Сравнение метрик моделей

In [248]:
# Найдем разницу между полученными значениями.

print('delta RMSE:', rmse_nums - rmse)
print('delta MAE:', mae_nums - mae)
print('delta R2:', r2_nums - r2)

delta RMSE: 1117.8029846654244
delta MAE: 1205.0863182410903
delta R2: -0.011559271482637357


delta RMSE и delta MAE получились положительными, а delta R2 меньше нуля. Это гвоорит о том, что первая модель (со всеми признаками) справилась чуточку лучше, чем вторая модель (только с количественными признаками). Разница конечно не велика, но тем неменее. Можно судить, что значение отдаленности жилмасива от побережья значим для модели.

## Вывод

1. В рамках данного проекта применили некоторые навыки Spark и  DataFrame
2. Проделали тот же привычный путь при класическом машинном обучении (от предобработки до предсказания значений модель) но с использованием библиотеки MLlib
3. Обучили две модели. Оценили их качество на тестовой выборке с помощью RMSE, MAE и R2. Результаты получились не самыми лучшими.
4. После сравнения результатов, выявили, что "кпд" у модели, которая обучалась на всех признаках, чуточку больше, чем у той, что обучаслась только на количественных признаках.

In [249]:
# Закрываем сессию
spark.stop()