# Описание проекта
В данном проекте необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. Данный датасет был рассмотрен ранее в четвёртой теме курса. Колонки датасета содержат следующие данные:
- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

Задачей является построение модели, которая будет предсказывать медианную стоимость дома в жилом массиве (median_house_value) на основе имеющихся данных. Для оценки качества модели будут использоваться метрики RMSE (Root Mean Squared Error), MAE (Mean Absolute Error) и R2 (коэффициент детерминации).

# Инструкция по выполнению проекта
Для успешного выполнения проекта следуйте следующим шагам:

1. Инициализируйте локальную Spark-сессию.
2. Прочитайте содержимое файла /datasets/housing.csv с помощью Spark.
3. Выведите типы данных колонок датасета с использованием методов PySpark.
4. Выполните предобработку данных, включая обработку пропусков и кодирование категориальных переменных с использованием One-Hot Encoding.
5. Постройте две модели линейной регрессии на разных наборах данных:
   - Используя все данные из файла;
   - Используя только числовые переменные и исключив категориальные.
6. Для построения моделей используйте оценщик LinearRegression из библиотеки MLlib.
7. Сравните результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.
8. Сделайте выводы на основе результатов сравнения моделей.



# Импорт 

In [1]:
#import
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import col,isnan, when, count

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession

In [2]:
#spark
spark  = SparkSession.builder.appName('HousePriceAnalyze').getOrCreate()

In [3]:
data = spark.read.option('header', 'true')\
    .csv('/datasets/housing.csv', inferSchema = True)

                                                                                

In [4]:
data.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
feature_columns = ['longitude'
                   ,'latitude'
                   ,'housing_median_age'
                   ,'total_rooms'
                   ,'total_bedrooms'
                   ,'population'
                   ,'households'
                   ,'median_income'
                   ,'median_house_value'
                   ,'ocean_proximity']

In [7]:
data.describe().show(10, False)

                                                                                

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|longitude          |latitude         |housing_median_age|total_rooms       |total_bedrooms    |population        |households       |median_income     |median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|count  |20640              |20640            |20640             |20640             |20433             |20640             |20640            |20640             |20640             |20640          |
|mean   |-119.56970445736148|35.6318614341087 |28.639486434108527|2635.7630813953488|537.8705525375618 |1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|null           |
|stddev |2.003531723

In [8]:
pd.DataFrame(data.dtypes, columns = ['Name', 'Type'])

Unnamed: 0,Name,Type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


# Подготовка данных

In [9]:
#Проверка на наличие пропусков 
for column in data.columns:
    count_of_nulls = data.filter(col(column).isNull() | isnan(col(column))).count()
    print(column, count_of_nulls, count_of_nulls)

longitude 0 0
latitude 0 0
housing_median_age 0 0
total_rooms 0 0
total_bedrooms 207 207
population 0 0
households 0 0
median_income 0 0
median_house_value 0 0
ocean_proximity 0 0


Пропуски в поле total_bedrooms 207 шт

In [10]:
#Заполним единицами, так как как минимум одна кровать будет 
data = data.na.fill(value=1,subset=["total_bedrooms"])

In [11]:
data.select(feature_columns).toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20640 entries, 0 to 20639
Data columns (total 10 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   longitude           20640 non-null  float64
 1   latitude            20640 non-null  float64
 2   housing_median_age  20640 non-null  float64
 3   total_rooms         20640 non-null  float64
 4   total_bedrooms      20640 non-null  float64
 5   population          20640 non-null  float64
 6   households          20640 non-null  float64
 7   median_income       20640 non-null  float64
 8   median_house_value  20640 non-null  float64
 9   ocean_proximity     20640 non-null  object 
dtypes: float64(9), object(1)
memory usage: 1.6+ MB


# Промежуточный вывод 1

Проанализировал данные датасет. Обнаружены и заполнены пропуски в аттрибуте 'total_bedrooms'

# Обучение моделей

In [12]:
train_data, test_data = data.randomSplit([0.75, 0.25], 12345)

sIndexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_ind', handleInvalid='skip')
ohEncoder = OneHotEncoder(inputCol="ocean_proximity_ind", outputCol="ocean_proximity_index") 

sIndexer_fitted = sIndexer.fit(train_data)
train_data = sIndexer_fitted.transform(train_data)
test_data = sIndexer_fitted.transform(test_data)

fitted_ohe = ohEncoder.fit(train_data)

train_data = fitted_ohe.transform(train_data)
test_data = fitted_ohe.transform(test_data)

train_data = train_data.drop('ocean_proximity')
test_data = test_data.drop('ocean_proximity')

                                                                                


<div class="alert alert-block alert-info">
<b>Совет: </b>В идеале обучение OneHotEncoder и других трансформеров (включая StringIndexer и StandardScaler...) через fit должно производиться на обучающей выборке, а не на всей. Так будет предотвращена утечка данных (из теста в трейн). В твоем случае разбиение на выборки происходит после их обучения. Получается, что модель ещё до обучения немного заглядывает в тестовую выборку (условно из будущего).
    
Нужно разбить датасет на две выборки: обучающую и тестовую. Вызов fit допустим только для обучающей, transform - для обеих. Так будет предотвращено переобучение моделя. Но полагаться на случай мы не можем. Поэтому должны использовать две выборки и предотвращать утечку. Если у тебя остаются вопросы, не стесняйся, пожалуйста, задать их в Пачке преподавателю. Тема утечки очень часто поднимается на собеседованиях.
</div>


# Промежуточный вывод 2 

Категориальный признак ocean_proximity был закодирован с помощью OneHotEncoder и добавлен в датасет, как 'ocean_proximity_index'

In [13]:
feature_assembly = VectorAssembler(inputCols = [  'longitude'
                                                , 'latitude'
                                                , 'housing_median_age'
                                                , 'total_rooms'
                                                , 'total_bedrooms'
                                                , 'population'
                                                , 'households'
                                                , 'median_income']
                                    , outputCol = 'features')
train_data = feature_assembly.transform(train_data)
test_data = feature_assembly.transform(test_data)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)
scalerData = scaler.fit(train_data)

