## Предсказание стоимости жилья

В  данном проекте необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Для оценки качества модели необходимо использовать метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:
- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

# Подготовка данных

In [1]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull
from pyspark.sql.functions import mean
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
RANDOM_SEED = 2022

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

In [3]:
df = spark.read.load('/datasets/housing.csv', 
                                            format="csv", sep=",", inferSchema=True, header="true")
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

### Вывожу названия колонок и первые 20 строк

In [4]:

print(pd.DataFrame(df.dtypes, columns=['column', 'type']).head(20))

df.show(20)

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500

### Исследую данные на наличие пропусков

In [5]:
df.filter(df.longitude.isNull()).count()

0

In [6]:
df.filter(df.latitude.isNull()).count()

0

In [7]:
df.filter(df.housing_median_age.isNull()).count()

0

In [8]:
df.filter(df.total_rooms.isNull()).count()

0

In [9]:
df.filter(df.total_bedrooms.isNull()).count()

207

In [10]:
df.filter(df.population.isNull()).count()

0

In [11]:
df.filter(df.households.isNull()).count()

0

In [12]:
df.filter(df.median_income.isNull()).count()

0

In [13]:
df.filter(df.median_house_value.isNull()).count()

0

In [14]:
df.filter(df.ocean_proximity.isNull()).count()

0

### Нашла пропуски в столбце total_bedrooms. Заполняю их средним значением столбца

Ищу среднее значение столбца total_bedrooms

**Комментарий студента: я бы с удовольствием посчитала бы медиану столбца total_bedrooms, но я не нашла способа как эту медиану вычислить в pyspark**

In [15]:
df.select(mean('total_bedrooms')).collect()

[Row(avg(total_bedrooms)=537.8705525375618)]

Заполняю пропуски средним значением

In [16]:
df = df.withColumn("total_bedrooms", col("total_bedrooms").cast(IntegerType()))

In [17]:
df = df.na.fill(value=537.9, subset=['total_bedrooms'])

In [18]:
df.filter(df.total_bedrooms.isNull()).count()

0

### Преобразую колонку с категориальными значениями техникой One hot encoding

Делю столбцы на числовые и категориальные

In [19]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', \
                   'population', 'households', 'median_income']
target = 'median_house_value'

### Делю датасет на выборки

Выборка всех данных

In [20]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())



                                                                                

16418 4222


## Кодирование и стандартизация выборки (на обучающих данных)

### Трансформирую категориальные признаки (перевожу текстовые категории в числовое представление)

