# Coursework Assignment: Building a Regression Model

```
University of London
BSc in Computer Science
CM3005, Data Science
Hudson Leonardo MENDES
hlm12@student.london.ac.uk
```


# I. Introduction


## Domain-specific area


## Dataset


## Objectives


# II. Implementation


## Preprocessing


In [None]:
import pathlib

# Root folder that holds every raw dataset read by this notebook.
data_folderpath = pathlib.Path("./data")

# Locations of the three raw inputs, all resolved relative to the data root.
ppd_folderpath = data_folderpath.joinpath("uk-ppd")
inflation_filepath = data_folderpath.joinpath("uk-ons/ons-inflation-1989-2022.csv")
interest_filepath = data_folderpath.joinpath("uk-boe/boe-interest-1975-2022.csv")


In [None]:
import pandas as pd

# Render floats with thousands separators and three decimals in DataFrame output.
pd.set_option("display.float_format", "{:,.3f}".format)


In [None]:
# https://www.gov.uk/guidance/about-the-price-paid-data
# Human-readable labels for the single-letter codes found in the raw files.
# Codes missing from a mapping resolve to None and the row is removed by the
# dropna() at the end of this cell.
ppd_property_type = {
    "D": "detached",
    "S": "semi-detached",
    "T": "terraced",
    "F": "flat/maisonettes",
    # "O": "other" # => intentionally omitted
}

ppd_duration = {"F": "freehold", "L": "leasehold"}

ppd_old_or_new = {"Y": "new", "N": "old"}

# Column names applied to the raw files (passed as `names=` to read_csv).
ppd_column_names = [
    "id",
    "price",
    "date",
    "postcode",
    "property_type",
    "old_or_new",
    "duration",
    "paon",
    "saon",
    "street",
    "locality",
    "town_city",
    "district",
    "county",
    "ppd_category_type",
    "record_status",
]

ppd_df = pd.concat(
    [
        pd.read_csv(ppd_filepath, compression="zip", names=ppd_column_names)
        # Sorted so that the concatenation order is reproducible across runs.
        for ppd_filepath in sorted(ppd_folderpath.glob("*.zip"))
    ],
    # Each file restarts its row numbering; build one unique index instead.
    ignore_index=True,
)
# The post group is the part of the postcode before the space. Missing
# postcodes are left missing (rather than turned into the string "nan") so
# that the dropna() below removes those rows.
ppd_df["postgroup"] = ppd_df["postcode"].map(
    lambda x: str(x).split(" ")[0], na_action="ignore"
)
ppd_df["date"] = pd.to_datetime(ppd_df["date"])
ppd_df["property_type"] = ppd_df["property_type"].map(ppd_property_type.get)
ppd_df["duration"] = ppd_df["duration"].map(ppd_duration.get)
ppd_df["old_or_new"] = ppd_df["old_or_new"].map(ppd_old_or_new.get)
# Keep only the modelling columns; price (the target) stays last.
ppd_df = ppd_df[
    [
        "date",
        "postgroup",
        "property_type",
        "old_or_new",
        "duration",
        "price",
    ]
]
ppd_df = ppd_df.astype(
    {
        "postgroup": "category",
        "property_type": "category",
        "old_or_new": "category",
        "duration": "category",
        "price": "double",
    }
)
ppd_df = ppd_df.dropna()
ppd_df.sample(n=5)


In [None]:
import re
import string
from datetime import date

# Matches a four-digit year, optionally followed by whitespace and a
# three-character label (e.g. "1989 JAN").
# NOTE(review): quarter labels such as "Q1" are only two characters long, so
# the optional group does not capture them as written - confirm whether
# quarterly rows are meant to resolve to the first month of their quarter.
inflation_date_pattern = re.compile(r"([\d]{4})(?:\s+([\w]{3}))?")
inflation_month_names = [
    "JAN",
    "FEB",
    "MAR",
    "APR",
    "MAY",
    "JUN",
    "JUL",
    "AUG",
    "SEP",
    "OCT",
    "NOV",
    "DEC",
]
# Month label -> month number (1-12).
inflation_month_index = {
    name: number for number, name in enumerate(inflation_month_names, start=1)
}
# Quarter label -> first month of that quarter.
inflation_month_index.update({"Q1": 1, "Q2": 4, "Q3": 7, "Q4": 10})

