In [1]:
import pandas as pd
import pyarrow
from dotenv import load_dotenv
import os
import re
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    cohen_kappa_score,
    log_loss,
    classification_report
)
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import OneHotEncoder
import numpy as np
import pandas as pd
from shapely import wkt
from shapely.affinity import translate, scale
from PIL import Image, ImageDraw
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from shapely.affinity import translate, scale
from PIL import Image, ImageDraw

load_dotenv()

file_path = os.getenv('FILE_PATH')

df_sample = pd.read_parquet(file_path + "detailed_woning_type_sample.parquet")
df = pd.read_csv(file_path + "bag_image_summary.csv", dtype="string")
df_joined = pd.merge(df_sample, df, how="left", right_on="bag_id", left_on="bag_nummeraanduidingid")
df = df_joined[df_joined["frontview_exists"].notna()]

# If you want to add the file path to the URLs, set this to True
add_file_path_to_urls = True

# Currently a funda sourced Url goes from: 
# frontview/0797/2000/0002/3888/0797200000023888.jpg
# to: img_dataset/07/079720000002-funda.jpg
def extract_path(url, source):
    if pd.isna(url) or url == '' or url is None:
        return ''
    id = url.rstrip('/').split('/')[-1]
    id, *_ = id.split('.')
    m = re.match(r'(\d{2})', id)
    first_two_digits = m.group(1) if m else ''
    return f"img_dataset/{first_two_digits}/{id}-{source}.jpg"

link_cols = ['frontview_funda_url', 'frontview_google_url', 'frontview_funda_in_business_url']
link_sources = ['funda', 'google', 'funda-in-business'] # Sources are in file name, so need to be added to filename for correct name

for col, source in zip(link_cols, link_sources):
    df[f'{col}_split'] = df[col].map(lambda url: extract_path(url, source))

# If you want to add the file path to the URLs, set add_file_path_to_urls to True
if add_file_path_to_urls:
    df[[f'{col}_split' for col in link_cols]] = df[[f'{col}_split' for col in link_cols]].map(lambda x: file_path + x if x else '')
    add_file_path_to_urls = False

df.to_csv(
    file_path + "Full_preprocessed_detailed_house.csv",
    index=False,
    encoding='utf-8',
)

df = pd.read_csv(file_path + "Full_preprocessed_detailed_house.csv", dtype="string")

# Verschillend is a special case, so we remove it from the dataset
df = df[df['build_type'] != 'Verschillend']

def pick_first_url(row):
    for col in [f"{c}_split" for c in link_cols]:
        val = row[col]
        if pd.notna(val) and val != '':
            return val
    return ''

df['frontview_url'] = df.apply(pick_first_url, axis=1)
df = df[df['frontview_url'] != '']

# Ensure 'opp_pand' and 'oppervlakte' are numeric before division
df['procent_ingenomen'] = pd.to_numeric(df['opp_pand'], errors='coerce') / pd.to_numeric(df['oppervlakte'], errors='coerce')

df['huisnr_bag_letter'] = df['huisnr_bag_letter'].notna().astype(int)
df['huisnr_bag_toevoeging'] = df['huisnr_bag_toevoeging'].notna().astype(int)

df['is_monument'] = df['is_monument'].fillna(0).astype(int)
df['is_protected'] = df['is_protected'].fillna(0).astype(int)

df = df.drop(columns=['bag_nummeraanduidingid', 'frontview_exists', 'random_rank', 'num_funda_images',
                      'frontview_funda_url', 'frontview_google_url', 'frontview_funda_in_business_url', 
                      'frontview_funda_url_split', 'frontview_google_url_split', 'frontview_funda_in_business_url_split',
                      'special_house_type', 'source_data_result_id',
                      'straatnaam', 'postcode', 'plaatsnaam', 'source_data_timestamp', 'bag_id'
                      ])

# merge_map = {
#     # Corridor or gallery flats
#     'Corridorflat': 'Corridor/Galerijflat',
#     'Galerijflat':  'Corridor/Galerijflat',
#     'Halfvrijstaande woning': 'Halfvrijstaande woning/2-onder-1-kapwoning',
#     '2-onder-1-kapwoning': 'Halfvrijstaande woning/2-onder-1-kapwoning',
#     'Hoekwoning':    'Hoek/Eindwoning',
#     'Eindwoning':    'Hoek/Eindwoning',
# }

