# Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

In [1]:
import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.evaluation import RegressionMetrics

        
# Seed passed to the train/test split further down in the notebook.
RANDOM_SEED = 2022

# Spark session for this notebook, running with the "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("cal_housing_data")
    .getOrCreate()
)

In [2]:
# Read the CSV with pandas, then convert the pandas frame into a Spark DataFrame.
# NOTE(review): `url` is not defined in any cell visible here — confirm where
# it is set before running this cell.
df = spark.createDataFrame(pd.read_csv(url))

In [3]:
# Print column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



## Подготовка данных

### Замена пропусков на среднее

In [4]:
# Count missing values per column and show them as a one-row table.
# Both nulls and NaN are treated as missing: data converted from pandas
# carries NaN, but counting nulls too keeps the check from silently missing
# gaps.  isnan() is applied only to floating-point columns, so the string
# column is never implicitly cast to a number.
df.select([
    F.count(
        F.when(
            (F.col(name).isNull() | F.isnan(name))
            if dtype in ("double", "float")
            else F.col(name).isNull(),
            1,  # literal marker: count() skips nulls, so the value itself must not be emitted
        )
    ).alias(name)
    for name, dtype in df.dtypes
]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [5]:
# Rounded mean of total_bedrooms over the rows whose value is not NaN.
(
    df.where(~F.isnan(F.col("total_bedrooms")))
    .select(F.round(F.mean(F.col("total_bedrooms"))))
    .first()[0]
)

538.0

In [6]:
# Rounded mean of the observed (non-NaN) total_bedrooms values.
# The single aggregate cell is read by position, so the code does not depend
# on the column name Spark auto-generates for the expression.
# NOTE(review): the mean is computed over the whole dataset, before the
# train/test split performed later in the notebook.
bedrooms_mean = (
    df.where(~F.isnan(F.col("total_bedrooms")))
    .select(F.round(F.mean(F.col("total_bedrooms"))))
    .first()[0]
)

# Restrict the fill to total_bedrooms: without `subset` the same value would
# be written into every numeric column that happens to contain a gap.
df = df.na.fill(bedrooms_mean, subset=["total_bedrooms"])

### Трансформация категориальных признаков

In [7]:
# Column roles used by the rest of the notebook.
target = "median_house_value"
categorical_cols = ["ocean_proximity"]
numerical_cols = [
    "longitude",
    "latitude",
    "housing_median_age",
    "total_rooms",
    "total_bedrooms",
    "population",
    "households",
    "median_income",
]

# Indexer turning ocean_proximity labels into numeric indices, ordered by
# descending label frequency.
indexer = StringIndexer(
    inputCol="ocean_proximity",
    outputCol="ocean_proximity_idx",
    stringOrderType="frequencyDesc",
)

In [8]:
# Fit the indexer on df and append the ocean_proximity_idx column.
df = indexer.fit(df).transform(df)

In [9]:
# Preview the columns whose name starts with "ocean_proximity".
cols = [name for name in df.columns if name.startswith("ocean_proximity")]
df.select(cols).show(3)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 3 rows



In [10]:
# One-hot encode the indexed category into a sparse vector column.
encoder = OneHotEncoder(inputCol="ocean_proximity_idx", outputCol="ocean_proximity_ohe")

# In Spark 3.x OneHotEncoder is an Estimator and must be fitted before it can
# transform; in Spark 2.x it is a plain Transformer with no fit().  Fitting
# only when the method exists keeps the cell working on both.
if hasattr(encoder, "fit"):
    encoder = encoder.fit(df)
df = encoder.transform(df)

In [11]:
# Preview every column whose name starts with one of the categorical column names.
cols = [
    name
    for name in df.columns
    for prefix in categorical_cols
    if name.startswith(prefix)
]
df.select(cols).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [12]:
# Merge the "<name>_ohe" columns of all categorical features into one vector column.
categorical_assembler = VectorAssembler(
    inputCols=["{}_ohe".format(name) for name in categorical_cols],
    outputCol="categorical_features",
)
df = categorical_assembler.transform(df)

### Трансформация числовых признаков

In [13]:
# Pack the numeric columns into a single vector column.
numerical_assembler = VectorAssembler(
    outputCol="numerical_features",
    inputCols=numerical_cols,
)
df = numerical_assembler.transform(df)

In [14]:
# Scale the numeric vector to unit standard deviation without centering
# (the StandardScaler defaults, spelled out explicitly).
# NOTE(review): the scaler is fitted on the full DataFrame, before the
# train/test split performed later in the notebook.
standardScaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="numerical_features_scaled",
    withMean=False,
    withStd=True,
)
df = standardScaler.fit(df).transform(df)

In [15]:
# Vector columns that together form the model input.
all_features = ["categorical_features", "numerical_features_scaled"]

# Concatenate them into the single "features" column.
final_assembler = VectorAssembler(
    outputCol="features",
    inputCols=all_features,
)
df = final_assembler.transform(df)

# Preview the two source vector columns.
df.select(*all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



## Обучение моделей

In [16]:
# Random split with weights 0.8 / 0.2, seeded with RANDOM_SEED.
train_data, test_data = df.randomSplit(weights=[0.8, 0.2], seed=RANDOM_SEED)

In [17]:
# Row counts of the two splits, as a (train, test) tuple.
train_data.count(), test_data.count()

(16437, 4203)

### Линейная регрессия на всех признаках

In [18]:
# Linear regression over the combined "features" vector (categorical + numeric).
lr = LinearRegression(
    featuresCol="features",
    labelCol=target,
)

In [19]:
# Fit the regression on the training split.
model = lr.fit(train_data)

In [20]:
# Score the test split; transform() adds the "prediction" column.
predictions = model.transform(test_data)

# Keep only the predicted and the actual value, in that order.
predictedLabes = predictions.select("prediction", "median_house_value")

In [21]:
# Build the RDD of (prediction, label) float pairs expected by RegressionMetrics.
# The rows are mapped where they already live instead of being collected to
# the driver and parallelized again, which moved the whole test set twice.
scoreAndLabels = predictedLabes.rdd.map(
    lambda row: (float(row[0]), float(row[1]))
)

In [22]:
# Regression metrics computed from the (prediction, label) pairs.
metrics = RegressionMetrics(scoreAndLabels)

In [23]:
# Report MAE, RMSE and R-squared, one per line.
print(
    "MAE = %s" % metrics.meanAbsoluteError,
    "RMSE = %s" % metrics.rootMeanSquaredError,
    "R-squared = %s" % metrics.r2,
    sep="\n",
)

MAE = 49366.323740013155
RMSE = 68485.34195442485
R-squared = 0.6455254898473684


### Линейная регрессия только на числовых признаках

In [24]:
# Linear regression over the scaled numeric features only.
lr = LinearRegression(
    featuresCol="numerical_features_scaled",
    labelCol=target,
)

In [25]:
# Fit the numeric-only regression on the training split.
model = lr.fit(train_data)

In [26]:
# Score the test split; transform() adds the "prediction" column.
predictions = model.transform(test_data)

# Keep only the predicted and the actual value, in that order.
predictedLabes = predictions.select("prediction", "median_house_value")

In [27]:
# Build the RDD of (prediction, label) float pairs expected by RegressionMetrics.
# The rows are mapped where they already live instead of being collected to
# the driver and parallelized again, which moved the whole test set twice.
scoreAndLabels = predictedLabes.rdd.map(
    lambda row: (float(row[0]), float(row[1]))
)

In [28]:
# Regression metrics computed from the (prediction, label) pairs.
metrics = RegressionMetrics(scoreAndLabels)

In [29]:
# Report MAE, RMSE and R-squared, one per line.
print(
    "MAE = %s" % metrics.meanAbsoluteError,
    "RMSE = %s" % metrics.rootMeanSquaredError,
    "R-squared = %s" % metrics.r2,
    sep="\n",
)

MAE = 50530.87162094686
RMSE = 69496.71941905603
R-squared = 0.6349785697665222


# Анализ результатов

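Добавление категориального признака `ocean_proximity` к числовым признакам немного улучшает все три метрики на тестовой выборке: MAE снижается с ≈50 531 до ≈49 366, RMSE — с ≈69 497 до ≈68 485, а R² растёт с ≈0.635 до ≈0.646.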