# Characters allowed in a cell for it to be treated as a numeric rate.
inflation_acceptable_numeric_chars = string.digits + ".,"


def extract_inflation_date(x: str) -> date:
    """Parse a period title such as "1989" or "1989 JAN" into a date.

    Args:
        x: Title cell of the inflation file.

    Returns:
        The first day of the matched month (January when the title carries
        only a year), or None when ``x`` is not a string, contains no
        four-digit year, or carries a label that is not a known month or
        quarter.
    """
    if not isinstance(x, str):
        return None
    match = inflation_date_pattern.search(x)
    if match is None:
        return None
    year = int(match.group(1))
    month_name = match.group(2)
    if not month_name:
        return date(year, 1, 1)
    month = inflation_month_index.get(month_name.strip().upper())
    if month is None:
        # Unknown label: report "no date" instead of failing inside date().
        return None
    return date(year, month, 1)


def extract_inflation_rate(x: str) -> float:
    """Parse a rate cell into a float.

    Args:
        x: Raw cell value; it is converted with ``str`` before inspection.

    Returns:
        The numeric value, or None when the cell is empty, contains any
        character other than digits, "." and ",", or still cannot be parsed
        as a number. Commas are treated as thousands separators.
    """
    text = str(x)
    if not text or any(c not in inflation_acceptable_numeric_chars for c in text):
        return None
    try:
        return float(text.replace(",", ""))
    except ValueError:
        # e.g. "." or "1.2.3": only allowed characters, but not a number.
        return None


# Build the inflation series indexed by date. Rows whose title or value
# cannot be parsed come back as None from the extractors, become NaT/NaN here
# and are removed by dropna().
inflation_df = pd.read_csv(inflation_filepath)
inflation_df["date"] = pd.to_datetime(
    inflation_df["Title"].map(extract_inflation_date)
)
# errors="coerce" guarantees a float column: anything non-numeric becomes NaN
# instead of silently leaving the column with object dtype.
inflation_df["rate"] = pd.to_numeric(
    inflation_df["CPIH ANNUAL RATE 00: ALL ITEMS 2015=100"].map(
        extract_inflation_rate
    ),
    errors="coerce",
)
inflation_df = inflation_df[["date", "rate"]]
inflation_df = inflation_df.dropna()
inflation_df = inflation_df.set_index("date").sort_index()
inflation_df.sample(n=5)


In [None]:
# Build the interest-rate series: one row per rate change, indexed by the
# date of the change and sorted chronologically.
interest_raw = pd.read_csv(interest_filepath)
interest_df = pd.DataFrame(
    {
        "date": pd.to_datetime(interest_raw["Date Changed"]),
        "rate": interest_raw["Rate"].astype("float"),
    }
)
interest_df = interest_df.set_index("date").sort_index()
interest_df.sample(n=5)


In [None]:
from tqdm import tqdm, trange
from typing import Callable
from datetime import date, timedelta

# Register tqdm's progress_* methods on pandas objects; the progress_map
# calls further down in this cell depend on it.
tqdm.pandas()


