In [1]:
# input
# Forward slashes are used so the relative path resolves on both Windows and
# POSIX systems (a backslash is an ordinary filename character on POSIX).
data_nodes_input_path = '../output/final_data_nodes.GEOJSON'
# Seed handed to the spatial train/test split.
random_seed = 4

# Columns passed to the model as the first (predictor) argument of fit/predict.
x_cols = [
    'maxspeed', 'bridge', 'junction', 'building_height',
    'dist_to_train', 'dist_to_recreation',
    'landuse_is_residential', 'landuse_is_commercial', 'landuse_is_industrial',
    'rt_highway', 'rt_trunk', 'rt_primary', 'rt_secondary', 'rt_tertiary',
    'rt_unclassified', 'rt_residential', 'rt_living_street',
    'rt_busway', 'rt_service',
]
# Columns passed to the model as the second (coordinate) argument of fit/predict.
c_cols = ['x', 'y']

In [2]:
import os
import sys
import random
import numpy as np
import pandas as pd
import geopandas as gpd
import contextily as cx
pd.options.mode.copy_on_write = True
pd.set_option('display.max_columns', 500)
# libs
from sklearn.ensemble import RandomForestRegressor
from pykrige.rk import RegressionKriging
from sklearn.metrics import r2_score, root_mean_squared_error

import matplotlib as mpl
import matplotlib.pyplot as plt


