## Предсказание стоимости жилья

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке.

In [None]:
# Инициализируем локальную спарк сессию
import pyspark
from pyspark.sql import Window

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, VectorAssembler,StringIndexer,StandardScaler
import pyspark.sql.functions as F


In [None]:
# Create the Spark session on the local master, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Learning DataFrame Window Functions")
    .getOrCreate()
)

In [None]:
# Seed passed to randomSplit so the train/test split is reproducible.
RS = 7

In [None]:
# Read the housing CSV (comma-separated, with a header row) and let Spark infer column types.
data = spark.read.load(
    '/datasets/housing.csv',
    format="csv",
    sep=",",
    inferSchema=True,
    header="true",
)


                                                                                

In [None]:
data.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [None]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [None]:
data_columns = data.columns


In [None]:
def check_nan(df=None):
    """Print the number of null entries in every column of a DataFrame.

    Args:
        df: DataFrame to inspect. When omitted, the notebook-level ``data``
            is used (looked up at call time), so existing ``check_nan()``
            calls keep working unchanged.

    Only nulls are counted (``isNull``); in Spark a floating-point NaN is
    not null, so NaN values are not included in these counts.
    """
    if df is None:
        df = data
    print("NaN values in each column:")
    # One aggregation over all columns instead of a separate Spark job per column.
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
    ).first()
    for c in df.columns:
        print(c, null_counts[c])

In [None]:
check_nan()

NaN values in each column:
longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Вывод:

Пропуски есть только в столбце total_bedrooms (207 значений); заполним их средним значением по этому столбцу.

# Подготовка данных

In [None]:
# Mean of total_bedrooms over the full dataset; collect() yields a list holding a single Row.
# NOTE(review): this is computed before the train/test split, so test rows contribute to the fill value.
total_bedrooms_avg = data.agg({'total_bedrooms':'mean'}).collect()

In [None]:
total_bedrooms_avg

[Row(avg(total_bedrooms)=537.8705525375618)]

In [None]:
# Fill missing total_bedrooms with the mean computed above rather than a
# hard-coded copy of it, so the value stays correct if the dataset changes.
# total_bedrooms_avg is a one-element list of Row; [0][0] extracts the number.
data = data.na.fill({'total_bedrooms': total_bedrooms_avg[0][0]})

In [None]:
check_nan()

NaN values in each column:
longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [None]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [None]:
cat_column = 'ocean_proximity'

In [None]:
train_data, test_data = data.randomSplit([0.8,0.2],seed=RS)

In [None]:
# Sanity check: print the row counts of the train and test splits
print(train_data.count(),test_data.count())

                                                                                

16481 4159


In [None]:
train_data

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string]

In [None]:
test_data

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, ocean_proximity: string]

In [None]:
# Indexer that maps the categorical column to a numeric "<column>_idx" column.
indexer = StringIndexer(
    inputCol=cat_column,
    outputCol=cat_column + '_idx',
)

In [None]:
indexer_model = indexer.fit(train_data)

                                                                                

In [None]:
train_data = indexer_model.transform(train_data)

In [None]:
test_data = indexer_model.transform(test_data)

In [None]:
def check_unique(data,column):
    """Show the distinct values found in `column` of `data`.

    The distinct rows are collected to the driver and passed to `display`.
    """
    distinct_rows = data.select(column).distinct().collect()
    display(distinct_rows)

In [None]:
check_unique(train_data,'ocean_proximity_idx')


                                                                                

[Row(ocean_proximity_idx=0.0),
 Row(ocean_proximity_idx=1.0),
 Row(ocean_proximity_idx=4.0),
 Row(ocean_proximity_idx=3.0),
 Row(ocean_proximity_idx=2.0)]

In [None]:
check_unique(test_data,'ocean_proximity_idx')


                                                                                

[Row(ocean_proximity_idx=0.0),
 Row(ocean_proximity_idx=1.0),
 Row(ocean_proximity_idx=4.0),
 Row(ocean_proximity_idx=3.0),
 Row(ocean_proximity_idx=2.0)]

In [None]:
# One-hot encoder that turns the "<column>_idx" index into a "<column>_ohe" vector.
ohe = OneHotEncoder(
    inputCol=cat_column + '_idx',
    outputCol=cat_column + '_ohe',
)

In [None]:
ohe_model = ohe.fit(train_data)

