В проекте нужно обучить модель на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Для оценки качества модели используйте метрики RMSE, MAE и R2.

### подготовка данных

импорт библиотек

In [1]:
import pandas as pd 
import numpy as np
import pyspark
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

импорт данных

In [2]:
# Fixed seed so that the train/test split is reproducible between runs.
RANDOM_SEED = 12345

# Spark session running in local mode.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Housing - Linear regression")
    .getOrCreate()
)

# Load the housing CSV: the first row is the header, column types are inferred.
df = spark.read.csv('/datasets/housing.csv', header=True, inferSchema=True)

                                                                                

In [3]:
# Preview the first three rows to check that the file was parsed as expected.
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



данные выгрузились корректно

проверим пропущенные значения

In [4]:
# Count the null values of every column in one aggregation and bring the
# single-row result to pandas for printing.
null_counters = [
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df.columns
]
missing_data = df.agg(*null_counters).toPandas()
print(missing_data)

[Stage 3:>                                                          (0 + 1) / 1]

   longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
0          0         0                   0            0             207   

   population  households  median_income  median_house_value  ocean_proximity  
0           0           0              0                   0                0  


                                                                                

пропущенные значения в колонке total_bedrooms заполним медианой

In [5]:
# Approximate median of total_bedrooms (relative error 0.001), used to fill
# the missing values of that column.
# NOTE(review): the median is computed on the full dataset, before the
# train/test split performed later in the notebook, so test rows influence
# the imputed value.
[median] = df.approxQuantile('total_bedrooms', [0.5], 0.001)
df = df.na.fill({'total_bedrooms': median})

                                                                                

создадим новые признаки

In [6]:
# Derived ratio features:
#   population_in_household = population / households
#   bedroom_index           = total_bedrooms / total_rooms
#   rooms_per_household     = total_rooms / households
df = (
    df
    .withColumn('population_in_household', F.col('population') / F.col('households'))
    .withColumn('bedroom_index', F.col('total_bedrooms') / F.col('total_rooms'))
    .withColumn('rooms_per_household', F.col('total_rooms') / F.col('households'))
)
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- population_in_household: double (nullable = true)
 |-- bedroom_index: double (nullable = true)
 |-- rooms_per_household: double (nullable = true)



подготовка признаков

In [7]:
# Column roles used by the feature pipeline. The order of numerical_cols
# defines the order of the components in the assembled numeric vector.
categorical_cols = ['ocean_proximity']
numerical_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
    'rooms_per_household',
    'population_in_household',
    'bedroom_index',
]
target = ['median_house_value']

In [8]:
# 80/20 train/test split, seeded for reproducibility.
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=RANDOM_SEED)

In [9]:
# Names of the intermediate columns produced for each categorical feature.
idx_cols = [f'{c}_idx' for c in categorical_cols]
ohe_cols = [f'{c}_ohe' for c in categorical_cols]

# Categorical branch: string -> index -> one-hot vector -> single vector column.
# NOTE(review): StringIndexer is left at its default handleInvalid setting;
# confirm that a category missing from the training split cannot appear in test.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=idx_cols)
encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols)
categorical_assembler = VectorAssembler(inputCols=ohe_cols, outputCol="categorical_features")

# Numeric branch: assemble into one vector, then scale with StandardScaler.
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')

# Merge both branches into the final 'features' vector read by the model.
final_assembler = VectorAssembler(
    inputCols=['categorical_features', 'numerical_features_scaled'],
    outputCol='features',
)

# Elastic-net regularised linear regression. standardization=False is
# presumably chosen because the numeric features are already scaled above.
lr = LinearRegression(
    featuresCol='features',
    labelCol=target[0],
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    standardization=False,
)

In [10]:
# Chain the preprocessing stages and the regression into one pipeline.
pipeline = Pipeline(
    stages=[
        indexer,
        encoder,
        categorical_assembler,
        numerical_assembler,
        standardScaler,
        final_assembler,
        lr,
    ]
)

### обучение моделей

модель линейной регрессии на всех признаках

In [11]:
# Fit every pipeline stage on the training split only, then score the test split.
piplineModel = pipeline.fit(dataset=train_data)
prediction = piplineModel.transform(dataset=test_data)

23/06/04 13:02:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/06/04 13:02:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [12]:
# One evaluator per metric; each compares the label column with 'prediction'.
evaluator_rmse, evaluator_mae, evaluator_r2 = (
    RegressionEvaluator(labelCol=target[0], predictionCol='prediction', metricName=metric)
    for metric in ('rmse', 'mae', 'r2')
)

# Test-set metrics of the model trained on all features.
rmse = evaluator_rmse.evaluate(prediction)
mae = evaluator_mae.evaluate(prediction)
r2 = evaluator_r2.evaluate(prediction)

                                                                                

модель линейной регрессии только на количественных признаках

In [13]:
# Second experiment: the same linear regression trained on numeric features only.
# The column lists and the split are re-created with the same values and seed
# as in the first experiment.
numerical_cols = [
    'longitude',
    'latitude',
    'housing_median_age',
    'total_rooms',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
    'rooms_per_household',
    'population_in_household',
    'bedroom_index',
]
target = ['median_house_value']
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=RANDOM_SEED)

# Assemble the numeric columns, scale them and expose the result as 'features'.
assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
final_assembler = VectorAssembler(inputCols=['numerical_features_scaled'], outputCol='features')

# Same regression hyperparameters as for the all-features model.
lr = LinearRegression(
    featuresCol='features',
    labelCol=target[0],
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    standardization=False,
)

pipeline = Pipeline(stages=[assembler, standardScaler, final_assembler, lr])
pipelineModel = pipeline.fit(train_data)
predictions = pipelineModel.transform(test_data)

# One evaluator per metric, then the test-set metrics of the numeric-only model.
evaluator_rmse, evaluator_mae, evaluator_r2 = (
    RegressionEvaluator(labelCol=target[0], predictionCol='prediction', metricName=metric)
    for metric in ('rmse', 'mae', 'r2')
)
rmse_num = evaluator_rmse.evaluate(predictions)
mae_num = evaluator_mae.evaluate(predictions)
r2_num = evaluator_r2.evaluate(predictions)

                                                                                

### вывод

In [14]:
# Summary table: one row per experiment, one column per metric.
final_score = pd.DataFrame(
    data=[
        [rmse, mae, r2],
        [rmse_num, mae_num, r2_num],
    ],
    columns=['RMSE', 'MAE', 'R2'],
    index=['для всех признаков', 'для колличественных признаков'],
)

print(final_score)

                                       RMSE           MAE        R2
для всех признаков             67896.957492  48729.147766  0.656956
для колличественных признаков  69405.517091  49988.934375  0.641543


Для проекта использовали датафрейм с данными о жилье в Калифорнии за 1990 год. Данные в целом были качественные, пропуски нашлись только в total_bedrooms (207 значений), их заполнили медианой. Категориальные признаки обработали с помощью StringIndexer и OneHotEncoder, количественные — с помощью StandardScaler, затем собрали вектор признаков для модели, объединили все этапы в Pipeline и обучили модель линейной регрессии. Лучший (хоть и ненамного) результат показала модель, обученная на всех признаках: RMSE ≈ 67897 против 69406, R2 ≈ 0.657 против 0.642.

Для повышения качества стоит попробовать модели бустинга и более тщательный подбор гиперпараметров.

In [15]:
# Shut down the Spark session now that all results have been collected.
spark.stop()