def build_rate_extractor(df: pd.DataFrame) -> Callable[[date], float]:
    """Build a lookup that returns the rate in force on a given day.

    Args:
        df: Frame indexed by date with a ``rate`` column expressed in percent.
            When a date occurs more than once, its first row is used.

    Returns:
        A function mapping a date to the rate as a fraction (percent / 100).
        Days between two observations carry the most recent earlier
        observation forward; days before the first or after the last
        observation return the first or last observed rate respectively.

    Raises:
        ValueError: If ``df`` holds no non-null rate.
    """
    observed = df["rate"].dropna()
    if observed.empty:
        raise ValueError("cannot build a rate extractor without any rate")

    # One observation per date, chronologically ordered, as a fraction.
    rates = observed.groupby(level=0).first().sort_index() / 100.0
    min_date = rates.index.min()
    max_date = rates.index.max()
    first_rate = rates.iloc[0]
    last_rate = rates.iloc[-1]

    # Expand to one entry per day, carrying the last observation forward.
    daily_rates = rates.reindex(
        pd.date_range(min_date, max_date, freq="D"), method="ffill"
    )
    rate_index = daily_rates.to_dict()

    def get_rate_for_date(d: date) -> float:
        if d < min_date:
            return first_rate
        elif d > max_date:
            return last_rate
        else:
            return rate_index[d]

    return get_rate_for_date


# Assemble the modelling frame: price-paid records enriched with the rate in
# force on the sale date and with the sale date split into numeric parts.
df = ppd_df.copy()
for rate_column, rate_source in (
    ("inflation_rate", inflation_df),
    ("interest_rate", interest_df),
):
    df[rate_column] = df.date.progress_map(build_rate_extractor(df=rate_source))
df["date_year"] = df.date.progress_map(lambda stamp: stamp.year)
df["date_month"] = df.date.progress_map(lambda stamp: stamp.month)
df["date_day"] = df.date.progress_map(lambda stamp: stamp.day)
df["date_day_of_week"] = df.date.progress_map(lambda stamp: stamp.weekday())
df = df.sort_values(by="date").reset_index()

# Final column order: date parts, the categorical PPD columns (everything in
# ppd_df between "date" and "price"), the two rates, and the target last.
date_part_columns = ["date_year", "date_month", "date_day", "date_day_of_week"]
ppd_feature_columns = list(ppd_df.columns[1:-1])
rate_and_target_columns = ["inflation_rate", "interest_rate", "price"]
df = df[date_part_columns + ppd_feature_columns + rate_and_target_columns]
df.sample(n=5)


In [None]:
# Print the column dtypes and non-null counts of the assembled frame.
df.info()


In [None]:
# Persist the assembled frame (without its index) so that later sections can
# reload it from disk instead of re-running the preprocessing cells.
df.to_csv(data_folderpath / "snapshot-Xy-1NF.zip", index=False)


## Statistical Summary


In [1]:
# Reuse the in-memory frame when the preprocessing cells already ran in this
# session; otherwise (df undefined or None) rebuild it from the snapshot file.
# An explicit check is used because an `assert` is stripped under `python -O`.
if globals().get("df") is None:
    import pathlib
    import pandas as pd
    import numpy as np

    print("[SNAPSHOT] Reloading...")
    pd.set_option("display.float_format", lambda x: "{:,.3f}".format(x))
    data_folderpath = pathlib.Path("./data")
    # Restore the dtypes that the CSV round-trip does not preserve.
    df = pd.read_csv(data_folderpath / "snapshot-Xy-1NF.zip").astype(
        {
            "postgroup": "category",
            "property_type": "category",
            "old_or_new": "category",
            "duration": "category",
            "price": "double",
        }
    )
    print(f" - reloaded from snapshot, {df.shape[0]}")
df.sample(n=5)


[SNAPSHOT] Reloading...
 - reloaded from snapshot, 4336841


Unnamed: 0,date_year,date_month,date_day,date_day_of_week,postgroup,property_type,old_or_new,duration,inflation_rate,interest_rate,price
2209667,2020,6,24,2,PE32,semi-detached,old,freehold,0.008,0.001,290000.0
685069,2018,9,27,3,BN21,flat/maisonettes,old,leasehold,0.022,0.007,130000.0
247454,2018,4,20,4,TR4,detached,old,freehold,0.022,0.005,515000.0
3536421,2021,9,14,1,BN10,detached,old,freehold,0.029,0.001,399950.0
744281,2018,10,17,2,PE11,detached,old,freehold,0.022,0.007,210000.0


In [2]:
# Restrict the statistical summary to the float-typed columns.
df_float = df.select_dtypes(include="float")
df_float.sample(5)

