# Описание проекта

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:  
* longitude — широта;
* latitude — долгота;
* housing_median_age — медианный возраст домов в жилом массиве;
* total_rooms — общее количество комнат в домах жилого массива;
* total_bedrooms — общее количество спален в домах жилого массива;
* population — количество человек, которые проживают в жилом массиве;
* households — количество домовладений в жилом массиве;
* median_income — медианный доход жителей жилого массива;
* median_house_value — медианная стоимость дома в жилом массиве;
* ocean_proximity — близость к океану.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Для оценки качества модели используются метрики: RMSE, MAE и R2.

# Подготовка данных

In [1]:
import pandas as pd 
import numpy as np
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import seaborn as sns
from pyspark.ml.feature import OneHotEncoder 
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
RANDOM_SEED = 2022

**Инициализируем локальную Spark-сессию**

In [2]:
spark = SparkSession.builder \
                    .appName("EDA California Housing") \
                    .getOrCreate()

**Прочитаем содержимое файла '\/datasets/housing.csv'**

In [3]:
df_housing = spark.read.load('/datasets/housing.csv',
                                        format='csv',
                                             sep=',',
                                    inferSchema=True,
                                         header=True)


df_housing.printSchema()

                                                                                

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



**Выведем первые пять строк**

In [4]:
pd.DataFrame(df_housing.take(5), columns=df_housing.columns)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


**Выведем типы данных колонок датасета. Используем для этого метод pySpark dtypes**

In [5]:
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


**Более детально изучим данные. Для этого вызовем метод describe**

In [6]:
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


# Предобработка данных

**Исследуем данные на наличие пропусков**

In [7]:
columns = df_housing.columns