train_data = scalerData.transform(train_data)
test_data = scalerData.transform(test_data)

                                                                                

In [14]:
lmodel = LinearRegression(featuresCol = 'scaled_features', labelCol = 'median_house_value')
lregressor = lmodel.fit(train_data)

print()
print("Scores on train dataset for model with numeric features only")
train_pred_results = lregressor.evaluate(train_data)
print("MAE score on train dataset is: %2f"% train_pred_results.meanAbsoluteError)
print("R2 score on train dataset is: %2f"% train_pred_results.r2)
print("RMSE score on train dataset is: %2f"% train_pred_results.rootMeanSquaredError)

print()
print("Scores on test dataset for model with numeric features only")
test_pred_results = lregressor.evaluate(test_data)
print("MAE score on test dataset is: %2f"% test_pred_results.meanAbsoluteError)
print("R2 score on test dataset is: %2f"% test_pred_results.r2)
print("RMSE score on test dataset is: %2f"% test_pred_results.rootMeanSquaredError)

23/12/07 09:45:24 WARN Instrumentation: [ad0eff08] regParam is zero, which might cause numerical instability and overfitting.
23/12/07 09:45:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/12/07 09:45:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/12/07 09:45:24 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/12/07 09:45:24 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                


Scores on train dataset for model with numeric features only


                                                                                

MAE score on train dataset is: 51437.035618
R2 score on train dataset is: 0.628248
RMSE score on train dataset is: 70262.489470

Scores on test dataset for model with numeric features only
MAE score on test dataset is: 50298.380008
R2 score on test dataset is: 0.652517
RMSE score on test dataset is: 68287.493740


In [15]:
category_feature_assembly = VectorAssembler(
    inputCols=["scaled_features", "ocean_proximity_index"],
    outputCol="features_with_categorial")

train_data = category_feature_assembly.transform(train_data)
test_data = category_feature_assembly.transform(test_data)

In [16]:
lmodelv2 = LinearRegression(featuresCol = 'features_with_categorial', labelCol = 'median_house_value')
lregressorv2 = lmodelv2.fit(train_data)

print()
print("Scores on train dataset for model with category features")
train_pred_results_v2 = lregressorv2.evaluate(train_data)
print("MAE score on train dataset is: %2f"% train_pred_results_v2.meanAbsoluteError)
print("R2 score on train dataset is: %2f"% train_pred_results_v2.r2)
print("RMSE score on train dataset is: %2f"% train_pred_results.rootMeanSquaredError)

print()
print("Scores on test dataset for model with category features")
test_pred_results_v2 = lregressorv2.evaluate(test_data)
print("MAE score on test dataset is: %2f"% test_pred_results_v2.meanAbsoluteError)
print("R2 score on test dataset is: %2f"% test_pred_results_v2.r2)
print("RMSE score on test dataset is: %2f"% test_pred_results_v2.rootMeanSquaredError)

23/12/07 09:45:30 WARN Instrumentation: [5a315a24] regParam is zero, which might cause numerical instability and overfitting.
                                                                                


Scores on train dataset for model with category features


                                                                                

MAE score on train dataset is: 50334.583039
R2 score on train dataset is: 0.638080
RMSE score on train dataset is: 70262.489470

Scores on test dataset for model with category features
MAE score on test dataset is: 49066.207802
R2 score on test dataset is: 0.663423
RMSE score on test dataset is: 67207.344611


In [17]:
spark.stop()

# Анализ результатов

# Итоговый вывод

В ходе анализа предоставленного датасета '/datasets/housing.csv' были проведены следующие шаги:

1. Обнаружены пропуски в числовом атрибуте 'total_bedrooms', которые были заполнены дефолтным значением '1'.

2. Категориальный атрибут 'ocean_proximity' со значениями [ISLAND, NEAR OCEAN, NEAR BAY, <1H OCEAN, INLAND] был преобразован для использования в модели. Для этой цели были использованы StringIndexer и OneHotEncoder для трансформации категориального атрибута в числовой формат.

Далее были обучены две модели линейной регрессии:

### Первая модель, обученная только на числовых атрибутах:

Результаты на обучающей и тестовой выборках:

- MAE (Mean Absolute Error) на обучающей выборке: 51437.04
- R2 (коэффициент детерминации) на обучающей выборке: 0.6282
- RMSE (Root Mean Squared Error) на обучающей выборке: 70262.49

- MAE (Mean Absolute Error) на тестовой выборке: 50298.38
- R2 (коэффициент детерминации) на тестовой выборке: 0.6525
- RMSE (Root Mean Squared Error) на тестовой выборке: 68287.49

### Вторая модель, обученная на всех доступных атрибутах, включая категориальные:

Результаты на обучающей и тестовой выборках:

- MAE (Mean Absolute Error) на обучающей выборке: 50334.58
- R2 (коэффициент детерминации) на обучающей выборке: 0.6381
- RMSE (Root Mean Squared Error) на обучающей выборке: 70262.49

- MAE (Mean Absolute Error) на тестовой выборке: 49066.21
- R2 (коэффициент детерминации) на тестовой выборке: 0.6634
- RMSE (Root Mean Squared Error) на тестовой выборке: 67207.34

В заключение, обученные модели линейной регрессии не позволяют сделать точные прогнозы и требуют дополнительных исследований и улучшений для увеличения качества предсказаний. Данная работа может быть использована как отправная точка для дальнейших исследований и разработки более точных моделей.