In [None]:
train_data = ohe_model.transform(train_data)


In [None]:
test_data = ohe_model.transform(test_data)

In [None]:
check_unique(train_data,'ocean_proximity_ohe')


                                                                                

[Row(ocean_proximity_ohe=SparseVector(4, {2: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {0: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {})),
 Row(ocean_proximity_ohe=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {1: 1.0}))]

In [None]:
check_unique(test_data,'ocean_proximity_ohe')


                                                                                

[Row(ocean_proximity_ohe=SparseVector(4, {2: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {0: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {})),
 Row(ocean_proximity_ohe=SparseVector(4, {3: 1.0})),
 Row(ocean_proximity_ohe=SparseVector(4, {1: 1.0}))]

In [None]:
data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity']

In [None]:
# Column the model has to predict.
target = 'median_house_value'

# Numeric feature columns only (no categorical column, no target).
features_wo_cat = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]

# Every feature: the numeric columns followed by the one-hot encoded category.
features_all = features_wo_cat + ['ocean_proximity_ohe']

In [None]:
# Names of the numeric input columns.
numerical_columns = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
]

In [None]:
# Pack the one-hot encoded column into the "categorical_features" vector (train split).
categorical_assembler = VectorAssembler(
    inputCols=[cat_column + '_ohe'],
    outputCol="categorical_features",
)
train_data = categorical_assembler.transform(train_data)

In [None]:
# Apply the categorical assembler already built for the training split, so both
# splits are guaranteed to use identical input/output column settings.
test_data = categorical_assembler.transform(test_data)

In [None]:
# Pack the numeric columns into the "numeric_features" vector (train split).
numeric_assembler = VectorAssembler(
    inputCols=list(numerical_columns),
    outputCol='numeric_features',
)
train_data = numeric_assembler.transform(train_data)

In [None]:
# Apply the numeric assembler already built for the training split instead of
# constructing an identical second instance.
test_data = numeric_assembler.transform(test_data)

In [None]:
train_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numeric_features: vector (nullable = true)



In [None]:
test_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numeric_features: vector (nullable = true)



In [None]:

# Fit the scaler on the training split only and apply that same fitted model to
# both splits. Re-fitting on the test split would scale test features with
# different statistics than the ones the regression was trained on, and would
# leak test-set information into preprocessing.
standardScaler = StandardScaler(inputCol='numeric_features', outputCol='numeric_features_snrd')
scaler_model = standardScaler.fit(train_data)
train_data = scaler_model.transform(train_data)
test_data = scaler_model.transform(test_data)

In [None]:
# Vector columns fed to the final assemblers: scaled numerics alone,
# and scaled numerics followed by the categorical vector.
features = ['numeric_features_snrd']
features_all = features + ['categorical_features']

In [None]:
# Combine all required feature vectors into the single "features" column (train split)
features_assembler = VectorAssembler(
    inputCols=features_all,
    outputCol='features',
)
train_data = features_assembler.transform(train_data)


In [None]:
# Combine all required feature vectors into the "features" column for the test
# split, reusing the assembler built for the training split instead of
# constructing an identical second instance.
test_data = features_assembler.transform(test_data)


In [None]:
# Combine every feature except the categorical ones into "features_wo_cat" (train split)
features_assembler_wo_cat = VectorAssembler(
    inputCols=features,
    outputCol='features_wo_cat',
)
train_data = features_assembler_wo_cat.transform(train_data)

In [None]:
# Combine every feature except the categorical ones into "features_wo_cat" for
# the test split, reusing the assembler built for the training split instead of
# constructing an identical second instance.
test_data = features_assembler_wo_cat.transform(test_data)

In [None]:
train_data.printSchema()


root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numeric_features: vector (nullable = true)
 |-- numeric_features_snrd: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_wo_cat: vector (nullable = true)



In [None]:
test_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numeric_features: vector (nullable = true)
 |-- numeric_features_snrd: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_wo_cat: vector (nullable = true)



# Обучение моделей

##  Используем все признаки:

In [None]:
def lin_reg(label,features,train,test,n_to_show=10):
    """Fit a linear regression on `train` and report its quality on `test`.

    Args:
        label: name of the target column.
        features: name of the assembled feature-vector column.
        train: DataFrame the model is fitted on.
        test: DataFrame the model is applied to and evaluated on.
        n_to_show: number of prediction rows to print.

    Prints a sample of the predictions followed by the RMSE, R2 and MAE
    computed on `test`. Returns nothing.
    """
    linear_regression = LinearRegression(labelCol=label, featuresCol=features)
    model = linear_regression.fit(train)
    predict = model.transform(test)

    # Show the label and feature columns this run actually used; hard-coded
    # names would print the wrong feature column for other feature sets.
    predict.select([label, features, 'prediction']).show(n_to_show)

    # Same evaluator configuration for every metric; only the metric name varies.
    for metric in ('rmse', 'r2', 'mae'):
        evaluator = RegressionEvaluator(
            labelCol=label, predictionCol="prediction", metricName=metric)
        print('%s of model is = %g' % (metric.upper(), evaluator.evaluate(predict)))

In [None]:
lin_reg(label=target,features='features',train=train_data,test=test_data,n_to_show=15)

22/12/24 21:53:49 WARN Instrumentation: [19c0d11e] regParam is zero, which might cause numerical instability and overfitting.
22/12/24 21:53:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/24 21:53:49 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/12/24 21:53:49 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/12/24 21:53:49 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


+------------------+--------------------+------------------+
|median_house_value|            features|        prediction|
+------------------+--------------------+------------------+
|           76100.0|[-62.346767604200...|192100.06063064537|
|           50800.0|[-62.336731907201...|235885.69687341922|
|           78300.0|[-62.331714058702...|144672.87973488192|
|           58100.0|[-62.326696210203...|160465.46999319107|
|           69000.0|[-62.316660513204...| 195867.8584616005|
|           74600.0|[-62.316660513204...|122185.61747195455|
|           72200.0|[-62.311642664705...|182737.90533559886|
|           70500.0|[-62.311642664705...|162907.24843274546|
|           86400.0|[-62.306624816205...| 205645.5844819143|
|          128900.0|[-62.306624816205...| 227745.0852126556|
|           70500.0|[-62.306624816205...| 182361.2132793374|
|           74100.0|[-62.301606967706...|168452.69721016963|
|           80500.0|[-62.301606967706...|199099.98477088404|
|           96000.0|[-62

##  Используем только числовые признаки:


In [None]:
lin_reg(label=target,features='features_wo_cat',train=train_data,test=test_data,n_to_show=15)

22/12/24 21:53:53 WARN Instrumentation: [45db6e9d] regParam is zero, which might cause numerical instability and overfitting.


+------------------+--------------------+------------------+
|median_house_value|            features|        prediction|
+------------------+--------------------+------------------+
|           76100.0|[-62.346767604200...|180255.25655686483|
|           50800.0|[-62.336731907201...| 215569.0264028539|
|           78300.0|[-62.331714058702...|105904.67190399021|
|           58100.0|[-62.326696210203...| 138752.4593104166|
|           69000.0|[-62.316660513204...|173735.28019600734|
|           74600.0|[-62.316660513204...| 80811.55055968184|
|           72200.0|[-62.311642664705...|159941.94416810526|
|           70500.0|[-62.311642664705...|139594.87999911653|
|           86400.0|[-62.306624816205...|  185764.502147282|
|          128900.0|[-62.306624816205...|205079.84416332748|
|           70500.0|[-62.306624816205...| 159813.5739037334|
|           74100.0|[-62.301606967706...|146914.45745939296|
|           80500.0|[-62.301606967706...|178009.03603035584|
|           96000.0|[-62

In [None]:
spark.stop()

# Анализ результатов

Без стандартизации:

Модель со всеми признаками:

* **RMSE** of model is = **69780.1**

* **R2** of model is = **0.632608**

* **MAE** of model is = **50100.9**

Модель без категориальных признаков:

* **RMSE** of model is = **70634.6**

* **R2** of model is = **0.623555**

* **MAE** of model is = **51165.8**

Со стандартизацией:

Модель со всеми признаками:

* **RMSE** of model is = **71916.5**
* **R2** of model is = **0.609767**
* **MAE** of model is = **55351.9**

Модель без категориальных признаков:

* **RMSE** of model is = **75958**

* **R2** of model is = **0.564675**

* **MAE** of model is = **61018.1**


Вывод:

Можем заметить, что модель со всеми признаками лучше, чем без категориальных, однако далека от хорошей модели.
Модель без стандартизации показала результаты лучше.