for column in columns:
    print(column, df_housing.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


**Колонка total_bedrooms содержит 207 пропусков - это 1% от общего количества записей в таблице. Поэтому удалим строки, содержащие пропуски**

In [8]:
df_housing = df_housing.na.drop(how='any')

**Проверим, как сработало удаление**

In [9]:
columns = df_housing.columns


for column in columns:
    print(column, df_housing.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


**Разделим наш датасет на две части - выборку для обучения и выборку для тестирования качества модели. Для каждого разбиения будем использовать метод randomSplit().Выведем количество записей в выборках на экран при помощи метода count()**

In [10]:
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

16247 4186


**Далее приступим к первому этапу — трансформации признаков.  
Преобразуем колонку с категориальными значениями с помощью базовых трансформеров признаков — StringIndexer и OneHotEncoder**

**Обучение модели StringIndexer на тренировочных данных**

In [11]:
indexer = StringIndexer(inputCol='ocean_proximity', outputCol='ocean_proximity_idx')
indexer_model = indexer.fit(train_data)

                                                                                

**Применение обученной модели на тренировочных данных**

In [12]:
train_data = indexer_model.transform(train_data)

**Применение обученной модели на тестовых данных**

In [13]:
test_data = indexer_model.transform(test_data)

**Обучение модели OneHotEncoder на преобразованных тренировочных данных**

In [14]:
encoder = OneHotEncoder(inputCols=['ocean_proximity_idx'], outputCols=['ocean_proximity_ohe'])
encoder_model = encoder.fit(train_data)

**Применение обученной модели OneHotEncoder на преобразованных тренировочных данных**

In [15]:
train_data = encoder_model.transform(train_data)

**Применение обученной модели OneHotEncoder на преобразованных тестовых данных**

In [16]:
test_data = encoder_model.transform(test_data)

In [20]:
train_data = train_data.drop('ocean_proximity')

In [21]:
test_data = test_data.drop('ocean_proximity')

In [22]:
train_data.toPandas().head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity_idx,ocean_proximity_ohe
0,-124.35,40.54,52.0,1820.0,300.0,806.0,270.0,3.0147,94600.0,2.0,"(0.0, 0.0, 1.0, 0.0)"
1,-124.3,41.8,19.0,2672.0,552.0,1298.0,478.0,1.9797,85800.0,2.0,"(0.0, 0.0, 1.0, 0.0)"
2,-124.27,40.69,36.0,2349.0,528.0,1194.0,465.0,2.5179,79000.0,2.0,"(0.0, 0.0, 1.0, 0.0)"
3,-124.26,40.58,52.0,2217.0,394.0,907.0,369.0,2.3571,111400.0,2.0,"(0.0, 0.0, 1.0, 0.0)"
4,-124.25,40.28,32.0,1430.0,419.0,434.0,187.0,1.9417,76100.0,2.0,"(0.0, 0.0, 1.0, 0.0)"


In [25]:
categorical_assembler = VectorAssembler(inputCols=["ocean_proximity_ohe"], outputCol="categorical_features")
train_data = categorical_assembler.transform(train_data)

In [27]:
test_data = categorical_assembler.transform(test_data)

**Трансформируем числовые признаки**

In [28]:
numerical_cols = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

In [29]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
scaler_model = standardScaler.fit(train_data)
train_data = scaler_model.transform(train_data)
test_data = scaler_model.transform(test_data)

**Далее соберем полученные категорийные и числовые признаки с помощью VectorAssembler**

In [31]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features") 
train_data = final_assembler.transform(train_data)

In [32]:
test_data = final_assembler.transform(test_data)

In [33]:
train_data.select(all_features).show(3) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-62.014527297581...|
|       (4,[2],[1.0])|     [-61.989591822190...|
|       (4,[2],[1.0])|     [-61.974630536955...|
+--------------------+-------------------------+
only showing top 3 rows



# Обучение моделей

**Нам нужно построить две модели линейной регрессии на разных наборах данных**

**Первая модель линейной регрессии будет использовать все данные**

In [35]:
target = 'median_house_value'

In [36]:
lr_1 = LinearRegression(labelCol=target, featuresCol='features',regParam=0.0)

In [37]:
model = lr_1.fit(train_data) 

23/11/01 07:33:57 WARN Instrumentation: [cb766a4f] regParam is zero, which might cause numerical instability and overfitting.
23/11/01 07:33:58 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/01 07:33:58 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/11/01 07:33:58 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/11/01 07:33:58 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

**Сохраним трансформированную таблицу с колонкой предсказания первой модели в переменной predictions**

In [38]:
predictions = model.transform(test_data)

In [39]:
predictions_col = 'prediction'

**Теперь аналогично построим вторую модель, используя только числовые переменные, исключив категориальные**

In [40]:
lr_2 = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', regParam=0.0)

In [41]:
model2 = lr_2.fit(train_data) 

23/11/01 07:34:15 WARN Instrumentation: [8f589d7a] regParam is zero, which might cause numerical instability and overfitting.


**Сохраним трансформированную таблицу с колонкой предсказания первой модели в переменной predictions2**

In [42]:
predictions2 = model2.transform(test_data)

# Анализ результатов

**Для анализа результатов работы модели создадим объект класса RegressionEvaluator**

In [43]:
evaluator = RegressionEvaluator(predictionCol=predictions_col, labelCol = target)

**Сравним результаты работы линейной регрессии на двух наборах данных с помощью метрики RMSE**

**Для первой (учитывающей все признаки)**

In [44]:
evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

69547.08641366745

**Для второй (учитывающей только количественные признаки)**

In [45]:
evaluator.evaluate(predictions2, {evaluator.metricName: "rmse"})

70338.98028310479

**Чем точнее модель, тем больше значение RMSE будет стремиться к нулю, следовательно первая модель точнее**

**Сравним результаты работы линейной регрессии на двух наборах данных с помощью метрики MAE**

**Для первой (учитывающей все признаки)**

In [46]:
evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

50655.72889279407

**Для второй (учитывающей только количественные признаки)**

In [47]:
evaluator.evaluate(predictions2, {evaluator.metricName: "mae"})

51552.859235811164

**Для модели, которая идеально работает, MAE будет равен 0. Поэтому, чем меньше будет значение этой метрики, тем точнее модель, следовательно первая модель точнее**

**Сравним результаты работы линейной регрессии на двух наборах данных с помощью метрики R2**

**Для первой (учитывающей все признаки)**

In [48]:
evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

0.6389097095366882

**Для второй (учитывающей только количественные признаки)**

In [49]:
evaluator.evaluate(predictions2, {evaluator.metricName: "r2"})

0.6306398264599548

**R2 равно единице, когда модель идеально предсказывает результат, а 0 - когда работает также, как и среднее. Соответственно, чем выше R2, тем точнее модель, следовательно первая модель точнее**

# Вывод

В рамках данной работы мы:    

Инициализировали локальную Spark-сессию;    
Провели предобработку данных, используя методы pySpark;    
Исследовали данные на наличие пропусков и избавились от них;    
Преобразовали колонку с категориальными значениями техникой One hot encoding;    
Провели стандартизацию количественных признаков;    
Построив две модели линейной регрессии на разных наборах данных:  

используя все данные из файла;  

используя только числовые переменные, исключив категориальные.  

Выяснили, что согласно всем трем метрикам RMSE, MAE и R2 точнее работает первая модель, использующая все данные из файла.