# df['woningtype'] = df['woningtype'].map(merge_map).fillna(df['woningtype'])

# Full preprocessed dataset with URLS, can be loaded into pipeline.
df.to_csv(
    file_path + "Full_preprocessed_detailed_house.csv",
    index=False,
    encoding='utf-8',
)

In [2]:
def compute_orientation(polygon):
    if polygon.is_empty or not polygon.is_valid:
        return np.nan

    # Get minimum rotated rectangle
    mrr = polygon.minimum_rotated_rectangle
    coords = list(mrr.exterior.coords)

    # Find the longest edge
    max_length = 0
    angle = 0

    for i in range(len(coords) - 1):
        p1 = coords[i]
        p2 = coords[i + 1]

        dx = p2[0] - p1[0]
        dy = p2[1] - p1[1]

        length = np.hypot(dx, dy)
        if length > max_length:
            max_length = length
            angle = np.degrees(np.arctan2(dy, dx))

    # Normalize angle to 0–180
    return angle % 180

def compute_elongation(polygon):
    if polygon.is_empty or not polygon.is_valid:
        return np.nan

    # Minimum rotated rectangle (oriented bounding box)
    min_rect = polygon.minimum_rotated_rectangle

    # Get corner points of the box
    coords = list(min_rect.exterior.coords)

    # Compute distances between the 4 sides
    edge_lengths = [np.linalg.norm(np.subtract(coords[i], coords[i + 1])) for i in range(4)]

    width = min(edge_lengths)
    height = max(edge_lengths)

    if height == 0:  # Prevent divide-by-zero
        return np.nan

    return width / height

def rasterize_polygon(geom, size=224):
    bounds = geom.bounds
    geom = translate(geom, xoff=-bounds[0], yoff=-bounds[1])
    scale_x = size / (bounds[2] - bounds[0] + 1e-8)
    scale_y = size / (bounds[3] - bounds[1] + 1e-8)
    geom = scale(geom, xfact=scale_x, yfact=scale_y, origin=(0, 0))

    img = Image.new("L", (size, size), 0)
    draw = ImageDraw.Draw(img)
    coords = [(x, size - y) for x, y in geom.exterior.coords]
    draw.polygon(coords, outline=1, fill=1)
    return np.array(img)

In [3]:
df['geometry'] = df['geometry'].apply(wkt.loads)

df['centroid_x'] = df['geometry'].apply(lambda geom: geom.centroid.x)
df['centroid_y'] = df['geometry'].apply(lambda geom: geom.centroid.y)

df['area'] = df['geometry'].apply(lambda geom: geom.area)

# perimeter = Sum of the lengths of all edges forming the boundary of a polygon
df['perimeter'] = df['geometry'].apply(lambda g: g.length)

# Gives 1 for a perfect circle (most compact shape)
# Gets closer to 0 for long, skinny, jagged shapes
df['compactness'] = (
    4 * np.pi * df['area'] / (df['perimeter'] ** 2)
)

df['num_vertices'] = df['geometry'].apply(lambda g: len(g.exterior.coords))

df['elongation'] = df['geometry'].apply(compute_elongation)

df['orientation_deg'] = df['geometry'].apply(compute_orientation)

df['num_vertices_log'] = np.log1p(df['num_vertices'])

df["mask"] = df["geometry"].apply(lambda g: rasterize_polygon(g, size=224))

In [4]:
# Normalize centroid_x and centroid_y
for col in ['centroid_x', 'centroid_y']:
    min_val = df[col].min()
    max_val = df[col].max()
    df[col] = df[col] / 3000000

# Normalize orientation_deg
df['orientation_deg'] = df['orientation_deg'] / 360

df = df.drop(columns=['geometry', 'num_vertices'])

In [5]:
# def prepare_final_data(df, base_path):
#     """Adds the full image path and encodes the 'woningtype' label."""
#     # Build full image path
#     df['img_path'] = df['frontview_url'].apply(lambda x: os.path.join(base_path, x))
#     return df