Unnamed: 0,inflation_rate,interest_rate,price
2107241,0.015,0.001,440000.0
1731239,0.015,0.007,550000.0
3589448,0.029,0.001,139950.0
1661466,0.015,0.007,199950.0
1901223,0.014,0.007,135995.0


### Central Tendency

In [3]:
# One row per measure (mean, median, mode) and one column per float column.
# A column may have several modes; the first one reported by pandas is used.
df_central_tendency = pd.DataFrame(
    {
        "mean": df_float.mean(),
        "median": df_float.median(),
        "mode": df_float.mode().iloc[0],
    }
).transpose()

df_central_tendency

Unnamed: 0,inflation_rate,interest_rate,price
mean,0.025,0.005,318106.045
mean,0.02,0.005,247000.0
mode,0.023,0.001,250000.0


### Measures of Spread

In [4]:
# Quartiles, variance and interquartile range of every float column.
quartile_levels = [0.25, 0.5, 0.75]
df_float_quantiles = df_float.quantile(quartile_levels).transpose()
df_float_variances = df_float.var()
df_float_interquartile_range = df_float_quantiles[0.75] - df_float_quantiles[0.25]

# Stack describe() with the two extra measures, then keep only the spread
# rows in presentation order (count and mean are dropped).
spread_rows = ["std", "var", "min", "25%", "50%", "75%", "max", "IQR"]
df_measures_of_spread = pd.concat(
    [
        df_float.describe(),
        df_float_variances.to_frame(name="var").transpose(),
        df_float_interquartile_range.to_frame(name="IQR").transpose(),
    ]
).loc[spread_rows]
df_measures_of_spread

Unnamed: 0,inflation_rate,interest_rate,price
std,0.02,0.004,399667.085
var,0.0,0.0,159733778805.72
min,0.005,0.001,1.0
25%,0.015,0.001,160000.0
50%,0.02,0.005,247000.0
75%,0.024,0.007,376999.0
max,0.096,0.022,165000000.0
IQR,0.009,0.006,216999.0


### Types of Distribution

In [None]:
# Pairwise scatter plots of the float columns, with a kernel density estimate
# of each column on the diagonal.
# NOTE(review): the snapshot holds about 4.3M rows; plotting a random sample
# may be needed to keep this cell tractable - confirm.
pd.plotting.scatter_matrix(df_float, diagonal='kde')

## Data visualisation


In [None]:
# Reuse the in-memory frame when the preprocessing cells already ran in this
# session; otherwise (df undefined or None) rebuild it from the snapshot file.
# An explicit check is used because an `assert` is stripped under `python -O`.
if globals().get("df") is None:
    import pathlib
    import pandas as pd

    print("[SNAPSHOT] Reloading...")
    pd.set_option("display.float_format", lambda x: "{:,.3f}".format(x))
    data_folderpath = pathlib.Path("./data")
    # Restore the dtypes that the CSV round-trip does not preserve.
    df = pd.read_csv(data_folderpath / "snapshot-Xy-1NF.zip").astype(
        {
            "postgroup": "category",
            "property_type": "category",
            "old_or_new": "category",
            "duration": "category",
            "price": "double",
        }
    )
    print(f" - reloaded from snapshot, {df.shape[0]}")
df.sample(n=5)


In [None]:
%matplotlib inline

In [None]:
import scipy
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter, StrMethodFormatter


In [None]:
from datetime import date

# Plot-wide bounds derived from the data.
max_price = float(df.price.max())
max_rate = max(df.interest_rate.max(), df.inflation_rate.max())
# Span whole calendar years: 1 January of the first year through 31 December
# of the last year present in the data.
min_intersecting_date = date(int(df.date_year.min()), 1, 1)
max_intersecting_date = date(int(df.date_year.max()), 12, 31)


In [None]:
# One row with two panels; each rate distribution below is drawn on its own panel.
_, axes = plt.subplots(ncols=2, nrows=1, figsize=(15, 5))


