# Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

## Загрузка библиотек

In [1]:
%%time
%%capture

import numpy as np 
import pandas as pd
import gc
# Сброс ограничений на количество выводимых рядов
pd.set_option('display.max_rows', 2)
 # Сброс ограничений на число столбцов
pd.set_option('display.max_columns', None)
 # Сброс ограничений на количество символов в записи
# pd.set_option('display.max_colwidth', None)
# При записи изменённых признаков в исходный датафрейм код может вызывать предупреждение SettingWithCopy.
# Причина в особенности поведения sklearn и pandas. 
# Чтобы предупреждение не появлялось, в код добавляют строчку:
pd.options.mode.chained_assignment = None

import matplotlib.pyplot as plt
%matplotlib inline
import math
from math import factorial, exp
from scipy import stats as st
import seaborn as sns
import warnings

from scipy.stats import binom #помогает получить вероятности биномиального распределения
from scipy.stats import poisson #значения функции распределения для распределения Пуассона

from ipywidgets import IntProgress
from IPython.display import display
import time
import datetime
# Помогаторные функции
def кп(текст): #красный подчеркнутый
    return f"\033[31m\033[4m{текст}\033[0m"
def сп(текст): #синий подчеркнутый
    return f"\033[34m\033[4m{текст}\033[0m"
def з(текст): #синий подчеркнутый
    return f"\033[32m{текст}\033[0m"
pd.set_option('display.float_format', '{:.2f}'.format)
def print_df(df):
    pd.set_option('display.max_rows', None)
    display(df)
    pd.set_option('display.max_rows', 2)
    
    
def t():
    return datetime.datetime.now()

import random
def цвет():
    return f'rgb({random.randrange(255)},{random.randrange(255)},{random.randrange(255)})'

def info_df(df):
    print(f'запуск проверки {сп(t())}, на моент запуска {сп(len(df))} строк')
    дубликатов = df.duplicated().sum()
    if дубликатов >= 1:
        print(кп('Найдены дубликаты:'))
        print(df.value_counts())
        print(f'Нужно удалить {кп(дубликатов)} дублей')
    строк = df.reset_index()['index'].count()
    print(з(df.info()))
    display(df.describe())    
    if строк>3:
        display(df.sample(3))
    else:
        print(f"Набор данных содержит {кп(строк)} строк. Вывожу весь набор данных")
        display(df.sample(строк))

def f1():
    print(f'''
{сп('1. Функции вызова')}
    1.1 {з('print_df(df)')} - выводит набор данных полностью
    1.5 {з('t()')} - выводит текущую дату и время
    1.6 {з('цвет()')} - выводит цветовую палитру, пример rgb(7,40,89)
{сп('2. Визуализация данных')}
    2.1 {з('info_df(df):')} - выводит информацию о данных (.info()), проверяет наличие дублей (.duplicated().sum()), если они есть, то предупреждает о них, описывает числительные данные (.describe()).
    2.2 {з('диаграмма(df, target, col, x, name):')} - выводит диаграмму, target - значение по оси ординат, x - значение по оси абсцисс, col - поле содержащее информацию о легенде, name -наименование графика.
    2.3 {з('boxplot(df, target, col, name):')} - выводит boxplot, target - значение по оси ординат, x - значение по оси абсцисс, col - поле содержащее информацию о легенде, name -наименование графика.
    2.4 {з('линия(df, target, x, name='', legend=0, col=''):')} - выводит линейный график, target - значение по оси ординат, x - значение по оси абсцисс, col - поле содержащее информацию о легенде, name -наименование графика. legend=0 означает, что легенда на графике не нужна, а значит не нужно указывать параметр col
    2.5 {з('гистограмма(df, target, col, name):')} - выводит гистограмму, target - значение по оси ординат, x - значение по оси абсцисс, col - поле содержащее информацию о легенде, name -наименование графика.
    2.6 {з('кп(текст)')} - цвет красный подчеркнутый
    2.7 {з('сп(текст)')} - цвет синий подчеркнутый
    2.8 {з('з(текст)')} - цвет зеленый
    ''')
# библиотеки PySpark

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.types as pst
import pyspark.sql.functions as psf
from pyspark.sql.functions import col, lit

spark = (
    SparkSession.builder 
    .master("local") 
    .appName("Sample Regression") 
    .getOrCreate()
)


from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
# from pyspark.ml.classification import LogisticRegression
#Модели
from pyspark.ml.regression import LinearRegression
# Оценка качества
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Метрики регрессии
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.mllib.recommendation import ALS, Rating

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

