# Описание проекта

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году.
На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Мы Обучим модель и сделаем предсказания на тестовой выборке. Для оценки качества модели использовать метрики RMSE, MAE и R2.


# Описание данных

Данные были собраны в рамках переписи населения в США. Каждая строка содержит агрегированную статистику о жилом массиве. Жилой массив — минимальная географическая единица с населением от 600 до 3000 человек в зависимости от штата. Одна строка в данных содержит статистику в среднем о 1425.5 обитателях жилого массива.

В колонках датасета содержатся следующие данные:

- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве;
- `ocean_proximity` — близость к океану.

В большинстве колонок хранятся количественные данные, кроме одной — `ocean_proximity`. Она хранит категориальные значения.

# Инициализация и загрузка

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder
    OneHotEncoder = OneHotEncoder
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
    OneHotEncoder = OneHotEncodeEstimator

RANDOM_SEED = 1337


In [3]:
spark = SparkSession.builder.master('local').appName('MLR - linear regression').getOrCreate()

In [4]:
try:
    data = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema=True)
except Exception:
    data = spark.read.option('header', 'true').csv('housing.csv', inferSchema=True)

In [5]:
# посмотрим как прочитались данные
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
print('Общее количество строк:', data.count())

Общее количество строк: 20640


# Предобработка данных

In [7]:
data.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [8]:
data.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas().transpose().reset_index().rename(columns={'index':'col', 0:'count of nan values'})

# пропуски имеет только поле total_bedrooms

Unnamed: 0,col,count of nan values
0,longitude,0
1,latitude,0
2,housing_median_age,0
3,total_rooms,0
4,total_bedrooms,207
5,population,0
6,households,0
7,median_income,0
8,median_house_value,0
9,ocean_proximity,0


In [9]:
print('Количество уникальных строк в датасете:', data.distinct().count())
# Видно, что полных дубликатов в датасете нет

Количество уникальных строк в датасете: 20640


In [10]:
df = data.fillna({'total_bedrooms' : data.select(F.median(F.col('total_bedrooms'))).collect()[0][0]})

In [11]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas().transpose().reset_index().rename(columns={'index':'col', 0:'count of nan values'})

Unnamed: 0,col,count of nan values
0,longitude,0
1,latitude,0
2,housing_median_age,0
3,total_rooms,0
4,total_bedrooms,0
5,population,0
6,households,0
7,median_income,0
8,median_house_value,0
9,ocean_proximity,0


**Промежуточный вывод:**

На этапе предобработки данных были обнаружены пропуски в данных и проверено наличие дубликатов.

# Обучение

In [12]:
cat_cols = ['ocean_proximity']
num_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = ['median_house_value']

In [13]:
train_data, test_data = df.randomSplit([0.75, 0.25], seed=RANDOM_SEED)

In [16]:
indexer = StringIndexer(inputCol=cat_cols[0], outputCol=cat_cols[0]+'_idx')
i = indexer.fit(train_data)
df_train = i.transform(train_data)
df_test = i.transform(test_data)

In [17]:
encoder = OneHotEncoder(inputCol=cat_cols[0]+'_idx', outputCol=cat_cols[0]+'_ohe')
e = encoder.fit(df_train)
df_train = e.transform(df_train)
df_test = e.transform(df_test)

In [18]:
categorical_assembler = VectorAssembler(inputCols=[cat_cols[0]+'_ohe'], outputCol='cat_feature')
df_train = categorical_assembler.transform(df_train)
df_test = categorical_assembler.transform(df_test)

In [19]:
numerical_assembler = VectorAssembler(inputCols=num_cols, outputCol='num_features')
df_train = numerical_assembler.transform(df_train)
df_test  = numerical_assembler.transform(df_test)

In [20]:
standard_scaler = StandardScaler(inputCol='num_features', outputCol='num_features_scaled')
s = standard_scaler.fit(df_train)
df_train = s.transform(df_train)
df_test = s.transform(df_test)

In [21]:
df_train = VectorAssembler(inputCols=['num_features_scaled', 'cat_feature'], outputCol='features').transform(df_train).select(['features', 'num_features_scaled', target[0]])
df_test = VectorAssembler(inputCols=['num_features_scaled', 'cat_feature'], outputCol='features').transform(df_test).select(['features', 'num_features_scaled', target[0]])

In [22]:
lr_1 = LinearRegression(labelCol=target[0], featuresCol='features')
lr_2 = LinearRegression(labelCol=target[0], featuresCol='num_features_scaled')
model_1 = lr_1.fit(df_train)
model_2 = lr_2.fit(df_train)

In [23]:
predictions_1 = model_1.transform(df_test)
predictions_2 = model_2.transform(df_test)

In [24]:
class Metric_model(object):
    '''Class for get metrics for linear regression'''
    def __init__(self, data: DataFrame) -> None:
        '''init metod, create predictions'''
        self.predictions = data

    def get_rmse(self) -> float:
        '''get rmse metric for given data'''
        self.rmse = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction', metricName='rmse').evaluate(self.predictions)
        return self.rmse

    def get_mae(self) -> float:
        '''get mae metric for given data'''
        self.mae = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction', metricName='mae').evaluate(self.predictions)
        return self.mae

    def get_r2(self) -> float:
        '''get r2 metric for given data'''
        self.r2 = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction', metricName='r2').evaluate(self.predictions)
        return self.r2

In [25]:
m1 = Metric_model(predictions_1)
m2 = Metric_model(predictions_2)

In [26]:
print(f'Метрики для модели, которая обучалась с категориальными признаками.\n- RMSE: {m1.get_rmse()}\n- MAE: {m1.get_mae()}\n- R2: {m1.get_r2()}')

Метрики для модели, которая обучалась с категориальными признаками.
- RMSE: 69384.90655109576
- MAE: 50370.513218278786
- R2: 0.6313211846421067


In [27]:
print(f'Метрики для модели, которая обучалась без категориальных признаков.\n- RMSE: {m2.get_rmse()}\n- MAE: {m2.get_mae()}\n- R2: {m2.get_r2()}')

Метрики для модели, которая обучалась без категориальных признаков.
- RMSE: 70323.11283842972
- MAE: 51401.07031616258
- R2: 0.621283400717439


In [28]:
spark.stop()

# Выводы

В ходе данного проекта были загружены и изучены данные о жилье в Калифорнии в 1990 году. Данные были предобработанны, проверены на наличие пропусков, заполненны пропуски. С использованием библиотеки Spark на основе данных были обучены две модели линейной регрессии. Первая модель обучалась на полных данных с использованием категориальных признаков, а вторая моедль обучалась только на численных полях. На обеих моделях были получены предсказания и вычислены метрики. Значения метрик показывают, что модель, которая использовала категориальные признаки, делает предсказания точнее