def plot_rate_distributions(ax, df: pd.DataFrame, label: str, color: str) -> None:
    """Draw the distribution of ``df["rate"]`` on ``ax`` with P5/P95 markers.

    The rates are counted in 99 equal-width bins between 0 and the maximum
    rate and drawn as a filled area. The 5th and 95th percentiles are marked
    with a vertical line and an annotation.

    Args:
        ax: Matplotlib axes to draw on.
        df: Frame with a ``rate`` column; it is copied, not modified.
        label: Legend entry for the filled area.
        color: Fill colour of the area.
    """
    df = df.copy()
    x = np.linspace(0.0, df["rate"].max(), 100)
    df["bin"] = pd.cut(df["rate"], bins=x)
    # observed=False keeps empty bins, so there is exactly one count per bin
    # and y always lines up with x[:-1].
    y = list(df.groupby("bin", observed=False).count()["rate"])
    ax.fill_between(x[:-1], 0.0, y, color=color, alpha=0.5)
    ax.xaxis.set_major_formatter(FormatStrFormatter("%2.2f%%"))
    intervals = [0.05, 0.95]
    peak = max(y)
    bbox = dict(boxstyle="round, pad=0.3", fc="lightgray", lw=2)
    for interval, quantile in zip(intervals, df.rate.quantile(intervals)):
        percentile = f"P{int(interval*100.)}={round(quantile, 2)}"
        ax.axvline(x=quantile, color="blue")
        # Annotate at the height of the tallest bin.
        ax.annotate(
            percentile,
            xy=(quantile, peak),
            bbox=bbox,
            ha="center",
            va="center",
        )
    ax.legend([label], loc="lower center", bbox_to_anchor=(0.5, -0.2))


# Left panel: interest rates (green); right panel: inflation rates (red).
rate_panels = [
    (interest_df, "interest", "green"),
    (inflation_df, "inflation", "red"),
]
for panel, (panel_rates, panel_label, panel_color) in zip(axes, rate_panels):
    plot_rate_distributions(
        ax=panel,
        df=panel_rates,
        label=panel_label,
        color=panel_color,
    )


In [None]:
from datetime import date
from tqdm import tqdm

_, axes = plt.subplots(nrows=2, figsize=(15, 10), sharex=True)

# Top panel: mean interest and inflation rate per sale date, in percent.
# The sale date is assembled from its year/month/day parts in one vectorised
# call rather than row by row.
series = df.copy()
series["date"] = pd.to_datetime(
    df[["date_year", "date_month", "date_day"]].rename(
        columns={"date_year": "year", "date_month": "month", "date_day": "day"}
    )
)
series = series.groupby("date").mean(numeric_only=True).dropna()

x = series.index

axes[0].grid(visible=True)
axes[0].plot(x, series.interest_rate * 100.0, "g.-", alpha=0.7)
axes[0].plot(x, series.inflation_rate * 100.0, "r.-", alpha=0.7)
axes[0].set_xlim(left=min_intersecting_date, right=max_intersecting_date)
axes[0].set_ylabel("rates (%)")
axes[0].yaxis.set_major_formatter(FormatStrFormatter("%2.2f%%"))
axes[0].legend(["interest", "inflation"])

# Bottom panel: mean monthly price per property type. The y-axis is capped
# slightly above the 95th price percentile.
axes[1].grid(visible=True)
axes[1].yaxis.set_major_formatter(StrMethodFormatter("{x:,}"))
axes[1].set_ylim(0.0, df.price.quantile(0.95) * 1.2)
axes[1].set_ylabel("property price (£)")
property_labels = list(ppd_property_type.values())
for property_type in tqdm(property_labels):
    sub_series = df.loc[
        df.property_type == property_type, ["date_year", "date_month", "price"]
    ].copy()
    # First day of the month of each sale.
    sub_series["date_ym"] = pd.to_datetime(
        sub_series[["date_year", "date_month"]]
        .rename(columns={"date_year": "year", "date_month": "month"})
        .assign(day=1)
    )
    sub_series = sub_series[["date_ym", "price"]]
    sub_series = sub_series.groupby("date_ym").mean(numeric_only=True)
    sub_series = sub_series.ffill()
    axes[1].plot(sub_series.index, sub_series.price, "s", alpha=0.7)