print(f'spark session id: {spark.sparkContext.applicationId}')






def зкште(trt):
    return print(trt)

CPU times: user 1.02 s, sys: 319 ms, total: 1.34 s
Wall time: 17.3 s


In [2]:
print(f'spark session id: {spark.sparkContext.applicationId}')

spark session id: local-1707544729981


## Подготовка данных

In [3]:
start_df_info = {'longitude':'широта'
, 'latitude':'долгота'
, 'housing_median_age':'медианный возраст жителей жилого массива'
, 'total_rooms':'общее количество комнат в домах жилого массива'
, 'total_bedrooms':'общее количество спален в домах жилого массива'
, 'population':'количество человек, которые проживают в жилом массиве'
, 'households':'количество домовладений в жилом массиве'
, 'median_income':'медианный доход жителей жилого массива'
, 'median_house_value':'медианная стоимость дома в жилом массиве'
, 'ocean_proximity':'близость к океану'}

### Прочитайте содержимое файла /datasets/housing.csv.

In [4]:
start_df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)
start_df.limit(3).toPandas()

                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.00,880.00,129.00,322.00,126.00,8.33,452600.00,NEAR BAY
...,...,...,...,...,...,...,...,...,...,...
2,-122.24,37.85,52.00,1467.00,190.00,496.00,177.00,7.26,352100.00,NEAR BAY


### Выведите типы данных колонок датасета. Используйте методы pySpark.

In [5]:
start_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
# for i in start_df.limit(1).toPandas().columns:
#     print(", '"+i+"'")

In [7]:
#Разделяю колонки на два типа: числовые и текстовые, которые представляют категориальные данные.
categorical_cols = ['ocean_proximity']
numerical_cols  = [
    'longitude'
, 'latitude'
, 'housing_median_age'
, 'total_rooms'
, 'total_bedrooms'
, 'population'
, 'households'
, 'median_income'
# , 'median_house_value'
]

target = "median_house_value"

### Выполните предобработку данных:

#### Исследуйте данные на наличие пропусков и заполните их, выбрав значения по своему усмотрению

In [8]:
print_df(start_df.describe().toPandas())

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [9]:
# Пустоты не знаю чем заменить, любая замена может плохо повлиять на результат, поэтому просто исключу такие значения
# print_df(df.fillna(-1).describe().toPandas())
start_df = start_df.na.drop(subset=numerical_cols)
print_df(start_df.describe().toPandas())

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433
1,mean,-119.57068859198068,35.63322125972706,28.633093525179856,2636.5042333480155,537.8705525375618,1424.9469485635982,499.43346547252,3.8711616013312273,206864.4131551901,
2,stddev,2.003577890751096,2.1363476663779872,12.591805202182837,2185.269566977601,421.3850700740312,1133.2084897449597,382.2992258828481,1.899291249306247,115435.66709858322,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [10]:
print(f'При удалении пустот потери составили {20640-20433}, {round(((20640-20433)/20640)*100, 2)}%')

При удалении пустот потери составили 207, 1.0%


### Разделение на выборки



In [11]:
# train_data, test_data, valid_data = df.randomSplit([.7,.2, .1], seed=12345)
train_data, test_data = start_df.randomSplit([.8,.2], seed=12345)
print(train_data.count(), test_data.count())#, valid_data.count())

                                                                                

16274 4159


#### Преобразуйте колонку с категориальными значениями техникой One hot encoding

In [12]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
df = indexer.fit(train_data).transform(train_data)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
print(df.select(cols).show(1))

encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
df = (
    encoder
    .fit(df)
    .transform(df)
    .drop(col('ocean_proximity'))
    .drop(col('ocean_proximity_idx'))
)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
print(df.select(cols).show(1))



                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 1 row

None
+-------------------+
|ocean_proximity_ohe|
+-------------------+
|      (4,[2],[1.0])|
+-------------------+
only showing top 1 row

None


In [13]:
# test_data
test_df = indexer.fit(test_data).transform(test_data)

cols = [c for c in test_df.columns for i in categorical_cols if (c.startswith(i))]
print(test_df.select(cols).show(1))
test_df = (
    encoder
    .fit(test_df)
    .transform(test_df)
    .drop(col('ocean_proximity'))
    .drop(col('ocean_proximity_idx'))
)
cols = [c for c in test_df.columns for i in categorical_cols if (c.startswith(i))]
print(test_df.select(cols).show(1))



+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 1 row

