In [None]:
import numpy as np
import os
import pandas as pd
import shap
from sklearn.dummy import DummyClassifier
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.preprocessing import LabelEncoder
from tableone import TableOne
from xgboost import XGBClassifier
from ydata_profiling import ProfileReport

In [None]:
%matplotlib inline

## Initial exploration

In [None]:
# Load data
data = pd.read_csv(os.path.join("../data", "certificates.csv"), low_memory=False)

In [None]:
print("Columns in data: ", len(data.columns))
print("Rows in data: ", len(data))
# Check if the same lodgement appears more than once in data
print("Unique LMK_KEY: ", data.LMK_KEY.nunique())

In [None]:
# Create data profiling report on full dataset - takes a few min to run
profile = ProfileReport(data)
profile.to_file("full_dataset.html")

In [None]:
# Check basic stats on target variable (more info in profiling report)
data["CURRENT_ENERGY_RATING"].value_counts(normalize=True).mul(100)

In [None]:
# I did some initial research and found the assessment procedure for EPC ratings: https://www.gov.uk/guidance/standard-assessment-procedure. I used some of the information presented there and in the links as a guide for narrowing down columns and when considering which information would likely be known if the EPC rating itself is unknown

# Remove columns that I know I won't use based on their context

# Addresses and locations - we want the model to generalise to all locations
# UPRN = Unique Property Reference Number
location_columns = [
    "ADDRESS1",
    "ADDRESS2",
    "ADDRESS3",
    "POSTCODE",
    "LMK_KEY",
    "BUILDING_REFERENCE_NUMBER",
    "LOCAL_AUTHORITY",
    "CONSTITUENCY",
    "COUNTY",
    "ADDRESS",
    "LOCAL_AUTHORITY_LABEL",
    "CONSTITUENCY_LABEL",
    "POSTTOWN",
    "UPRN",
    "UPRN_SOURCE",
]

In [None]:
# Efficiency and consumption columns - possibly uses the current energy rating (the target variable) to estimate these? If so, they wouldn't be known at prediction time as they rely on "future knowledge"
efficiency_columns = [
    "CURRENT_ENERGY_EFFICIENCY",
    "ENVIRONMENT_IMPACT_CURRENT",
    "ENERGY_CONSUMPTION_CURRENT",
    "CO2_EMISSIONS_CURRENT",
    "CO2_EMISS_CURR_PER_FLOOR_AREA",
    "LIGHTING_COST_CURRENT",
    "HEATING_COST_CURRENT",
    "HOT_WATER_COST_CURRENT",
]

In [None]:
# Columns with _POTENTIAL in them - presumably based on current data that won't be known at the time of prediction, e.g. 'POTENTIAL_ENERGY_RATING' and 'ENVIRONMENT_IMPACT_POTENTIAL'
potential_columns = list(data.columns[data.columns.str.contains("POTENTIAL")])

In [None]:
# Date columns (except relating to construction dates)
date_columns = ["INSPECTION_DATE", "LODGEMENT_DATE", "LODGEMENT_DATETIME"]

In [None]:
# Other columns that I don't think will be relevant at this stage
other_columns = ["TRANSACTION_TYPE", "TENURE"]

In [None]:
columns_to_drop = (
    location_columns
    + potential_columns
    + efficiency_columns
    + date_columns
    + other_columns
)

In [None]:
data = data.drop(columns=columns_to_drop)

Train/test splitting

In [None]:
# Split into train and holdout test sets for preprocessing and modelling (perform all feature decisions on train only to avoid data leakage)
train, test = train_test_split(
    data, test_size=0.2, stratify=data["CURRENT_ENERGY_RATING"]
)

In [None]:
print("Train dataset rows: ", len(train))
print("Test dataset rows: ", len(test))

## Feature engineering

### Windows

There are descriptions and efficiency columns for many parts of the buildings, e.g. 'WINDOWS_DESCRIPTION', 'WINDOWS_ENERGY_EFF', 'WINDOWS_ENV_EFF'. I will use the description columns only as I think the others may be based on the energy rating itself

