# Прогнозирование стоимости жилья в жилом массиве


***Описание целей и задач проекта***

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных необходимо предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.


***Описание полей данных***

В колонках датасета содержатся следующие данные:

 - housing_median_age — медианный возраст жителей жилого массива;
 - total_rooms — общее количество комнат в домах жилого массива;
 - total_bedrooms — общее количество спален в домах жилого массива;
 - population — количество человек, которые проживают в жилом массиве;
 - households — количество домовладений в жилом массиве;
 - median_income — медианный доход жителей жилого массива;
 - median_house_value — медианная стоимость дома в жилом массиве;
 - ocean_proximity — близость к океану.

***План работы***

1. Инициализация локальную Spark-сессию.
2. Прочитать содержимое файла /datasets/housing.csv и вывести типы данных колонок датасета, используя методы pySpark.
3. Выполнить предобработку данных: 
 - исследовать  данные на наличие пропусков и заполнить их;
 - преобразовать колонку с категориальными значениями техникой One hot encoding.
4.построить две модели линейной регрессии на разных наборах данных: 
 - 1) используя все данные из файла 
  - 2)используя только числовые переменные, исключив категориальные.
5. Сделать предсказание на тестовой выборке.
5. Оценить качество модели с помощью оценщика LinearRegression из библиотеки MLlib, используйте метрики RMSE, MAE и R2

# Подготовка данных

Тут будут все импорты необходимых библиотек

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark.ml.regression import LinearRegression

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf

from pyspark.ml import Pipeline

from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

import seaborn as sns

In [2]:
from pyspark.sql.types import DoubleType, IntegerType, StringType
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
    
RANDOM_SEED = 2022

#загрузка файла для работы
spark = SparkSession.builder \
                    .master("local") \
                    .appName("california_housing_w_features") \
                    .getOrCreate()

df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

                                                                                

In [3]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)


In [4]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

 Выведем типы данных колонок датасета и посмотрим есть ли пропуски в данных

In [5]:
df.printSchema() 

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



'Nullable = true' означает, что в данных есть пропуски, их необходимо будет заполнить.

In [6]:
#посчитаем количество строк в датафрейме
df.count()

20640

In [7]:
#удалим полные дубликаты и посчитаем сколько строк останется
df.dropDuplicates().count()

                                                                                

20640

Как показала проверка, дубликтов в датафрейме нет.

***Обработка данных с пропусками***

In [8]:
# выведите базовые статистики
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [9]:
# выведите пропущенные значения в каждой колонке
columns = df.columns