None
+-------------------+
|ocean_proximity_ohe|
+-------------------+
|      (4,[2],[1.0])|
+-------------------+
only showing top 1 row

None


In [14]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
 outputCol="numerical_features")
df = numerical_assembler.transform(df)

standardScaler = StandardScaler(inputCol='numerical_features',
 outputCol="numerical_features_scaled")
df = (
    standardScaler
    .fit(df)
    .transform(df)
)
print_df(df.limit(3).toPandas())

all_features = ['categorical_features','numerical_features_scaled']

categorical_assembler = VectorAssembler(
    inputCols=[c+'_ohe' for c in categorical_cols]
    , outputCol="categorical_features")
df = categorical_assembler.transform(df)
print_df(df.select(all_features).limit(3).toPandas())



final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df = final_assembler.transform(df)


                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity_ohe,numerical_features,numerical_features_scaled
0,-124.35,40.54,52.0,1820.0,300.0,806.0,270.0,3.01,94600.0,"(0.0, 0.0, 1.0, 0.0)","[-124.35, 40.54, 52.0, 1820.0, 300.0, 806.0, 2...","[-61.96187477252211, 18.921000446195844, 4.122..."
1,-124.3,41.8,19.0,2672.0,552.0,1298.0,478.0,1.98,85800.0,"(0.0, 0.0, 1.0, 0.0)","[-124.3, 41.8, 19.0, 2672.0, 552.0, 1298.0, 47...","[-61.93696046823079, 19.50907298103074, 1.5064..."
2,-124.3,41.84,17.0,2677.0,531.0,1244.0,456.0,3.03,103600.0,"(0.0, 0.0, 1.0, 0.0)","[-124.3, 41.84, 17.0, 2677.0, 531.0, 1244.0, 4...","[-61.93696046823079, 19.527741950390585, 1.347..."


Unnamed: 0,categorical_features,numerical_features_scaled
0,"(0.0, 0.0, 1.0, 0.0)","[-61.96187477252211, 18.921000446195844, 4.122..."
1,"(0.0, 0.0, 1.0, 0.0)","[-61.93696046823079, 19.50907298103074, 1.5064..."
2,"(0.0, 0.0, 1.0, 0.0)","[-61.93696046823079, 19.527741950390585, 1.347..."


In [15]:
# print_df(df.limit(2).toPandas().T)

In [16]:
# test_df
test_df = numerical_assembler.transform(test_df)

test_df = (
    standardScaler
    .fit(test_df)
    .transform(test_df)
)
print_df(test_df.limit(3).toPandas())

test_df = categorical_assembler.transform(test_df)
print_df(test_df.select(all_features).limit(3).toPandas())

test_df = final_assembler.transform(test_df)


Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity_ohe,numerical_features,numerical_features_scaled
0,-124.23,40.54,52.0,2694.0,453.0,1152.0,435.0,3.08,106700.0,"(0.0, 0.0, 1.0, 0.0)","[-124.23, 40.54, 52.0, 2694.0, 453.0, 1152.0, ...","[-62.41294758890141, 19.199106084750266, 4.155..."
1,-124.17,40.74,17.0,2026.0,338.0,873.0,313.0,4.04,128900.0,"(0.0, 0.0, 1.0, 0.0)","[-124.17, 40.74, 17.0, 2026.0, 338.0, 873.0, 3...","[-62.38280368762689, 19.29382293765974, 1.3585..."
2,-124.17,40.75,13.0,2171.0,339.0,951.0,353.0,4.85,116100.0,"(0.0, 0.0, 1.0, 0.0)","[-124.17, 40.75, 13.0, 2171.0, 339.0, 951.0, 3...","[-62.38280368762689, 19.298558780305214, 1.038..."


Unnamed: 0,categorical_features,numerical_features_scaled
0,"(0.0, 0.0, 1.0, 0.0)","[-62.41294758890141, 19.199106084750266, 4.155..."
1,"(0.0, 0.0, 1.0, 0.0)","[-62.38280368762689, 19.29382293765974, 1.3585..."
2,"(0.0, 0.0, 1.0, 0.0)","[-62.38280368762689, 19.298558780305214, 1.038..."


In [17]:
# for i in df.limit(1).toPandas().columns:
#     print(", col('"+i+"')")

## Обучение моделей

Постройте две модели линейной регрессии на разных наборах данных:
используя все данные из файла;
используя только числовые переменные, исключив категориальные.
Для построения модели используйте оценщик LinearRegression из библиотеки MLlib.

### Обучение модели

#### обученную на всех данных