In [None]:
# Windows data
window_cols = [
    "GLAZED_TYPE",
    "GLAZED_AREA",
    "MULTI_GLAZE_PROPORTION",
    "WINDOWS_DESCRIPTION",
    "WINDOWS_ENERGY_EFF",
    "WINDOWS_ENV_EFF",
]

In [None]:
# Create a table grouped by the target variable to see which columns give most info
tb1 = TableOne(
    train,
    columns=window_cols + ["CURRENT_ENERGY_RATING"],
    categorical=[
        "GLAZED_TYPE",
        "GLAZED_AREA",
        "WINDOWS_DESCRIPTION",
        "WINDOWS_ENERGY_EFF",
        "WINDOWS_ENV_EFF",
    ]
    + ["CURRENT_ENERGY_RATING"],
    groupby="CURRENT_ENERGY_RATING",
)
print(tb1)

In [None]:
# Most informative windows column is the MULTI_GLAZE_PROPORTION, which is also numeric so I'll drop the rest
window_cols_to_drop = [
    "GLAZED_TYPE",
    "GLAZED_AREA",
    "WINDOWS_DESCRIPTION",
    "WINDOWS_ENERGY_EFF",
    "WINDOWS_ENV_EFF",
]
train = train.drop(columns=window_cols_to_drop)
test = test.drop(columns=window_cols_to_drop)

In [None]:
def convert_description_column(
    *, df, description_strings, description_col, new_col, missing_val
):
    """Convert a column containing text descriptions to specified categories.

    Args:
        df (pd.DataFrame): dataframe containing description_col.
        description_strings (list): substrings to search for in description_col.
        description_col (str): column to be converted.
        new_col (str): name of new column.
        missing_val (str or float): value to use for infilling missing values.

    Returns:
        pd.DataFrame: input df with new_col column.
    """
    df[description_col] = df[description_col].fillna(missing_val)
    for description in description_strings:
        df.loc[df[description_col].str.contains(description), new_col] = description
    df[new_col] = df[new_col].fillna(missing_val)
    return df

### Walls

In [None]:
wall_columns = list(train.columns[train.columns.str.contains("WALLS")])

# Create a few types of wall types based on looking at those present in the WALLS_DESCRIPTION column using value_counts()
wall_types = [
    "Solid brick",
    "Cavity wall",
    "Stone",
    "Cob",
    "System built",
    "Timber frame",
]


def preprocess_walls_data(*, df, wall_types, wall_columns):
    df = convert_description_column(
        df=df,
        description_col="WALLS_DESCRIPTION",
        new_col="WALL_TYPE",
        description_strings=wall_types,
        missing_val="Other",
    )

    # Also search for wall insulation in the description and pull out the main types
    df = convert_description_column(
        df=df,
        description_col="WALLS_DESCRIPTION",
        new_col="WALL_INSULATION",
        description_strings=[
            "no insulation",
            "with internal insulation",
            "with external insulation",
            "partial insulation",
            "insulated",
        ],
        missing_val="no insulation",
    )

    # Create a binary feature for wall insulation
    df["WALL_INSULATION"] = np.where(df["WALL_INSULATION"] == "no insulation", 0, 1)
    df = df.drop(columns=wall_columns)
    return df

In [None]:
train = preprocess_walls_data(
    df=train, wall_types=wall_types, wall_columns=wall_columns
)
test = preprocess_walls_data(df=test, wall_types=wall_types, wall_columns=wall_columns)

### Floors

In [None]:
# Similar above but for floors
floor_columns = list(train.columns[train.columns.str.contains("FLOOR")])
floor_types = ["Solid", "Suspended", "Other property below"]