# df = prepare_final_data(df, file_path)

In [6]:
# Normalizing and feature engineering are applied to the dataset separately for training, validation, and testing.
# This is to ensure that the model does not learn from the validation and test sets during training.

# adjust random_state for reproducibility
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Check if standardization is needed for huisnr and pocent_ingenomen, large numbers can be encoded as categorical?
for dataframe in [train_df, val_df, test_df]:
    for col in ['area']:
        scaler = StandardScaler()
        dataframe[col] = scaler.fit_transform(dataframe[[col]])

    if dataframe is train_df:
        build_type_train = train_df[['build_type']]
        encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
        encoder.fit(build_type_train)
        raw_feature_names = encoder.get_feature_names_out(['build_type'])
        clean_feature_names = [name.replace(' ', '_') for name in raw_feature_names]

    build_type = dataframe[['build_type']]
    encoded_array = encoder.transform(build_type)
    encoded_df = pd.DataFrame(encoded_array, columns=clean_feature_names, index=dataframe.index)
    dataframe.drop('build_type', axis=1, inplace=True)
    dataframe[encoded_df.columns] = encoded_df

In [7]:
features = [
    # BAG
    'area',
    'centroid_x',                               # 0 - 1
    'centroid_y',                               # 0 - 1
    'perimeter',                                # 0 - 1
    'compactness',                              # 0 - 1
    'elongation',                               # 0 - 1
    'orientation_deg',                          # 0 - 1
    'num_vertices_log',                         # 0 - 7 (can be inf)
]

target = 'woningtype'

In [8]:
train_df.to_csv(os.path.join(file_path, "train_df.csv"), index=False)
val_df.to_csv(os.path.join(file_path, "val_df.csv"), index=False)
test_df.to_csv(os.path.join(file_path, "test_df.csv"), index=False)

print("Saved train_df.csv, val_df.csv and test_df.csv to:", file_path)

Saved train_df.csv, val_df.csv and test_df.csv to: ../../Data/


In [9]:
len(train_df)

5648

In [10]:
for col in df.columns:
    print(col)

woningtype
huisnr
huisnr_bag_letter
huisnr_bag_toevoeging
opp_pand
oppervlakte
build_year
build_type
is_monument
is_protected
frontview_url
procent_ingenomen
centroid_x
centroid_y
area
perimeter
compactness
elongation
orientation_deg
num_vertices_log
mask


In [12]:
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Dataset
class MultiModalDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.features = features
        self.transform = transform
        self.counter = 0

        self.masks = [torch.tensor(m, dtype=torch.float32).unsqueeze(0) for m in df["mask"]]
        self.tabular = torch.tensor(df[features].values, dtype=torch.float32)
        self.labels = torch.tensor(df["woningtype_encoded"].values, dtype=torch.long)
        
        self._remove_missing()


    def _remove_missing(self):
        to_drop = []
        for idx, row in self.df.iterrows():
            try:
                Image.open(row['frontview_url'])
            except Exception as e:
                to_drop.append(idx)
                self.counter += 1
                print(f'Dropped {self.counter}th row due to image load error: {e}')
        
        # Drop all invalid rows at once
        self.df = self.df.drop(index=to_drop).reset_index(drop=True)


    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]

        # Get mask and tabular
        mask = self.masks[idx]
        tabular = self.tabular[idx]
        label = self.labels[idx]

        # Load image
        try:
            img = Image.open(row['frontview_url']).convert('RGB')
            if self.transform:
                img = self.transform(img)
            else:
                img = torch.tensor(np.array(img), dtype=torch.float32).permute(2, 0, 1) / 255.0  # normalize manually
        except Exception as e:
            # Handle failed image read by recursively calling next index
            print(e)
            return self.__getitem__((idx + 1) % len(self))

        return img, mask, tabular, label

