**Цель проекта:**

Разработка модели для прогнозирования стоимости недвижимости при помощи spark.

**Описание данных:**

Для прогнозирования используются данные о недвижимости в Калифорнии.

⭕ Более подробная информация находится в README файле.

# **Importing libraries**

In [1]:
import pyspark
import pandas as pd 
import numpy as np
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler

spark = SparkSession.builder \
                    .master("local") \
                    .appName("California house median price prediction") \
                    .getOrCreate()

In [2]:
RANDOM_SEED = 67

# **Data understanding**

In [3]:
df_housing = spark.read.load('/ds/housing.csv', format="csv", sep=",", inferSchema=True, header="true")
df_housing.printSchema()

                                                                                

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [4]:
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [5]:
df_housing.limit(5).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [6]:
print('shape: ', (df_housing.count(), len(df_housing.columns)))

shape:  (20640, 10)


**В датасете 20640 строк и 10 колонок. 9 колонок имеют тип double, а последняя колонка имеет тип string.**

# **Data preparation**

Проверим данные на наличие пропусков.

In [7]:
df = df_housing.select([F.count(F.when(F.col(c).contains('None') | \
                            F.col(c).contains('NULL') | \
                            (F.col(c) == '' ) | \
                           F.col(c).isNull() | \
                            F.isnan(c), c 
                           )).alias(c)
                    for c in df_housing.columns])
df.toPandas().head()

                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,0,0,0,0,207,0,0,0,0,0


Фитча total_bedrooms содержит 207 пропусков. Удалим их.

In [8]:
df_housing = df_housing.dropna()

In [9]:
cat_cols = ['ocean_proximity']
num_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 
            'total_bedrooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

Cначала трансформируем категориальный признак, а потом применим технику OHE.

In [10]:
indexer = StringIndexer(inputCols=cat_cols, 
                        outputCols=[cat_cols[0] + '_idx']) 
df_housing = indexer.fit(df_housing).transform(df_housing)

                                                                                

In [11]:
encoder = OneHotEncoder(inputCols=[c + '_idx' for c in cat_cols], outputCols=[c + '_ohe' for c in cat_cols])
df_housing = encoder.fit(df_housing).transform(df_housing)

In [12]:
cat_assembler = VectorAssembler(inputCols=[c + '_ohe' for c in cat_cols], outputCol="cat_features")
df_housing = cat_assembler.transform(df_housing) 

In [13]:
num_assembler = VectorAssembler(inputCols=num_cols, outputCol="num_features")
df_housing = num_assembler.transform(df_housing) 

Масштабируем численные колонки.

In [14]:
standardScaler = StandardScaler(inputCol='num_features', outputCol="num_features_scaled")
df_housing = standardScaler.fit(df_housing).transform(df_housing)

                                                                                

In [15]:
df_housing.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'cat_features',
 'num_features',
 'num_features_scaled']

In [16]:
num_and_cut_assembler = VectorAssembler(inputCols= ['cat_features','num_features_scaled'], 
                                  outputCol="features") 
num_assembler = VectorAssembler(inputCols= ['num_features_scaled'], 
                                  outputCol="features") 
df_housing_nc = num_and_cut_assembler.transform(df_housing)
df_housing_n = num_assembler.transform(df_housing)

In [17]:
df_housing_nc.select(['cat_features','num_features_scaled']).show(3) 

+-------------+--------------------+
| cat_features| num_features_scaled|
+-------------+--------------------+
|(4,[3],[1.0])|[-61.005863841998...|
|(4,[3],[1.0])|[-61.000872770752...|
|(4,[3],[1.0])|[-61.010854913244...|
+-------------+--------------------+
only showing top 3 rows



у нас два фрейма данных: df_housing_nc - в котором и категориальные и числовые колонки; df_housing_n - в котором только числовые колонки.

# **Modeling**

Для оценки качества моделей будем использовать три метрики: RMSE, MAE, r2.

Первая модель будет обучена и протестирована на df_housing_n, а вторая - на df_housing_nc.

In [18]:
train, test = df_housing_n.randomSplit([.8, .2], seed=RANDOM_SEED)
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.3)
model = lr.fit(train) 
predictions = model.transform(test)
trainingSummary = model.summary
print('Model with only num features:')
print("RMSE: %0.2f" % trainingSummary.rootMeanSquaredError)
print('MAE: %0.2f' % trainingSummary.meanAbsoluteError)
print("r2: %0.2f" % trainingSummary.r2)

22/10/11 13:37:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/10/11 13:37:06 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/10/11 13:37:06 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/10/11 13:37:06 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Model with only num features:
RMSE: 69386.94
MAE: 50593.09
r2: 0.64


In [19]:
train, test = df_housing_nc.randomSplit([.8, .2], seed=RANDOM_SEED)
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.3)
model = lr.fit(train) 
predictions = model.transform(test)
trainingSummary = model.summary
print('Model with num and cat features:')
print("RMSE: %0.2f" % trainingSummary.rootMeanSquaredError)
print('MAE: %0.2f' % trainingSummary.meanAbsoluteError)
print("r2: %0.2f" % trainingSummary.r2)

                                                                                

Model with num and cat features:
RMSE: 68475.97
MAE: 49539.31
r2: 0.65


In [20]:
spark.stop()

# **Results**

**model_1** - модель, которая обучалась только на численных признаках.

**model_2** - модель, которая обучалась на численных и категориальных признаках.

||model_1|model_2|
|---|---|---|
|**RMSE**|69387|68476|  
|**MAE**|50593|49539|   
|**r2**|0.64|0.65|

<b> Как мы видим model_1 показала себя хуже model_2 по всем метрикам. Из этого можно сделать вывод, что в рамках данной задачи, информация, которая хранится в категориальных признаках важна и на прямую влияет на качество предсказаний модели.</b>