def preprocess_floor_data(*, df, floor_types):
    """Preprocess floor data by binning descriptions and creating an insulation feature.

    Args:
        df (pd.DataFrame): dataframe containing FLOOR_DESCRIPTION column.
        floor_types (list): floor types of interest (to be searched for in description).

    Returns:
        pd.DataFrame: input dataframe with new FLOOR_TYPE and FLOOR_INSULATION columns.
    """
    df["FLOOR_DESCRIPTION"] = df["FLOOR_DESCRIPTION"].replace(
        {
            "(another dwelling below)": "Other property below",
            "(other premises below)": "Other property below",
        }
    )

    df = convert_description_column(
        df=df,
        description_col="FLOOR_DESCRIPTION",
        new_col="FLOOR_TYPE",
        description_strings=floor_types,
        missing_val="Other",
    )

    df = convert_description_column(
        df=df,
        description_col="FLOOR_DESCRIPTION",
        new_col="FLOOR_INSULATION",
        description_strings=["no insulation", "insulated", "limited insulation"],
        missing_val="no insulation",
    )

    df["FLOOR_INSULATION"] = np.where(df["FLOOR_INSULATION"] == "no insulation", 0, 1)

    df.loc[df["FLOOR_TYPE"] == "Other property below", "FLOOR_INSULATION"] = 1

    df = df.drop(columns=["FLOOR_DESCRIPTION", "FLOOR_ENERGY_EFF", "FLOOR_ENV_EFF"])
    return df

In [None]:
train = preprocess_floor_data(df=train, floor_types=floor_types)
test = preprocess_floor_data(df=test, floor_types=floor_types)

### Roof

In [None]:
roof_columns = list(train.columns[train.columns.str.contains("ROOF")])
roof_types = ["Pitched", "Flat", "Other property above"]


def preprocess_roof_data(*, df, roof_types, roof_columns):
    """Preprocess roof data by binning descriptions and creating an insulation feature.

    Args:
        df (pd.DataFrame): dataframe containing ROOF_DESCRIPTION column.
        roof_types (list): roof types of interest (to be searched for in description).

    Returns:
        pd.DataFrame: input dataframe with new ROOF_TYPE and FLOOR_INSULATION columns.
    """
    df["ROOF_DESCRIPTION"] = df["ROOF_DESCRIPTION"].replace(
        {
            "(another dwelling above)": "Other property above",
            "(other premises above)": "Other property above",
        }
    )

    df = convert_description_column(
        df=df,
        description_col="ROOF_DESCRIPTION",
        new_col="ROOF_TYPE",
        description_strings=roof_types,
        missing_val="Other",
    )
    df = convert_description_column(
        df=df,
        description_col="ROOF_DESCRIPTION",
        new_col="ROOF_INSULATION",
        description_strings=["no insulation", "loft insulation", "limited insulation"],
        missing_val="no insulation",
    )

    # Create binary column for roof insulation
    df["ROOF_INSULATION"] = np.where(df["ROOF_INSULATION"] == "no insulation", 0, 1)
    # Set properties with other above them as insulated
    df.loc[df["ROOF_TYPE"] == "Other property above", "ROOF_INSULATION"] = 1

    df = df.drop(columns=roof_columns)
    return df

In [None]:
train = preprocess_roof_data(df=train, roof_columns=roof_columns, roof_types=roof_types)
test = preprocess_roof_data(df=test, roof_columns=roof_columns, roof_types=roof_types)

In [None]:
# I will drop some further columns here due to time constraints. The hot water and main heating columns overlap
# with main fuel, and the lighting columns are partly captured by LOW_ENERGY_LIGHTING

columns_to_drop = [
    "HOTWATER_DESCRIPTION",
    "HOT_WATER_ENERGY_EFF",
    "HOT_WATER_ENV_EFF",
    "SECONDHEAT_DESCRIPTION",
    "SHEATING_ENERGY_EFF",
    "SHEATING_ENV_EFF",
    "MAINHEAT_DESCRIPTION",
    "MAINHEAT_ENERGY_EFF",
    "MAINHEAT_ENV_EFF",
    "MAINHEATCONT_DESCRIPTION",
    "MAINHEATC_ENERGY_EFF",
    "MAINHEATC_ENV_EFF",
    "MAIN_HEATING_CONTROLS",
    "LIGHTING_DESCRIPTION",
    "LIGHTING_ENERGY_EFF",
    "LIGHTING_ENV_EFF",
    "HEAT_LOSS_CORRIDOR",
    "UNHEATED_CORRIDOR_LENGTH",
    "MECHANICAL_VENTILATION",
    "ENERGY_TARIFF",
    "FLOOR_LEVEL",
]

