## Предсказание стоимости жилья

Нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Сделать предсказания на тестовой выборке. Для оценки качества модели использовать метрики RMSE, MAE и R2.

In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Learning DataFrame Window Functions") \
                    .getOrCreate()

In [3]:
main_df = spark.read.load('/datasets/housing.csv', format="csv", sep=",", inferSchema=True, header="true")
rename_cols = False

                                                                                

## Data preprocessing

In [4]:
main_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
numeric_columns = main_df.columns
numeric_columns.remove('ocean_proximity')
numeric_columns.remove('median_house_value')

In [6]:
main_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [7]:
for col in numeric_columns:
        main_df = main_df.withColumn(col, main_df[col].cast(FloatType()))

In [8]:
main_df = main_df.withColumn('median_house_value', main_df['median_house_value'].cast(FloatType()))

### Check for dublicates

In [9]:
if main_df.count() > main_df.dropDuplicates().count():
    main_df = main_df.dropDuplicates()
    print('dublicates was delate')
else:
    print('did not found dublicates')



did not found dublicates


                                                                                

### Check for null values

In [10]:
for col in main_df.columns:
    null_rows = main_df.filter(F.isnull(col))
    count_null = null_rows.count()
    if count_null > 0:
        print(col)
        print('count null values: ', count_null)
        print('==================')

total_bedrooms
count null values:  207


In [11]:
mean_total_bedrooms = main_df.select(F.mean(main_df['total_bedrooms'])).collect()[0][0]

In [12]:
main_df = main_df.fillna({'total_bedrooms': mean_total_bedrooms})

In [13]:
null_rows = main_df.filter(F.isnull('total_bedrooms'))
null_rows.count()

0

## Data preparation

In [14]:
train_data, test_data = main_df.randomSplit([0.8,0.2], seed=123456)

<b> assembler function<b>

In [15]:
def assembler(data, columns, values_type):
    
    if values_type == 'categorical':
        asm = VectorAssembler(inputCols=[c+'_ohe' for c in columns], outputCol="categorical_features")
        return asm.transform(data)
    
    elif values_type == 'numeric':
        asm = VectorAssembler(inputCols=columns, outputCol="numerical_features")
        return asm.transform(data) 

### creating DF without categorical values

### string Index

In [16]:
indexer = StringIndexer(inputCols=['ocean_proximity'], 
                        outputCols=['ocean_proximity_idx'],
                        handleInvalid = 'error')
indexer_fited = indexer.fit(train_data)

train_data_idx = indexer_fited.transform(train_data)
test_data_idx = indexer_fited.transform(test_data)

                                                                                

### OHE

In [17]:
def ohe_transformation(data, categorical_cols):
    encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], 
                            outputCols=[c+'_ohe' for c in categorical_cols])
    
    return encoder.fit(data).transform(data)

In [18]:
train_data_ohe = ohe_transformation(train_data_idx, ['ocean_proximity'])
test_data_ohe = ohe_transformation(test_data_idx, ['ocean_proximity'])

In [19]:
asm_train_data = assembler(train_data_ohe, ['ocean_proximity'], 'categorical')
asm_test_data = assembler(test_data_ohe, ['ocean_proximity'], 'categorical')

### standardization

In [20]:
def standardization(data, input_col, output_col):
    standard_scaler = StandardScaler(inputCol=input_col, 
                                     outputCol=output_col)
    
    return standard_scaler.fit(data).transform(data) 

In [21]:
asm_train_data = assembler(asm_train_data, numeric_columns, 'numeric')
asm_test_data = assembler(asm_test_data, numeric_columns, 'numeric')

In [22]:
train_data_std = standardization(asm_train_data, 'numerical_features', 'numerical_features_scaled')
test_data_std = standardization(asm_test_data, 'numerical_features', 'numerical_features_scaled')

### creating common assembler and vectors

In [23]:
all_features_col = ['categorical_features', 'numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features_col, outputCol='all_features')

train_final_assembler = final_assembler.transform(train_data_std)
test_final_assembler = final_assembler.transform(test_data_std)

In [24]:
all_train_data = train_final_assembler.select(['all_features', 'median_house_value'])
all_test_data = test_final_assembler.select(['all_features', 'median_house_value'])

In [25]:
num_train_data = train_final_assembler.select(['numerical_features_scaled', 'median_house_value'])
num_test_data = test_final_assembler.select(['numerical_features_scaled', 'median_house_value'])

## Model learning

In [26]:
TARGET = 'median_house_value'

In [27]:
def show_scores(prediction):   
    scores = ['rmse', 'mae', 'r2']
    for s in scores:
        score = RegressionEvaluator().setLabelCol('median_house_value')\
            .setPredictionCol('prediction')\
            .setMetricName(s)\
            .evaluate(prediction)

        print(s,': ', score)

### Learning without catecorical values

In [28]:
linear_model = LinearRegression(labelCol=TARGET, featuresCol='numerical_features_scaled')
linear_model = linear_model.fit(num_train_data)

23/06/18 15:15:03 WARN Instrumentation: [05d82187] regParam is zero, which might cause numerical instability and overfitting.
23/06/18 15:15:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/06/18 15:15:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/06/18 15:15:04 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/06/18 15:15:04 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [29]:
predictions = linear_model.transform(num_test_data)
predictions.select(['median_house_value', 'prediction']).show(5)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           94600.0| 168166.5872017471|
|           70000.0| 97728.27777123125|
|          107000.0|141662.56518132845|
|           67000.0|101120.06148363417|
|           70200.0|117876.39872245723|
+------------------+------------------+
only showing top 5 rows



In [30]:
show_scores(predictions)

rmse :  70194.92668872164
mae :  49750.06338169492
r2 :  0.6235062921708119


### Learning with catecorical values

In [31]:
linear_model_2 = LinearRegression(labelCol=TARGET, featuresCol='all_features')
linear_model_2 = linear_model_2.fit(all_train_data)

23/06/18 15:15:07 WARN Instrumentation: [bea56b75] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [32]:
predictions_2 = linear_model_2.transform(all_test_data)
predictions_2.select(['median_house_value', 'prediction']).show(5)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           94600.0|199230.31614177395|
|           70000.0| 136921.5226259539|
|          107000.0|177798.78258546768|
|           67000.0|140461.95616019797|
|           70200.0|156991.46882058075|
+------------------+------------------+
only showing top 5 rows



In [33]:
show_scores(predictions_2)

rmse :  67919.87353278972
mae :  48391.5168087972
r2 :  0.6475155115921454


In [34]:
spark.stop()

# Анализ результатов

Модель c категориальными признаками ожидаемо показала значениие метрик лучше чем модель без категориальный признаков.<br>
<b> значение метрик модели с категорильными признаками: </b><br>
rmse = 67919.87 <br>
mae = 48391.51 <br>
r2 = 0.64 <br>
<b>значение метрик модели без категорильных признаков: <br></b>
rmse =  70194.92<br>
mae =  49750.06<br>
r2 = 0.62<br>
Это может говорить о том, что признак ocean_proximity - удаленность от океана влияет на медианную стоимсоть дома - median_house_value