In [21]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
#train_data = indexer.fit(train_data).transform(train_data)
indexer = indexer.fit(train_data)
train_data = indexer.transform(train_data)
test_data = indexer.transform(test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
cols2 = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(6)
test_data.select(cols2).show(6)

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 6 rows

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 6 rows



In [22]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
encoder = encoder.fit(train_data)
train_data = encoder.transform (train_data)
test_data = encoder.transform (test_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
cols2 = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(3)
test_data.select(cols2).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



Объединяю признаки в один вектор

In [23]:
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
train_data = categorical_assembler.transform(train_data)
test_data = categorical_assembler.transform(test_data)

### Транформирую числовые признаки

In [24]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

In [25]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
standardScaler = standardScaler.fit(train_data)
train_data = standardScaler.transform(train_data)
test_data = standardScaler.transform(test_data)

                                                                                

Объединяю категориальные и числовые признаки

In [26]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3)
test_data.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-61.952887791441...|
|       [0.0,0.0,1.0]|     [-61.927977100733...|
|       [0.0,0.0,1.0]|     [-61.913030686308...|
+--------------------+-------------------------+
only showing top 3 rows

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-61.927977100733...|
|       [0.0,0.0,1.0]|     [-61.893102133741...|
|       [0.0,0.0,1.0]|     [-61.883137857458...|
+--------------------+-------------------------+
only showing top 3 rows



Собираю трансформированные категорийные и числовые признаки с помощью VectorAssembler

In [27]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features2") 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3)
test_data.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-61.952887791441...|
|       [0.0,0.0,1.0]|     [-61.927977100733...|
|       [0.0,0.0,1.0]|     [-61.913030686308...|
+--------------------+-------------------------+
only showing top 3 rows

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-61.927977100733...|
|       [0.0,0.0,1.0]|     [-61.893102133741...|
|       [0.0,0.0,1.0]|     [-61.883137857458...|
+--------------------+-------------------------+
only showing top 3 rows



Собираю трансформированные числовые признаки с помощью VectorAssembler

In [28]:
features_numerical = ['numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=features_numerical, 
                                  outputCol="features_numerical2") 
train_data_numeric = final_assembler.transform(train_data)
test_data_numeric = final_assembler.transform(test_data)

train_data_numeric.select(features_numerical).show(3)
test_data_numeric.select(features_numerical).show(3)

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.952887791441...|
|     [-61.927977100733...|
|     [-61.913030686308...|
+-------------------------+
only showing top 3 rows

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.927977100733...|
|     [-61.893102133741...|
|     [-61.883137857458...|
+-------------------------+
only showing top 3 rows



# Обучение моделей

## Обучаю модель линейной регрессии на всех данных

In [29]:
lr = LinearRegression(labelCol=target, featuresCol='features2')

model = lr.fit(train_data)

23/03/25 15:06:28 WARN Instrumentation: [9d1219d2] regParam is zero, which might cause numerical instability and overfitting.
23/03/25 15:06:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/25 15:06:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/03/25 15:06:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/03/25 15:06:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


### Предсказание вероятности

In [30]:
predictions = model.transform(test_data)

predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|152865.87514861487|
|           50800.0|214876.07044658205|
|           58100.0| 142618.2032391131|
|           68400.0|132349.13921796344|
|           72200.0|  163939.709169711|
|           67000.0|154334.70189631497|
|           81300.0|  152708.727459664|
|           70500.0| 164406.5200928771|
|           60000.0|142660.04781813687|
|          109400.0|171160.71687177988|
|           74100.0|150521.59202428348|
|           74700.0|167727.33971670223|
|           90000.0| 209692.4597493834|
|          104200.0|200349.14378986368|
|           74100.0| 156998.5691604428|
|           67500.0|148166.00283249747|
|          103100.0| 47313.34136067517|
|           92500.0| 166599.2089151363|
|          128100.0|221960.62252641702|
|           99600.0| 187129.5296109207|
+------------------+------------------+
only showing top 20 rows



### Оценка качества модели

#### RMSE, MAE, R2 модели, обученной на всех данных

In [31]:
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" %trainingSummary.meanAbsoluteError)

RMSE: 68837.946988
r2: 0.642520
MAE: 49825.213058


## Обучаю модель линейной регрессии на числовых переменных

In [32]:
lr_numeric = LinearRegression(labelCol=target, featuresCol='features_numerical2')

model_numeric = lr_numeric.fit(train_data_numeric)

23/03/25 15:06:31 WARN Instrumentation: [1a017e01] regParam is zero, which might cause numerical instability and overfitting.


### Предсказание вероятности на числовых переменных

In [33]:
predictions_numeric = model_numeric.transform(test_data_numeric)

predictedLabes = predictions_numeric.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101361.56231828593|
|           50800.0|183294.72599111963|
|           58100.0|109575.40450924309|
|           68400.0| 80388.24150315206|
|           72200.0| 129873.3605462294|
|           67000.0|120400.88736406295|
|           81300.0| 117988.9750988842|
|           70500.0|130608.32066558907|
|           60000.0|110059.38271041354|
|          109400.0|118169.53892516624|
|           74100.0|117714.81989784446|
|           74700.0|134354.53908405034|
|           90000.0|175912.64852028713|
|          104200.0|166485.60635260819|
|           74100.0|122927.37492212746|
|           67500.0|113694.84989487799|
|          103100.0|-8425.855139859486|
|           92500.0|  140796.576008101|
|          128100.0|191171.40436296258|
|           99600.0| 152616.6706249104|
+------------------+------------------+
only showing top 20 rows



### Оценка качества модели на числовы переменных

#### RMSE, MAE, R2 модели, обученной на числовых переменных

In [34]:
trainingSummary = model_numeric.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" %trainingSummary.meanAbsoluteError)

RMSE: 69775.766998
r2: 0.632713
MAE: 50930.398065


In [35]:
spark.stop()

# Анализ результатов

Модель, обученная на всех данных, показала лучший результат по сравнению с моделью, обученной только на числовых переменных.