# One legend entry per plotted property type, in plotting order.
axes[1].legend(property_labels)


## Machine learning model


In [None]:
# Reuse the in-memory frame when the preprocessing cells already ran in this
# session; otherwise (df undefined or None) rebuild it from the snapshot file.
# An explicit check is used because an `assert` is stripped under `python -O`.
if globals().get("df") is None:
    import pathlib
    import pandas as pd
    import numpy as np

    print("[SNAPSHOT] Reloading...")
    pd.set_option("display.float_format", lambda x: "{:,.3f}".format(x))
    data_folderpath = pathlib.Path("./data")
    # Restore the dtypes that the CSV round-trip does not preserve.
    df = pd.read_csv(data_folderpath / "snapshot-Xy-1NF.zip").astype(
        {
            "postgroup": "category",
            "property_type": "category",
            "old_or_new": "category",
            "duration": "category",
            "price": "double",
        }
    )
    print(f" - reloaded from snapshot, {df.shape[0]}")
df.sample(n=5)


In [None]:
# The last column is the regression target; every column before it is a feature.
*feature_names, target_name = df.columns
X, y = df[feature_names], df[target_name]


In [None]:
from sklearn.preprocessing import OneHotEncoder, StandardScaler, FunctionTransformer
from sklearn.compose import make_column_transformer, make_column_selector


def make_sine_cycle_encoder(period: int = 1) -> "FunctionTransformer":
    """Create a transformer that maps a cyclic value to ``sin(2*pi*x/period)``.

    Args:
        period: Length of one full cycle, in the unit of the encoded column.

    Returns:
        A ``FunctionTransformer`` applying the sine encoding element-wise.

    Raises:
        ValueError: If ``period`` is zero (the encoding divides by it).
    """
    if period == 0:
        raise ValueError("period must be non-zero")
    return FunctionTransformer(lambda x: np.sin(x / period * 2 * np.pi))


# https://scikit-learn.org/stable/modules/sgd.html#tips-on-practical-use
def make_df_column_transformer():
    """Build the column transformer that encodes the feature frame.

    Encoding per column group:
      * category columns: one-hot (sparse; unknown categories are ignored),
      * float64 columns and ``date_year``: standardised,
      * ``date_month``, ``date_day``, ``date_day_of_week``: sine-encoded with
        periods 12, 31 and 7 respectively.

    Any other column is dropped.

    Returns:
        An unfitted ``ColumnTransformer``.
    """
    categorical_selector = make_column_selector(dtype_include="category")
    float_selector = make_column_selector(dtype_include="float64")
    one_hot = OneHotEncoder(sparse_output=True, handle_unknown="ignore")
    numerical_scaler = StandardScaler(with_mean=True, with_std=True)
    cycle_sine_12 = make_sine_cycle_encoder(period=12)
    cycle_sine_31 = make_sine_cycle_encoder(period=31)
    # The day of week takes seven values (0..6), so one full cycle is 7;
    # with a period of 6, days 0, 3 and 6 would all encode to (about) zero.
    cycle_sine_7 = make_sine_cycle_encoder(period=7)
    return make_column_transformer(
        (one_hot, categorical_selector),
        (numerical_scaler, float_selector),
        (numerical_scaler, ["date_year"]),
        (cycle_sine_12, ["date_month"]),
        (cycle_sine_31, ["date_day"]),
        (cycle_sine_7, ["date_day_of_week"]),
        remainder="drop",
        verbose=True,
    )


# Fit the encoders on the feature frame and encode it in one step, then show
# the encoded matrix as a sparse DataFrame.
# NOTE(review): the transformer is fitted on all of X, i.e. before the
# train/test split performed further down, so the scaler statistics and the
# one-hot vocabulary also see the test rows - confirm this is intended.
preprocessing_df_column_transformer = make_df_column_transformer()
X_encoded = preprocessing_df_column_transformer.fit_transform(X)
pd.DataFrame.sparse.from_spmatrix(X_encoded)


