# California Housing Price Prediction

**Project goal:** Build and compare two linear regression models that predict the median house value (`median_house_value`) on the 1990 California Housing data.

**Technology stack:** PySpark (Spark 4.0.0 in the recorded run), MLlib, DataFrame API

**Target variable:** `median_house_value`

**Evaluation metrics:** RMSE, MAE, R²

**Models:**
- **Model 1:** All features (numerical + one-hot-encoded `ocean_proximity`)
- **Model 2:** Numerical features only

## Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, mean

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import pandas as pd



## Global Constants

In [2]:
RANDOM_SEED = 42
DATA_PATH = "source/housing.csv"

## Initializing the Spark session

In [3]:
spark = (
    SparkSession
    .builder
    .appName("California Housing - Linear Regression")
    .master("local[3]")
    .config("spark.driver.memory", "6g")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/17 06:57:19 WARN Utils: Your hostname, EvanderPC, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/17 06:57:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/17 06:57:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.0


### Result

The SparkSession was initialized successfully in local mode (3 threads, 6 GB of driver memory).

## Loading and first look at the data

In [4]:
try:
    df = spark.read.csv(DATA_PATH, header = True, inferSchema = True)
except Exception:
    # Fall back to the alternate location if the local path cannot be read
    df = spark.read.csv("/datasets/housing.csv", header = True, inferSchema = True)

In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [7]:
display(df.describe().toPandas())

26/01/17 06:57:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


| | summary | longitude | latitude | housing_median_age | total_rooms | total_bedrooms | population | households | median_income | median_house_value | ocean_proximity |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 20640.0 | 20640.0 | 20640.0 | 20640.0 | 20433.0 | 20640.0 | 20640.0 | 20640.0 | 20640.0 | 20640 |
| 1 | mean | -119.56970445736148 | 35.6318614341087 | 28.639486434108527 | 2635.7630813953488 | 537.8705525375618 | 1425.4767441860463 | 499.5396802325581 | 3.8706710029070246 | 206855.81690891477 | |
| 2 | stddev | 2.003531723502584 | 2.135952397457101 | 12.58555761211163 | 2181.6152515827944 | 421.3850700740312 | 1132.46212176534 | 382.3297528316098 | 1.899821717945263 | 115395.6158744136 | |
| 3 | min | -124.35 | 32.54 | 1.0 | 2.0 | 1.0 | 3.0 | 1.0 | 0.4999 | 14999.0 | <1H OCEAN |
| 4 | max | -114.31 | 41.95 | 52.0 | 39320.0 | 6445.0 | 35682.0 | 6082.0 | 15.0001 | 500001.0 | NEAR OCEAN |


In [8]:
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")

Number of rows: 20640
Number of columns: 10


### Result

The dataset is loaded (20,640 rows, 10 columns). The schema matches expectations: 8 numeric features (double), 1 categorical feature (string), and 1 target variable (double).

## Handling missing values

In [9]:
from pyspark.sql.types import StringType

numeric_cols_check = [f.name for f in df.schema.fields if not isinstance(f.dataType, StringType)]
string_cols_check = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

null_exprs = []
for c in numeric_cols_check:
    null_exprs.append(count(when(col(c).isNull() | isnan(c), c)).alias(c))
for c in string_cols_check:
    null_exprs.append(count(when(col(c).isNull(), c)).alias(c))

null_counts = df.select(null_exprs)
display(null_counts.toPandas())

| | longitude | latitude | housing_median_age | total_rooms | total_bedrooms | population | households | median_income | median_house_value | ocean_proximity |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 207 | 0 | 0 | 0 | 0 | 0 |


In [10]:
total_bedrooms_median = df.approxQuantile("total_bedrooms", [0.5], 0.01)[0]
print(f"Approximate median of total_bedrooms: {total_bedrooms_median}")

Approximate median of total_bedrooms: 433.0


In [11]:
df = df.fillna({"total_bedrooms": total_bedrooms_median})

In [12]:
# The expressions from the first check are not bound to a DataFrame, so they can be reused
null_counts_after = df.select(null_exprs)
display(null_counts_after.toPandas())

| | longitude | latitude | housing_median_age | total_rooms | total_bedrooms | population | households | median_income | median_house_value | ocean_proximity |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |


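### Result

Missing values were found only in the `total_bedrooms` column (207 rows). They were filled with the approximate median, 433.0, returned by `approxQuantile` with a relative error of 0.01. After imputation the DataFrame contains no null or NaN values.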
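
The imputation step can be illustrated without Spark. The sketch below is plain Python on a hypothetical toy column (the values and the helper name `fill_with_median` are illustrative, not part of the notebook). It uses the exact median, whereas `approxQuantile` with a non-zero relative error returns an approximation, so the two may differ slightly on the real data.

```python
from statistics import median


def fill_with_median(values):
    """Replace None entries with the median of the non-missing entries."""
    observed = [v for v in values if v is not None]
    fill_value = median(observed)
    filled = [fill_value if v is None else v for v in values]
    return filled, fill_value


# Hypothetical toy column, not taken from the dataset
toy_bedrooms = [129.0, None, 190.0, 235.0, None, 1106.0]
toy_filled, toy_fill_value = fill_with_median(toy_bedrooms)

print(toy_fill_value)  # median of the four observed values
print(toy_filled)
```

In Spark, passing a relative error of `0.0` to `approxQuantile` requests the exact quantile at a higher computational cost.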

## Defining the feature lists

In [13]:
categorical_cols = ["ocean_proximity"]

numerical_cols = [
    "longitude",
    "latitude",
    "housing_median_age",
    "total_rooms",
    "total_bedrooms",
    "population",
    "households",
    "median_income"
]

target = "median_house_value"

print(f"Categorical features: {categorical_cols}")
print(f"Numerical features: {numerical_cols}")
print(f"Target variable: {target}")

Categorical features: ['ocean_proximity']
Numerical features: ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
Target variable: median_house_value


### Result

The feature lists are defined. The target variable is kept out of the feature lists.

## Transforming the categorical feature

In [14]:
df.select("ocean_proximity").distinct().show()

+---------------+
|ocean_proximity|
+---------------+
|     NEAR OCEAN|
|         ISLAND|
|      <1H OCEAN|
|         INLAND|
|       NEAR BAY|
+---------------+



In [15]:
indexer = StringIndexer(
    inputCol = "ocean_proximity",
    outputCol = "ocean_proximity_idx",
    handleInvalid = "keep"
)
df = indexer.fit(df).transform(df)

In [16]:
encoder = OneHotEncoder(
    inputCol = "ocean_proximity_idx",
    outputCol = "ocean_proximity_ohe",
    dropLast = True
)
df = encoder.fit(df).transform(df)

In [17]:
df.select("ocean_proximity", "ocean_proximity_idx", "ocean_proximity_ohe").show(10, truncate = False)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
|NEAR BAY       |3.0                |(5,[3],[1.0])      |
+---------------+-------------------+-------------------+
only showing top 10 rows


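### Result

The categorical feature `ocean_proximity` has been converted to a numeric representation:
1. StringIndexer → `ocean_proximity_idx` (numeric index)
2. OneHotEncoder → `ocean_proximity_ohe` (sparse vector)

The encoded vector has length 5 even with `dropLast = True`: `handleInvalid = "keep"` reserves a sixth index for unseen labels, and that reserved slot is the one `dropLast` removes. Each of the five observed categories therefore keeps its own indicator.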
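
The interaction between the indexer and encoder settings is easier to see on a small example. The following plain-Python sketch mimics the two steps under stated assumptions: labels are indexed by descending frequency, an extra bucket is reserved for unseen labels, and the last bucket is dropped. The toy labels and the helper names `fit_string_index` and `one_hot` are hypothetical, and this is not Spark's implementation.

```python
from collections import Counter


def fit_string_index(values):
    """Map labels to indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_labels, keep_invalid=True, drop_last=True):
    """Dense one-hot vector for an index under the given encoder settings."""
    size = num_labels + (1 if keep_invalid else 0)  # extra bucket for unseen labels
    if drop_last:
        size -= 1  # the last bucket is represented by the all-zeros vector
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Hypothetical toy column with five distinct labels
toy_labels = ["a"] * 5 + ["b"] * 4 + ["c"] * 3 + ["d"] * 2 + ["e"]
mapping = fit_string_index(toy_labels)

print(mapping)
print(one_hot(mapping["d"], num_labels=5))  # length 5, position 3 set
print(one_hot(mapping["e"], num_labels=5, keep_invalid=False))  # all zeros
```

With the extra bucket kept, every observed label has its own indicator and the indicators of a row always sum to 1. Without it, the least frequent label becomes the all-zeros reference category.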

## Transforming the numerical features

In [18]:
numerical_assembler = VectorAssembler(
    inputCols = numerical_cols,
    outputCol = "numerical_features",
    handleInvalid = "keep"
)
df = numerical_assembler.transform(df)

In [19]:
scaler = StandardScaler(
    inputCol = "numerical_features",
    outputCol = "numerical_features_scaled",
    withMean = True,
    withStd = True
)
df = scaler.fit(df).transform(df)

In [20]:
df.select("numerical_features", "numerical_features_scaled").show(5, truncate = False)

+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|numerical_features                                     |numerical_features_scaled                                                                                                                                        |
+-------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252]    |[-1.3278030546902004,1.0525227849496404,0.9821188656747666,-0.804799599801809,-0.9723936907596469,-0.9744049915469923,-0.977009185045236,2.3447089561176147]     |
|[-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014]|[-1.3228118684350991,1.0431592803959744,-0.6070042082805048,2.0

### Result

The numerical features are:
1. Assembled into the vector `numerical_features` with VectorAssembler
2. Standardized (z-score) into `numerical_features_scaled` with StandardScaler
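
As a sanity check on the scaling, the z-score of a single value can be recomputed by hand from the mean and standard deviation reported by `df.describe()` earlier in the notebook. The sketch below does this for `longitude` and `median_income` of the first row; the helper name `z_score` is illustrative, and the check relies on both `describe()` and StandardScaler using the sample standard deviation.

```python
def z_score(x, mean, std):
    """Standardize a single value: subtract the mean, divide by the standard deviation."""
    return (x - mean) / std


# mean and stddev copied from the df.describe() output
longitude_mean, longitude_std = -119.56970445736148, 2.003531723502584
income_mean, income_std = 3.8706710029070246, 1.899821717945263

# raw values of the first row: longitude = -122.23, median_income = 8.3252
first_row_longitude = z_score(-122.23, longitude_mean, longitude_std)
first_row_income = z_score(8.3252, income_mean, income_std)

print(round(first_row_longitude, 4), round(first_row_income, 4))  # -1.3278 2.3447
```

Both values agree with the first and last entries of the first `numerical_features_scaled` vector shown in the cell output.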

## Assembling the final feature vectors

In [21]:
all_features_assembler = VectorAssembler(
    inputCols = ["numerical_features_scaled", "ocean_proximity_ohe"],
    outputCol = "features_all",
    handleInvalid = "keep"
)
df = all_features_assembler.transform(df)

In [22]:
numerical_only_assembler = VectorAssembler(
    inputCols = ["numerical_features_scaled"],
    outputCol = "features_numerical_only",
    handleInvalid = "keep"
)
df = numerical_only_assembler.transform(df)

In [23]:
df.select("features_all", "features_numerical_only").show(5, truncate = False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_all                                                                                                                                                                         |features_numerical_only                                                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------

### Result

Two feature sets have been created:
- `features_all`: scaled numerical features + one-hot-encoded categorical feature (for Model 1)
- `features_numerical_only`: scaled numerical features only (for Model 2)

## Splitting into training and test sets

In [24]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed = RANDOM_SEED)

In [25]:
train_count = train_data.count()
test_count = test_data.count()
total_count = train_count + test_count

print(f"Training set: {train_count} ({train_count / total_count * 100:.1f}%)")
print(f"Test set: {test_count} ({test_count / total_count * 100:.1f}%)")

Training set: 16560 (80.2%)
Test set: 4080 (19.8%)


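### Result

The data is split into training and test sets with `randomSplit([0.8, 0.2])` and a fixed `RANDOM_SEED = 42` for reproducibility. Rows are assigned at random, so the realized shares are approximate (80.2% and 19.8% here). The imputation value, the category index, and the scaler statistics were all computed on the full dataset before this split, so the test rows influenced the preprocessing.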
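
The reason the split lands near, but not exactly on, 80/20 can be sketched in plain Python: each row draws one uniform random number and goes to the part whose interval contains it. The function `random_split` below is a hypothetical illustration of that idea and does not reproduce Spark's row assignment for the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a part by drawing one uniform number per row."""
    total = sum(weights)
    # Cumulative upper boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First part whose upper boundary exceeds u; the last part catches rounding
        index = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        parts[index].append(row)
    return parts


toy_train, toy_test = random_split(range(20640), [0.8, 0.2], seed=42)
print(len(toy_train), len(toy_test))
```

A fixed seed makes the sketch repeatable, while the part sizes still fluctuate around the requested weights from one seed to another.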

## Training Model 1 (all features)

In [26]:
lr_all = LinearRegression(
    featuresCol = "features_all",
    labelCol = "median_house_value",
    predictionCol = "prediction_all"
)

model_all = lr_all.fit(train_data)

26/01/17 06:57:39 WARN Instrumentation: [35f2d639] regParam is zero, which might cause numerical instability and overfitting.


In [27]:
print(f"Model 1 coefficients: {model_all.coefficients}")
print(f"Model 1 intercept: {model_all.intercept}")

Model 1 coefficients: [-53490.07345595366,-54644.19263731768,13270.814291262797,-7540.499368468419,28963.320418835432,-51593.58595271231,33580.316339160374,73071.51290304304,-1847662.861184172,-1886790.2020866682,-1842691.0993547563,-1852791.223850157,-1697929.4691433532]
Model 1 intercept: 2066418.3865902456


In [28]:
predictions_all = model_all.transform(test_data)
predictions_all.select("median_house_value", "prediction_all").show(10)

+------------------+------------------+
|median_house_value|    prediction_all|
+------------------+------------------+
|          103600.0| 150535.0313654698|
|          106700.0|217627.58383267722|
|           73200.0|125170.79914365336|
|           90100.0|195021.45765675628|
|           67000.0|152292.70778735774|
|           86400.0|186078.22917030472|
|           70500.0| 163940.2842260038|
|           85100.0|180011.90104833012|
|           80500.0|181582.51330237836|
|           96000.0|170317.31862686411|
+------------------+------------------+
only showing top 10 rows


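### Result

Model 1 (all features: numerical + one-hot-encoded `ocean_proximity`) has been trained, and predictions have been generated for the test set. The five one-hot coefficients (about -1.7M to -1.9M) and the intercept (about 2.07M) are large and nearly cancel: every row has exactly one indicator set, so the indicators are collinear with the intercept and only their sums with it are meaningful.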
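
The last five coefficients printed for Model 1 belong to the one-hot part of `features_all`. Because each row has exactly one of these indicators set to 1, a prediction only ever sees the sum of the intercept and one of these coefficients. The sketch below computes those sums from the printed values and shows that shifting the intercept and the five coefficients in opposite directions leaves them unchanged. The helper `shift` and the shift size are illustrative.

```python
# Values copied from the Model 1 output
intercept = 2066418.3865902456
dummy_coefficients = [
    -1847662.861184172,
    -1886790.2020866682,
    -1842691.0993547563,
    -1852791.223850157,
    -1697929.4691433532,
]

# Per-category offset actually used in a prediction: intercept + coefficient
effective_offsets = [intercept + c for c in dummy_coefficients]
for idx, offset in enumerate(effective_offsets):
    print(f"ocean_proximity_idx={idx}: {offset:,.2f}")


def shift(intercept, coefficients, delta):
    """Move delta from the intercept into every indicator coefficient."""
    return intercept - delta, [c + delta for c in coefficients]


shifted_intercept, shifted_coefficients = shift(intercept, dummy_coefficients, 1_000_000.0)
shifted_offsets = [shifted_intercept + c for c in shifted_coefficients]
```

The offsets fall in the range of plausible house values, which suggests the large raw coefficients are an artifact of the redundant parameterization rather than of the data.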

## Training Model 2 (numerical features only)

In [29]:
lr_num = LinearRegression(
    featuresCol = "features_numerical_only",
    labelCol = "median_house_value",
    predictionCol = "prediction_num"
)

model_num = lr_num.fit(train_data)

26/01/17 06:57:41 WARN Instrumentation: [74b352ef] regParam is zero, which might cause numerical instability and overfitting.


In [30]:
print(f"Model 2 coefficients: {model_num.coefficients}")
print(f"Model 2 intercept: {model_num.intercept}")

Model 2 coefficients: [-85162.95023147648,-90938.2529201782,14228.631891215411,-11503.739447488786,32961.01277015951,-52216.69504409997,34017.21924714423,74881.20724153295]
Model 2 intercept: 206419.06530819417


In [31]:
predictions_num = model_num.transform(test_data)
predictions_num.select("median_house_value", "prediction_num").show(10)

+------------------+------------------+
|median_house_value|    prediction_num|
+------------------+------------------+
|          103600.0|100751.57997894948|
|          106700.0|190789.97226600122|
|           73200.0| 74735.13614428585|
|           90100.0|162343.71353153448|
|           67000.0|119470.36043505865|
|           86400.0|155923.36644661447|
|           70500.0|131197.36798356194|
|           85100.0| 150449.1526877614|
|           80500.0|150168.94827232388|
|           96000.0|133760.89761402982|
+------------------+------------------+
only showing top 10 rows


### Result

Model 2 (numerical features only) has been trained, and predictions have been generated for the test set.

## Evaluating model quality

In [32]:
evaluator_rmse_all = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_all",
    metricName = "rmse"
)
evaluator_mae_all = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_all",
    metricName = "mae"
)
evaluator_r2_all = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_all",
    metricName = "r2"
)

rmse_all = evaluator_rmse_all.evaluate(predictions_all)
mae_all = evaluator_mae_all.evaluate(predictions_all)
r2_all = evaluator_r2_all.evaluate(predictions_all)

print("Model 1 (all features):")
print(f"  RMSE: {rmse_all:,.2f}")
print(f"  MAE:  {mae_all:,.2f}")
print(f"  R²:   {r2_all:.4f}")

Model 1 (all features):
  RMSE: 70,786.68
  MAE:  50,863.76
  R²:   0.6378


In [33]:
evaluator_rmse_num = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_num",
    metricName = "rmse"
)
evaluator_mae_num = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_num",
    metricName = "mae"
)
evaluator_r2_num = RegressionEvaluator(
    labelCol = "median_house_value",
    predictionCol = "prediction_num",
    metricName = "r2"
)

rmse_num = evaluator_rmse_num.evaluate(predictions_num)
mae_num = evaluator_mae_num.evaluate(predictions_num)
r2_num = evaluator_r2_num.evaluate(predictions_num)

print("Model 2 (numerical only):")
print(f"  RMSE: {rmse_num:,.2f}")
print(f"  MAE:  {mae_num:,.2f}")
print(f"  R²:   {r2_num:.4f}")

Model 2 (numerical only):
  RMSE: 71,791.60
  MAE:  51,804.75
  R²:   0.6275


In [34]:
results_df = pd.DataFrame({
    "Model": ["Model 1 (all features)", "Model 2 (numerical only)"],
    "RMSE": [rmse_all, rmse_num],
    "MAE": [mae_all, mae_num],
    "R²": [r2_all, r2_num]
})

display(results_df)

| | Model | RMSE | MAE | R² |
|---|---|---|---|---|
| 0 | Model 1 (all features) | 70786.683197 | 50863.757996 | 0.637843 |
| 1 | Model 2 (numerical only) | 71791.596523 | 51804.751861 | 0.627487 |


### Result

RMSE, MAE, and R² have been computed for both models on the test set and collected in a comparison table. Model 1 is slightly better on all three metrics.
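
For reference, the three metrics can be written out in a few lines of plain Python. The sketch below uses the standard definitions (R² as one minus the ratio of residual to total sum of squares) on a hypothetical three-point example. The function name `regression_metrics` and the toy values are illustrative and unrelated to the housing data.

```python
from math import sqrt


def regression_metrics(y_true, y_pred):
    """Return (RMSE, MAE, R²) for two equally long sequences of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)  # residual sum of squares
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    rmse = sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n
    r2 = 1.0 - ss_res / ss_tot
    return rmse, mae, r2


# Toy example: only the last prediction is off, by exactly 1
toy_rmse, toy_mae, toy_r2 = regression_metrics([1.0, 2.0, 3.0], [1.0, 2.0, 4.0])
print(f"RMSE={toy_rmse:.4f} MAE={toy_mae:.4f} R2={toy_r2:.4f}")
```

RMSE penalizes large errors more heavily than MAE, which is why the RMSE of both models is well above their MAE.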

## Comparative analysis and conclusions

In [35]:
rmse_diff = rmse_num - rmse_all
rmse_improvement = (rmse_diff / rmse_num) * 100

r2_diff = r2_all - r2_num

print("Comparative analysis:")
print(f"  RMSE difference: {rmse_diff:,.2f} ({rmse_improvement:.2f}% improvement)")
print(f"  R² difference: {r2_diff:.4f}")

Comparative analysis:
  RMSE difference: 1,004.91 (1.40% improvement)
  R² difference: 0.0104


In [36]:
spark.stop()