In [18]:
%%time
lr = LinearRegression(labelCol=target, featuresCol='features')
model = lr.fit(df)

predictions = model.transform(test_df)
predictedLabes = predictions.select(target, "prediction")
predictedLabes.show()

24/02/10 05:59:17 WARN Instrumentation: [b4e9f3c9] regParam is zero, which might cause numerical instability and overfitting.
24/02/10 05:59:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/02/10 05:59:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/02/10 05:59:18 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/02/10 05:59:18 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0| 222057.7088553328|
|          128900.0|211618.79102734523|
|          116100.0| 236739.8270487073|
|           70500.0| 167155.2706735488|
|           85600.0|192139.88636508724|
|           75500.0| 141206.7277661264|
|           79600.0|167359.86729453504|
|           92800.0|212807.39938381873|
|           75000.0|118920.36473653745|
|           74100.0|139593.38552329363|
|           99600.0|154472.65722586727|
|          143000.0| 205976.8782780082|
|          133900.0|205833.31734374678|
|           70700.0|132758.05619673664|
|           71100.0|  123210.330452919|
|          106300.0|185142.89142902708|
|          150000.0|163025.88386321533|
|          145800.0|144699.46289487835|
|           76800.0|129211.55409948528|
|          277000.0|209651.34244175814|
+------------------+------------------+
only showing top 20 rows

CPU times: use

#### только на числовых признаках

In [19]:
lr_nall = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
model_nall = lr_nall.fit(df)

predictions_nall = model_nall.transform(test_df)
predictedLabes_nall = predictions_nall.select(target, "prediction")
predictedLabes_nall.show()

24/02/10 05:59:20 WARN Instrumentation: [36521630] regParam is zero, which might cause numerical instability and overfitting.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|204400.87696393346|
|          128900.0|187795.65599645954|
|          116100.0| 212850.4290227187|
|           70500.0|143596.49410904152|
|           85600.0| 168831.4398709694|
|           75500.0|112353.94860424288|
|           79600.0|146466.32176567102|
|           92800.0|188876.66543677775|
|           75000.0| 91700.44832249358|
|           74100.0|112687.07235120377|
|           99600.0|125538.15461986978|
|          143000.0| 179332.1410093815|
|          133900.0|181646.55299546244|
|           70700.0|117263.18308426253|
|           71100.0| 82016.63810589304|
|          106300.0| 157532.6287729172|
|          150000.0|158560.50940309325|
|          145800.0|130012.94328301493|
|           76800.0|112580.72161014937|
|          277000.0|206014.30507776607|
+------------------+------------------+
only showing top 20 rows



## Анализ результатов

Сравните результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. Сделайте выводы.

In [20]:
results_collect = predictedLabes.collect()
results_list = [ (float(i[0]), float(i[1])) for i in results_collect]
scoreAndLabels = spark.sparkContext.parallelize(results_list)

metrics = RegressionMetrics(scoreAndLabels)
# Root mean squared error
print("RMSE = %s" % metrics.rootMeanSquaredError)
# R-squared
print("R-squared = %s" % metrics.r2)
print(f"MAE = {metrics.meanAbsoluteError}")

                                                                                

RMSE = 68801.32383046365
R-squared = 0.4377666800100797
MAE = 51836.38324793676


Модель очень плохая, R2 меншье 0.5, большие разбросы.

In [21]:
results_collect_nall = predictedLabes_nall.collect()
results_list_nall = [ (float(i[0]), float(i[1])) for i in results_collect_nall]
scoreAndLabels_nall = spark.sparkContext.parallelize(results_list_nall)

metrics_nall = RegressionMetrics(scoreAndLabels_nall)
# Root mean squared error
print("RMSE = %s" % metrics_nall.rootMeanSquaredError)
# R-squared
print("R-squared = %s" % metrics_nall.r2)
print(f"MAE = {metrics_nall.meanAbsoluteError}")

RMSE = 71123.18941960506
R-squared = 0.392557673542405
MAE = 55201.64592314891


In [22]:
MASE = metrics.meanAbsoluteError/metrics_nall.meanAbsoluteError
print(f"MASE = {MASE}")

MASE = 0.9390369142272093


## Вывод
Средняя абсолютная масштабированная ошибка (Mean absolute scaled error - MASE) равна 0.93, означает что модель на только числовых признаках хуже. Это так же подтверждает и показатель R2.<br>
В целом R2 меньше 0.5, говорит о том, что модель плохая, на это указывает так же и большие значения показателей MAE и RMSE.

In [23]:
# start_df_info