# Model
class CNNWithTabular(nn.Module):
    def __init__(self, image_out_dim, tabular_dim, output_dim):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Conv2d(16, 64, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Dropout(0.2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(3),
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(128 * 8 * 8, image_out_dim), nn.ReLU()
        )

        self.tabular_net = nn.Sequential(
            nn.Linear(tabular_dim, 512), nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 128), nn.ReLU(),
        )

        self.tab_scale = nn.Parameter(torch.ones(1) * 1.5)  

        self.final = nn.Sequential(
            nn.Linear(image_out_dim + 128, 512), nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256), nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, output_dim)
        )

    def forward(self, img, tab):
        img_out = self.cnn(img)
        tab_out = self.tabular_net(tab)
        tab_out = tab_out * self.tab_scale
        x = torch.cat([img_out, tab_out], dim=1)
        return self.final(x)

Using device: cuda


In [13]:
# Encode labels
le = LabelEncoder()
train_df["woningtype_encoded"] = le.fit_transform(train_df[target])
val_df["woningtype_encoded"] = le.transform(val_df[target])
test_df["woningtype_encoded"] = le.transform(test_df[target])

# Datasets and loaders
train_ds = MultiModalDataset(train_df)
val_ds = MultiModalDataset(val_df)
test_ds = MultiModalDataset(test_df)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=32)
test_loader = DataLoader(test_ds, batch_size=32)

# Model, loss, optimizer
model = CNNWithTabular(image_out_dim=128, tabular_dim=len(features), output_dim=len(le.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),
    weight_decay=1e-5
)

# Training loop
loss_history = []

for epoch in range(16):
    model.train()
    total_loss = 0
    for x_img, x_mask, x_tab, y_batch in train_loader:
        x_img, x_mask, x_tab, y_batch = x_img.to(device), x_mask.to(device), x_tab.to(device), y_batch.to(device)
        optimizer.zero_grad()
        out = model(x_img, x_tab)
        loss = criterion(out, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    loss_history.append(avg_loss)
    print(f"Epoch {epoch+1}: Loss = {avg_loss:.4f}")

# Evaluation
all_preds, all_true, all_probs = [], [], []

model.eval()
with torch.no_grad():
    for x_img, x_tab, y_batch in test_loader:
        x_img, x_tab, y_batch = x_img.to(device), x_tab.to(device), y_batch.to(device)
        out = model(x_img, x_tab)
        probs = torch.softmax(out, dim=1)
        preds = probs.argmax(dim=1)
        
        all_preds.extend(preds.cpu().tolist())
        all_true.extend(y_batch.cpu().tolist())
        all_probs.extend(probs.cpu().numpy())

print("\n--- Evaluation Metrics ---")
accuracy = accuracy_score(all_true, all_preds)
precision = precision_score(all_true, all_preds, average='macro', zero_division=0)
recall = recall_score(all_true, all_preds, average='macro', zero_division=0)
f1 = f1_score(all_true, all_preds, average='macro', zero_division=0)
kappa = cohen_kappa_score(all_true, all_preds)
logloss = log_loss(all_true, all_probs)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision (macro): {precision:.4f}")
print(f"Recall (macro): {recall:.4f}")
print(f"F1 Score (macro): {f1:.4f}")
print(f"Cohen's Kappa: {kappa:.4f}")
print(f"Log Loss: {logloss:.4f}")

# --- Classification Report ---
print("\n--- Classification Report ---") 
print(classification_report(all_true, all_preds, target_names=le.classes_, zero_division=0))

Dropped 1th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0938200000025776-funda.jpg'
Dropped 2th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0984200000013978-funda.jpg'
Dropped 3th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0917200000025392-funda.jpg'
Dropped 4th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0957200000019190-funda.jpg'
Dropped 5th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0988200000035883-funda.jpg'
Dropped 6th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0988200000049049-funda.jpg'
Dropped 7th row due to image load error: [Errno 2] No such file or directory: '../../Data/img_dataset/09/0907200000015447-funda.jpg'
Dropped 8th row due to image load error: [Errno 2] No such file or di

RuntimeError: stack expects each tensor to be equal size, but got [3, 960, 1440] at entry 0 and [3, 640, 640] at entry 2

In [None]:
print("\n--- Confusion Matrix ---")
cm = confusion_matrix(all_true, all_preds)
plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues', xticklabels=le.classes_, yticklabels=le.classes_)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()  

In [None]:
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(loss_history) + 1), loss_history, marker='o')
plt.title('Training Loss per Epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.show()