In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.tree import DecisionTreeRegressor
from sklearn.base import TransformerMixin, BaseEstimator
from imblearn.over_sampling import SMOTENC
import sys
sys.path.append('..')
import flood_tool as ft
import os

# Load the labelled postcode data from flood_tool's data directory
df = pd.read_csv(
    os.path.join(ft._data_dir, 'postcodes_labelled.csv')
)

In [3]:
# Features: everything except the target and the columns not used for modelling
X = df.drop(
    columns=[
        'riskLabel',
        'localAuthority',
        'medianPrice',
        'historicallyFlooded',
        'postcode',
    ]
)
# Target: the flood risk label
y = df['riskLabel']

In [4]:
# Show which feature columns remain after dropping the target and unused columns
X.columns

Index(['easting', 'northing', 'soilType', 'elevation'], dtype='object')

In [5]:
# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    train_size=0.8,
    random_state=42,
)

Regression: predict `riskLabel` with a decision-tree regressor

In [6]:
# Define which columns contain numerical/categorical data.
# Select every numeric dtype rather than only int64: a numeric column that
# holds NaN is stored as float64, so an int64-only filter would skip exactly
# the columns the numeric pipeline's imputer exists for and leave them to the
# ColumnTransformer's passthrough remainder, unimputed and unscaled.
num_data_train = X_train.select_dtypes(include='number').columns.tolist()
cat_data_train = X_train.select_dtypes(include=['object']).columns.tolist()

In [7]:
# Fixed category order per column; OrdinalTransformer uses each category's
# position in its list as the integer code.
category_mapping = {
    'soilType': [
        'Luvisols',
        'Cambisols',
        'Arenosols',
        'Leptosols',
        'Podsols',
        'Planosols',
        'Stagnosols',
        'Gleysols',
        'Histosols',
        'Unsurveyed/Urban',
    ],
}
# Columns handled by the ordinal encoder: every column that has a mapping
cat_features_to_ordinal = list(category_mapping)

In [8]:
class OrdinalTransformer(TransformerMixin, BaseEstimator):
    """Encode categorical columns as integer codes using a fixed ordering.

    Parameters
    ----------
    category_mapping : dict
        Maps each column name to a list of its categories. A category's
        position in that list is the integer code it is replaced with.
    unknown : {'ignore', 'use_max'}, default='ignore'
        Handling of values that are not listed for their column:
        ``'ignore'`` leaves the value unchanged, ``'use_max'`` replaces it
        with one more than the largest known code.
    """

    _UNKNOWN_MODES = ('ignore', 'use_max')

    def __init__(self, category_mapping, unknown='ignore'):
        # Only store the parameters here. scikit-learn's clone()/set_params()
        # rely on __init__ doing no derived work, so the lookup tables are
        # exposed through the `category_dicts` property instead.
        self.category_mapping = category_mapping
        self.unknown = unknown

    @property
    def category_dicts(self):
        """Per-column ``{category: code}`` lookups built from ``category_mapping``."""
        return {
            col: {cat: idx for idx, cat in enumerate(categories)}
            for col, categories in self.category_mapping.items()
        }

    def fit(self, X=None, y=None):
        """Do nothing: the encoding is fully determined by ``category_mapping``.

        Returns
        -------
        self
        """
        return self

    def transform(self, X, y=None):
        """Replace categories with their integer codes.

        Parameters
        ----------
        X : DataFrame or array-like
            Input data. It is wrapped in a DataFrame whose columns are the
            keys of ``category_mapping`` (in that order), so an unnamed array
            must have one column per mapped feature.
        y : ignored

        Returns
        -------
        pandas.DataFrame
            Copy of the input with every mapped column encoded.

        Raises
        ------
        ValueError
            If ``unknown`` is not a supported mode.
        """
        # Validate once up front instead of discovering a bad mode per value.
        if self.unknown not in self._UNKNOWN_MODES:
            raise ValueError(f"Unknown handling mode '{self.unknown}' not supported.")

        columns = list(self.category_mapping)
        X = pd.DataFrame(X, columns=columns)
        X_transformed = X.copy()
        lookups = self.category_dicts  # build the lookup tables once per call

        for col in columns:
            codes = lookups[col]

            def encode(value, codes=codes, col=col):
                # Fall back lazily so handle_unknown only runs for values
                # that really are missing from the mapping.
                if value in codes:
                    return codes[value]
                return self.handle_unknown(col, value)

            X_transformed[col] = X[col].apply(encode)
        return X_transformed

    def handle_unknown(self, column, value):
        """Return the replacement for a value not listed for ``column``.

        Raises
        ------
        ValueError
            If ``unknown`` is not a supported mode.
        """
        if self.unknown == 'ignore':
            # NOTE(review): the raw value is returned unchanged, so an
            # unknown string stays a string in the output - confirm the
            # downstream estimator can cope with that.
            return value
        elif self.unknown == 'use_max':
            # Codes run 0..len-1, so len is "largest code + 1"; unlike
            # max(...) + 1 this also works for an empty category list.
            return len(self.category_mapping[column])
        else:
            raise ValueError(f"Unknown handling mode '{self.unknown}' not supported.")

In [9]:
# Numerical branch: impute missing values, then scale with RobustScaler
num_pipe = make_pipeline(
    SimpleImputer(),
    RobustScaler(),
)

# Categorical branch: fill gaps with the most frequent category, then
# ordinal-encode the soil type using the order in category_mapping
cat_pipeline = ColumnTransformer(
    transformers=[
        (
            'ordinal',
            make_pipeline(
                SimpleImputer(strategy='most_frequent'),
                OrdinalTransformer(category_mapping),
            ),
            cat_features_to_ordinal,
        ),
    ],
)

# Full preprocessor: route each column group to its branch; columns that
# belong to neither group are passed through unchanged
complete_pipe = ColumnTransformer(
    transformers=[
        ('num', num_pipe, num_data_train),
        ('cat', cat_pipeline, cat_data_train),
    ],
    remainder='passthrough',
)

In [10]:
# Decision-tree regressor with every hyperparameter spelled out explicitly
regressor = DecisionTreeRegressor(
    # how splits are chosen
    criterion='squared_error',
    splitter='best',
    max_features=None,
    # limits on tree growth
    max_depth=None,
    max_leaf_nodes=None,
    min_samples_split=2,
    min_samples_leaf=1,
    min_weight_fraction_leaf=0.0,
    min_impurity_decrease=0.0,
    # cost-complexity pruning strength
    ccp_alpha=0.0,
    # fixed seed for reproducible fits
    random_state=42,
)

In [11]:
# End-to-end model: preprocessing followed by the decision-tree regressor
regression_model = Pipeline(
    steps=[
        ('preprocessing', complete_pipe),
        ('regressor', regressor),
    ]
)

In [12]:
# Fit the preprocessing steps and the regressor on the training split
regression_model.fit(X_train, y_train)

<class 'pandas.core.frame.DataFrame'>


In [13]:
# Predict the risk label for the held-out test split and display the result
y_pred = regression_model.predict(X_test)
y_pred

<class 'pandas.core.frame.DataFrame'>


array([1., 1., 1., ..., 1., 1., 1.])