In [3]:
class HiddenPrints:
    """Context manager that discards everything written to ``sys.stdout``.

    On entry ``sys.stdout`` is redirected to ``os.devnull``; on exit the
    stream that was active at entry is put back. Exceptions raised inside the
    ``with`` block are not suppressed.
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep a private handle so __exit__ closes exactly the file opened
        # here, even if code inside the block rebinds sys.stdout.
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first so stdout is usable again even if close() fails.
        sys.stdout = self._original_stdout
        self._devnull.close()

In [5]:
# Read the node dataset, then keep one subset per target column containing
# only the rows where that target is present.
gdf = gpd.read_file(data_nodes_input_path, engine='pyogrio')
gdf_measured = gdf.loc[gdf['dBA_raw'].notna()]
gdf_measured_slt20 = gdf.loc[gdf['dBA_raw_slt20'].notna()]

def create_spatial_folds(df, no_of_folds=16):
    """Assign every row of ``df`` to one of ``no_of_folds`` spatial folds.

    Rows are sorted by ``x`` (then ``y``) and cut into ``int(sqrt(no_of_folds))``
    consecutive stripes, stored in column ``xf``. The rows are then re-sorted
    by stripe and ``y`` and cut into ``no_of_folds`` consecutive chunks, stored
    in column ``fold``.

    Parameters
    ----------
    df : DataFrame
        Must contain ``x`` and ``y`` columns. It is not modified.
    no_of_folds : int, default 16
        Number of folds. Intended to be a perfect square (1, 4, 9, 16, ...);
        for other values the stripe count is the truncated square root.

    Returns
    -------
    DataFrame
        Sorted copy of ``df`` with a fresh 0..n-1 index and the added ``xf``
        and ``fold`` columns.

    Raises
    ------
    ValueError
        If ``no_of_folds`` is smaller than 1.
    """
    if no_of_folds < 1:
        raise ValueError(f'no_of_folds must be >= 1, got {no_of_folds}')

    def create_fold_arr(sorted_df, no_folds):
        # Every label but the last gets floor(n / no_folds) rows; the last
        # label absorbs the remainder. np.repeat copes with no_folds == 1
        # (zero leading labels), where concatenating an empty list would fail.
        n_rows = sorted_df.shape[0]
        leading = np.repeat(np.arange(no_folds - 1), n_rows // no_folds)
        trailing = np.full(n_rows - len(leading), no_folds - 1)
        return np.concatenate((leading, trailing))

    # no_of_folds must be the square of some integer (i.e. 1,4,9,16,...)
    df = df.sort_values(by=['x', 'y']).reset_index(drop=True)
    df['xf'] = create_fold_arr(df, int(np.sqrt(no_of_folds)))
    df = df.sort_values(by=['xf', 'y']).reset_index(drop=True)
    df['fold'] = create_fold_arr(df, no_of_folds)
    return df

def spatial_train_test_split(gdf, no_test_folds, random_state=1, fold_col='fold'):
    """Split ``gdf`` into train and test rows by holding out whole folds.

    The distinct values of ``fold_col`` are shuffled with a generator seeded
    by ``random_state``; the first ``no_test_folds`` of the shuffled values
    form the test set and the remaining values form the training set. The
    shuffled fold order is printed.

    Parameters
    ----------
    gdf : DataFrame
        Must contain the column named by ``fold_col``.
    no_test_folds : int
        Number of folds to hold out for testing.
    random_state : int, default 1
        Seed for the shuffle.
    fold_col : str, default 'fold'
        Name of the column holding the fold labels.

    Returns
    -------
    tuple
        ``(train, test)`` row subsets of ``gdf``.
    """
    # A private generator yields the same permutation as seeding the global
    # ``random`` module, without resetting the process-wide random state.
    rng = random.Random(random_state)
    folds = gdf[fold_col].unique()
    rng.shuffle(folds)
    print(folds)
    test_folds = folds[:no_test_folds]
    train_folds = folds[no_test_folds:]
    return gdf[gdf[fold_col].isin(train_folds)], gdf[gdf[fold_col].isin(test_folds)]

# Label both measured subsets with spatial folds (default fold count), then
# hold out 6 folds of each as the test set using the configured seed.
gdf_measured = create_spatial_folds(gdf_measured)
gdf_measured_slt20 = create_spatial_folds(gdf_measured_slt20)
train_measured, test_measured = spatial_train_test_split(
    gdf_measured, no_test_folds=6, random_state=random_seed
)
train_slt20, test_slt20 = spatial_train_test_split(
    gdf_measured_slt20, no_test_folds=6, random_state=random_seed
)


[10  5 12  9 14  3  0  8 13  2 15  6 11  1  4  7]
[10  5 12  9 14  3  0  8 13  2 15  6 11  1  4  7]


In [None]:
def get_new_model():
    """Return a new, unfitted ``RegressionKriging`` wrapping a random forest.

    The forest and kriging settings are fixed here, so every call produces an
    identically configured estimator.
    """
    forest = RandomForestRegressor(
        n_estimators=50,
        max_depth=10,
        min_samples_split=10,
        min_samples_leaf=4,
        max_features=2,
        random_state=12,
    )
    return RegressionKriging(forest, n_closest_points=20, nlags=6)

def do_model(train, test, ycol):
    """Fit a fresh model on ``train`` and print its R2 and RMSE on ``test``.

    ``x_cols`` and ``c_cols`` (module-level lists) select the columns passed
    to the estimator; ``ycol`` names the target column. Nothing is returned.
    """
    estimator = get_new_model()
    # Anything the fit step prints is discarded.
    with HiddenPrints():
        estimator.fit(train[x_cols].values, train[c_cols].values, train[ycol].values)
    predicted = estimator.predict(test[x_cols].values, test[c_cols].values)
    observed = test[ycol].values
    print(f'r2  : {r2_score(observed, predicted):.3f}')
    print(f'rmse: {root_mean_squared_error(observed, predicted):.3f}')


In [6]:
# Evaluate the model on the 'dBA_raw' target with the spatial train/test split.
do_model(train=train_measured, test=test_measured, ycol='dBA_raw')

r2  : 0.310
rmse: 6.670


In [7]:
# Repeat evaluation on 'dBA_raw' with the same train/test frames.
do_model(train=train_measured, test=test_measured, ycol='dBA_raw')

r2  : 0.310
rmse: 6.670


In [8]:
# Repeat evaluation on 'dBA_raw' with the same train/test frames.
do_model(train=train_measured, test=test_measured, ycol='dBA_raw')

r2  : 0.310
rmse: 6.670


In [9]:
# NOTE(review): unconditional raise — presumably a deliberate stop so that
# "Run All" halts before the cells below; confirm, or remove before sharing.
raise Exception()

Exception: 

In [None]:
# Evaluate the model on the 'dBA_raw' target with the spatial train/test split.
do_model(train=train_measured, test=test_measured, ycol='dBA_raw')

r2  : 0.310
rmse: 6.670