for column in columns:
    print(column, df.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Есть 207 пропцущенных значений по колонке total_bedrooms.
Заменим их на нули, из предполодения, что если даннеые не заполнены, значит отдельного спального места нет. 

In [10]:
#Есть 207 пропцущенных значений по колонке total_bedrooms.
#Это немного, значит можно заполнить их нулями.

#удаление колонок с пропусками
#df = df.na.drop(how='any')

#заполнение колонок с пропусками
df = df.fillna(value=0)
df.printSchema()

#проверка результата

columns = df.columns


for column in columns:
    print(column, df.where(F.isnan(column) | F.col(column).isNull()).count())

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Теперь пропущенных значений нет.

***Разделим колонки на три типа: числовые, категориальные и целевой***

In [11]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age',"population",'households', "median_income", 'total_rooms', 'total_bedrooms']
target = "median_house_value"

***Раздение данных на обучающую и тренировочную выборки. Выделим целевой признак***

In [12]:
data = df
#разделим данные на выборки
train_data, test_data = data.randomSplit([.8,.2], seed=RANDOM_SEED)

# выделим целевые признаким в отдельные переменные
#train_target = train_data['median_house_value']
#test_target = test_data['median_house_value']

#удалим цеелвые признаким из выборок
#train_data = train_data.drop('median_house_value')
#test_data = test_data.drop('median_house_value')

print('выборки:', train_data.count(), test_data.count())


выборки: 16418 4222


In [13]:
print(train_data.show(5))

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|
|  -124.27|   40.69|              36.0|     2349.0|         528.0|    1194.0|     465.0|       2.5179|           79000.0|     NEAR OCEAN|
|  -124.26|   40.58|              52.0|     2217.0|         394.0|     907.0|     369.0|       2.3571|          111400.0|     NEAR OCEAN|
|  -124.25|   40.28|              

***Преобразование колонок с категориальными и количественными значениями, их сбор VectorAssembler при использованием Pipeline***

In [14]:

#value - это предсказываемая переменная 
stages = []
#label_stringIdx = StringIndexer(inputCol = 'median_house_value', outputCol = 'label', handleInvalid = 'keep')
#stages += [label_stringIdx]

#зависит от категориаьных колонок: страны и категории загрязнения
categoricalColumns = ['ocean_proximity']
for categoricalCol in categoricalColumns:
    #преобразовываем строковые колонки в спарковские категориальные строки
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    #преобразовываем категориальные колонки в бинарные (числовые) вектора благодаря строковому преобразователю
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    

    stages += [stringIndexer, encoder]
       
    
#зависит от численной колонки: 
numericColumns = ['longitude', 'latitude', 'housing_median_age',"population",'households', "median_income", 'total_rooms', 'total_bedrooms', ]

for numericol in categoricalColumns:
    numerical_assembler =  VectorAssembler(inputCols=numerical_cols,outputCol="numerical_features")
    standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
    
    #
    stages += [numerical_assembler, standardScaler]


all_features = [categoricalCol + "classVec",'numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 

stages += [final_assembler]


***ДАТАСЕТ ДЛЯ 2-ой МОДЕЛИ, ГДЕ ВСЕ КАТЕГОРИАЛЬНЫЕ ДАННЫЕ УДАЛЕНЫ***

***Удалим категориальные переменные из датасета***

In [24]:
#создадим новый датасет, кторый в дальнейшем будем преобразовывать
df2=df

#Разделим колонки на три типа: числовые и категориальные данные и целевой***
categorical_cols2 = ['ocean_proximity']
numerical_cols2  = ['longitude', 'latitude', 'housing_median_age',"population",'households', "median_income", 'total_rooms', 'total_bedrooms', ]
target2 = "median_house_value"



#удалим колонки с категориальными данными
exclude = ['ocean_proximity']
selected_columns = [col for col in df2.columns if col not in exclude]
df2= df2.select(selected_columns)

***Раздение данных на обучающую и тренировочную выборки для 2-ой модели/***

In [25]:
data2 = df2
#разделим данные на выборки
train_data2, test_data2 = data2.randomSplit([.8,.2], seed=RANDOM_SEED)

print(train_data2.count(), test_data2.count())

16418 4222


***Трансформация числовых признаков***

In [26]:
#тренировочная выборка
numerical_assembler2 = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features2")
train_data2 = numerical_assembler2.transform(train_data2)
standardScaler2 = StandardScaler(inputCol='numerical_features2', outputCol="features2")
standardScaler2_trained = standardScaler2.fit(train_data2)
train_data2 = standardScaler2_trained.transform(train_data2)

#тестовая выборка
numerical_assembler2 = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features2")
test_data2 = numerical_assembler2.transform(test_data2)
standardScaler2 = StandardScaler(inputCol='numerical_features2', outputCol="numerical_features_scaled_test2")
test_data2 = standardScaler2_trained.transform(test_data2)
#test_data2 = standardScaler2.fit(test_data2).transform(test_data2)

                                                                                

In [27]:
print(test_data2.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'numerical_features2', 'features2']


# Обучение моделей

***1. Построим модель линейной регрессии  №1, где используютсявсе данные из файла***

In [28]:
#обучим модель первую модель

lr = LinearRegression(featuresCol='features', labelCol ='median_house_value', maxIter=10, regParam=0.3, elasticNetParam=0.8) 
stages += [lr]

# задаем план stages для обучения модели 
pipeline = Pipeline(stages=stages)

# тренируем модель
model = pipeline.fit(train_data)

# делаем предсказания на тестовой выборке
predictions = model.transform(test_data)

23/04/22 06:40:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/04/22 06:40:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [29]:
#предсказание результатов

predictions = model.transform(test_data)

predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show(5)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0| 183418.1087200842|
|           50800.0|232881.09614893887|
|           58100.0|  158878.409502991|
|           68400.0|162131.23710889602|
|           72200.0|180961.47946255258|
+------------------+------------------+
only showing top 5 rows



Оценка качества модели с помощью метрик RMSE, MAE и R2

Root Mean Squared Error (RMSE) - метрика, которая сообщает нам квадратный корень из средней квадратичной разницы между прогнозируемыми значениями и фактическими значениями в наборе данных. Чем ниже RMSE, тем лучше модель соответствует набору данных.

Mean Absolute Error (MAE) - показывает среднюю абсолютную разницу между прогнозируемыми значениями и фактическими значениями в наборе данных. Чем ниже MAE, тем лучше модель соответствует набору данных.

Coefficient of Determination (R2) - то величина изменения в выходном зависимом атрибуте, которая предсказуема из входных независимых переменных. Он используется для проверки того, насколько хорошо наблюдаемые результаты воспроизводятся моделью, в зависимости от отношения общего отклонения результатов, описанных моделью.


In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

mae = RegressionEvaluator(labelCol='median_house_value', metricName="mae").evaluate(predictions)
rmse = RegressionEvaluator(labelCol='median_house_value', metricName="rmse").evaluate(predictions)
r2 = RegressionEvaluator(labelCol='median_house_value', metricName="r2").evaluate(predictions)

print("MAE = ", mae)
print("RMSE = ", rmse)
print("R2 = ", r2)

MAE =  50189.496556608065
RMSE =  69064.60679110579
R2 =  0.6476871478026036


***2.Построим модель линейной регрессии №2, где только числовые переменные,а категориальные удалены.***

***Раздение данных на обучающую и тренировочную выборки***

In [31]:
#обучим вторую модель

lr2 = LinearRegression(featuresCol='features2', labelCol ='median_house_value', maxIter=10, regParam=0.3, elasticNetParam=0.8) 

model2 = lr2.fit(train_data2)

In [32]:
#предсказание результатов

predictions2 = model2.transform(test_data2)

predictedLabes2 = predictions2.select("median_house_value", "prediction")
predictedLabes2.show(5)




+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101536.76721770782|
|           50800.0|185324.07948330836|
|           58100.0|109687.58200852666|
|           68400.0| 79232.28617719933|
|           72200.0|129832.98257200373|
+------------------+------------------+
only showing top 5 rows



In [33]:
#посчитаем метрики качества второй модели

mae = RegressionEvaluator(labelCol='median_house_value', metricName="mae").evaluate(predictions2)
rmse = RegressionEvaluator(labelCol='median_house_value', metricName="rmse").evaluate(predictions2)
r2 = RegressionEvaluator(labelCol='median_house_value', metricName="r2").evaluate(predictions2)

print("MAE = ", mae)
print("RMSE = ", rmse)
print("R2 = ", r2)

MAE =  51018.89584708491
RMSE =  69268.19359646735
R2 =  0.6456070095736492


In [34]:
#осановить сессию
spark.stop()

# Анализ результатов

Представленный датасет с данными о жилье в Калифорнии в 1990 году был предобработан - все пропущенные значения были заменены на нули.

Далее мы обучили модели линейной регрессии на разных наборах данных:

- 1) Первая модель: на всей данных из датасета, при  этом проведя трансформацию и закодировав качественные признаки, а также трансформируя количественные признаки. Далее разбили данные на обучающую и тестовую выборки. Обучили модели на тренировочной выборке и сделали предсказание на тестовой, после чего проверили оценили качество модели используйте метрики RMSE, MAE и R2
- 2) Вторая модель: на данных того же датасета, но исключив качественные призники и обучение проведи только на количественных, предварительно их трансформиру. И проверили снова теже метрики.

ПО 1 модели были получены следующие метрики:
 - MAE =  50189.496556608065
 - RMSE =  69064.60679110579
 - R2 =  0.6476871478026036
 
Метрики по 2 модели:
 - MAE =  51018.89584708491
 - RMSE =  69268.19359646735
 - R2 =  0.6456070095736492
 
Как мы видим метрики MAE и RMSE по первой модели лучше, потому что они ниже. Что означает меньшу разницу между прогнозируемыми и реальными занчениями по median_house_value — медианной стоимости дома в жилом массиве.
Метрика R2 по первой модели  на 0.002 пункта выше, что лучше, так как это озачает, что наблюдения модели, как за качественными, так за количественными признаками немного точнее воспроизводятся модель.

По всем метриками выходит, что первая модель лучше, чем вторая  предсказывает медианную стоимости дома в жилом массиве.