train = train.drop(columns=columns_to_drop)
test = test.drop(columns=columns_to_drop)

### Main fuel source

In [None]:
# Create a feature for the main fuel source
train = convert_description_column(
    df=train,
    description_col="MAIN_FUEL",
    new_col="MAIN_FUEL_TYPE",
    description_strings=["mains gas", "electricity"],
    missing_val="Other",
)
train = train.drop(columns=["MAIN_FUEL"])

test = convert_description_column(
    df=test,
    description_col="MAIN_FUEL",
    new_col="MAIN_FUEL_TYPE",
    description_strings=["mains gas", "electricity"],
    missing_val="Other",
)
test = test.drop(columns=["MAIN_FUEL"])

### Construction age

In [None]:
# Preprocess the 'CONSTRUCTION_AGE_BAND' column

def preprocess_construction_age(df):
    """Preprocess construction age column by tidying nans and date ranges/year values.
    
    NB The column contains a mixture of age bands preceded by the string 'England and Wales: ' and some integer year values. Ideally I'd get the start and end dates for each bands, then map the numeric values into the bins. However I'm using a shortcut to save time (not very robust...)

    Args:
        df (pd.DataFrame): dataframe containing CONSTRUCTION_AGE_BAND column.

    Returns:
        pd.DataFrame: input dataframe with tidied up CONSTRUCTION_AGE_BAND column.
    """
    df["CONSTRUCTION_AGE_BAND"] = df["CONSTRUCTION_AGE_BAND"].replace(
        {"NO DATA!": np.nan, "INVALID!": np.nan}
    )

    # Only two of the numeric values are pre-2012, so I'll bucket those manually and then add the rest to the 2012 onwards bin
    df["CONSTRUCTION_AGE_BAND"] = df["CONSTRUCTION_AGE_BAND"].replace(
        {"1920": "England and Wales: 1900-1929", "1960": "England and Wales: 1950-1966"}
    )

    # Create a new column with the existing date bins and all year values added to the latest
    df["CONSTRUCTION_AGE_BAND"] = [
        "England and Wales: 2012 onwards" if type(x) is not float and len(x) == 4 else x
        for x in df["CONSTRUCTION_AGE_BAND"]
    ]
    return df

In [None]:
train = preprocess_construction_age(train)
test = preprocess_construction_age(test)

In [None]:
train["CONSTRUCTION_AGE_BAND"].value_counts(dropna=False)

In [None]:
test["CONSTRUCTION_AGE_BAND"].value_counts(dropna=False)

In [None]:
def convert_string_to_binary(df, column):
    """Convert string Y/N values to binary 1/0 values.

    Args:
        df (pd.DataFrame): dataframe containing column with Y/N values.
        column (str): name of column to process in df.

    Returns:
        pd.DataFrame: input dataframe with converted Y/N values in specified column.
    """
    df[column] = df[column].replace({"Y": 1, "N": 0})
    return df

In [None]:
# Tidy up missing data and convert Y/N strings to binary
def tidy_up(df):
    """Final tidy up of data and calls to convert_string_to_binary function.

    Args:
        df (pd.DataFrame): dataframe containing BUILT_FORM, MAINS_GAS_FLAG, FLAT_TOP_STOREY and SOLAR_WATER_HEATING_FLAG columns.

    Returns:
        pd.DataFrame: input dataframe with preprocessing applied.
    """
    df["BUILT_FORM"] = df["BUILT_FORM"].replace({"NO DATA!": np.nan})

    df = convert_string_to_binary(df=df, column="MAINS_GAS_FLAG")
    df = convert_string_to_binary(df=df, column="FLAT_TOP_STOREY")
    df = convert_string_to_binary(df=df, column="SOLAR_WATER_HEATING_FLAG")
    return df