In [None]:
from sklearn.datasets import load_iris
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2



In [None]:
from typing import Tuple
from sklearn.model_selection import train_test_split


def produce_split_summary(
    X_split: pd.DataFrame, y_split: pd.DataFrame, name: str, total: int
) -> Tuple[str, int, int, str]:
    """Describe one data split as a row for the split summary table.

    Args:
        X_split: Features of the split.
        y_split: Targets of the split.
        name: Label of the split.
        total: Row count that the percentage is computed against.

    Returns:
        ``(name, feature row count, target row count, share)``, where share is
        the feature row count as a percentage of ``total`` with one decimal.
    """
    feature_rows = X_split.shape[0]
    target_rows = y_split.shape[0]
    share = f"{100.0 * feature_rows / total:.1f}%"
    return (name, feature_rows, target_rows, share)


# Hold out 1% of the encoded rows for testing; the seed keeps the split reproducible.
r = 42
train_size = 0.99
X1, X2, y1, y2 = train_test_split(X_encoded, y, train_size=train_size, random_state=r)

# Summarise the size of each split relative to the full dataset.
full_row_count = X.shape[0]
split_parts = [
    (X, y, "full"),
    (X1, y1, "train"),
    (X2, y2, "test"),
]
pd.DataFrame(
    [
        produce_split_summary(X_part, y_part, part_name, total=full_row_count)
        for X_part, y_part, part_name in split_parts
    ],
    columns=["split", "|X|", "|y|", "%"],
)


In [None]:
from typing import Tuple
from sklearn.neural_network import MLPRegressor


def make_model(hidden_layer_sizes: Tuple[int, ...], max_iter: int):
    """Create an unfitted MLP regressor with a fixed seed and verbose output.

    Args:
        hidden_layer_sizes: Number of units in each hidden layer.
        max_iter: Maximum number of training iterations.

    Returns:
        A configured ``MLPRegressor``.
    """
    seed = 42
    # NOTE(review): validation_fraction presumably only takes effect when
    # early stopping is enabled, which is not requested here - confirm.
    settings = dict(
        hidden_layer_sizes=hidden_layer_sizes,
        validation_fraction=0.01,
        random_state=seed,
        verbose=True,
        max_iter=max_iter,
    )
    return MLPRegressor(**settings)


# Smoke test: a deliberately tiny network trained for a single iteration,
# scored on both splits with the estimator's default score.
model = make_model(hidden_layer_sizes=(2,), max_iter=1)
model.fit(X1, y1)
pd.DataFrame(
    [
        ("train", model.score(X1, y1)),
        ("test", model.score(X2, y2)),
    ],
    columns=["split", "score"],
)


In [None]:
import itertools
import numpy as np
from typing import Set, List, Tuple
from sklearn.model_selection import GridSearchCV


# Candidate architectures: first hidden layer of varying width, second fixed at 4 units.
hidden_layer_candidates = [
    (2, 4),
    (3, 4),
    (4, 4),
    (8, 4),
    (16, 4),
]

# 5-fold cross-validated search over the candidates, each trained for at most
# 10 iterations.
grid = GridSearchCV(
    estimator=model,
    param_grid={
        "hidden_layer_sizes": hidden_layer_candidates,
        "max_iter": [10],
    },
    cv=5,
    verbose=3,
)
grid


In [None]:
# Run the search on the training split and show the winning parameters.
grid.fit(X1, y1)
grid.best_params_

In [None]:
# Rebuild the model with the best architecture found by the search, this time
# allowing up to 200 training iterations.
model = make_model(
    hidden_layer_sizes=grid.best_params_["hidden_layer_sizes"],
    max_iter=200,
)
model


In [None]:
# Train the final model on the training split.
model.fit(X1, y1)


In [None]:
# Score the final model on the held-out test split.
model.score(X2, y2)


# III. Conclusions


## Performance of results


## Closing remarks/statements