In [None]:
train = tidy_up(train)
test = tidy_up(test)

In [None]:
train["PROPERTY_TYPE"].value_counts()

In [None]:
# Create a new profiling report on the processed train data
profile = ProfileReport(train)
profile.to_file("train_dataset.html")

In [None]:
# Encode categorical variables using one hot encoding
# This creates a new column (binary) for each option in the column, not good for high cardinality data. Another option is target encoding
categorical_variables = [
    "PROPERTY_TYPE",
    "BUILT_FORM",
    "CONSTRUCTION_AGE_BAND",
    "WALL_TYPE",
    "FLOOR_TYPE",
    "ROOF_TYPE",
    "MAIN_FUEL_TYPE",
]

In [None]:
train = pd.get_dummies(train, columns=categorical_variables, dummy_na=True)
test = pd.get_dummies(test, columns=categorical_variables, dummy_na=True)

## Target variable encoding

In [None]:
# label encode the target variable (going from A, B, C to 0, 1, 2 etc)
le = LabelEncoder()
features_train = train.drop(columns=["CURRENT_ENERGY_RATING"])
target_train = train["CURRENT_ENERGY_RATING"]

target_train = le.fit_transform(target_train)

features_test = test.drop(columns=["CURRENT_ENERGY_RATING"])
target_test = test["CURRENT_ENERGY_RATING"]
# Use the label encoder from train on the holdout test data
target_test = le.transform(target_test)

In [None]:
target_train

## Modelling

I will use sparsity aware algorithms (nan values allowed), otherwise I would need to impute to fill the missing values. Options for that include median/mode over the train set, K nearest neighbours or MICE

In [None]:
def evaluate_model_cv(X, y, model):
    """Perform 10-fold cross validation on the train data and report the accuracy score for all folds.

    Args:
        X (pd.DataFrame): train feature set.
        y (pd.Series): train target variable.
        model (classifier): a classification model to be used for cross validation.
    """
    # define evaluation procedure
    cv = StratifiedKFold(n_splits=10)
    # evaluate model
    scores = cross_val_score(model, X, y, scoring="accuracy", cv=cv, n_jobs=-1)
    print("Accuracy scores on CV folds: ", scores)

In [None]:
# Create a baseline model using a dummy classifier (uses no training data)
evaluate_model_cv(
    X=features_train, y=target_train, model=DummyClassifier(strategy="stratified")
)

In [None]:
# Train an xgboost model to compare with the baseline model
evaluate_model_cv(X=features_train, y=target_train, model=XGBClassifier(random_state=1))

The xgboost model significantly outperforms the baseline model

In [None]:
# Fit on all train data, then predict on the holdout test data using xgboost
clf = XGBClassifier(random_state=1)
clf.fit(features_train, target_train)
predictions = clf.predict(features_test)

In [None]:
# Create and plot a confusion matrix for the model to visualise the predictions and see where the model is correct and incorrect
class_names = ["A", "B", "C", "D", "E", "F", "G"]
cm = confusion_matrix(y_pred=predictions, y_true=target_test)

disp = ConfusionMatrixDisplay(cm, display_labels=class_names)
disp.plot(cmap="Blues")

## Explainability

Use SHAP to explain which features are important to the model's predictions across the training data

In [None]:
# Create a SHAP explainer for tree based models - takes a few min to run
explainer = shap.TreeExplainer(clf)
shap_values = explainer.shap_values(features_train)

In [None]:
# Plot the most important model features broken down by class (0=A, 1=B etc)
shap.summary_plot(shap_values, features=features_train, class_inds='original', class_names=clf.classes_)

In [None]:
#  SHAP summary plot showing feature importances for a single class - C rating
shap.summary_plot(shap_values[2], features=features_train)

In [None]:
#  SHAP summary plot showing feature importances for a single class - G rating
shap.summary_plot(shap_values[6], features=features_train)

Next steps with more time:
- feature dropout to remove unimportant columns
- try more algorithms in cross validation
- include some of the dropped columns
- generalise some functions to avoid hard coded column names
- add unit tests