In [None]:
import torch
import torch.nn as nn
from torchvision.models import convnext_large, ConvNeXt_Large_Weights
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd
import geopandas as gpd
import numpy as np
from srai.neighbourhoods import H3Neighbourhood
import os
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt

In [None]:
# Model architecture; it has to mirror the network the checkpoint was trained with
class FineTunedConvNeXt(nn.Module):
    """Thin wrapper around torchvision's ConvNeXt-Large returning one flat vector per sample.

    NOTE(review): the complete ``convnext_large`` module is invoked, so the
    returned vector is presumably the output of its built-in head rather than
    pooled backbone features — confirm this is what the checkpoint expects.
    """

    def __init__(self):
        super().__init__()
        # The attribute name becomes the key prefix in the state_dict, so it must
        # stay ``convnext`` for saved checkpoints to load.
        pretrained = ConvNeXt_Large_Weights.DEFAULT
        self.convnext = convnext_large(weights=pretrained)

    def forward(self, x):
        """Run ``x`` through the wrapped network and collapse all dims after the first."""
        out = self.convnext(x)
        batch = out.size(0)
        return out.view(batch, -1)

In [None]:
# Load the trained model
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FineTunedConvNeXt().to(device)
# NOTE(review): absolute, machine-specific path — this cell only runs where the file exists.
checkpoint_path = r"D:\tu delft\Afstuderen\Phase 6 Experiments\checkpoints\final_model.pth"
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted
# source (consider weights_only=True if the installed torch version supports it).
# map_location places the loaded tensors on `device`.
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
model.eval()  # evaluation mode for inference

# Print the number of parameters
# Sums the element count of every parameter tensor registered on the model.
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params:,}")

In [None]:
# Dataset for inference with error handling
class RegionDataset(Dataset):
    """Serve one image per region, keyed by the index of ``regions_gdf``.

    The image for a region is read from ``<image_dir>/<region_id>.jpg``,
    converted to RGB and passed through ``transform``. When the file does not
    exist, the region id is recorded in ``missing_regions`` and an all-zero
    placeholder tensor is returned instead, so batching keeps working.

    Args:
        regions_gdf: Frame whose index holds the region ids.
        image_dir: Directory containing the ``<region_id>.jpg`` files.
        transform: Callable applied to the loaded PIL image.
        blank_size: ``(height, width)`` of the placeholder tensor. It should
            match the spatial size produced by ``transform``.

    Attributes:
        missing_regions: Region ids whose image file was not found, in order
            of first access and without duplicates.

    Note:
        ``missing_regions`` is filled as items are accessed on this object;
        entries recorded on copies of the dataset (e.g. in DataLoader worker
        processes) are not reflected here.
    """

    def __init__(self, regions_gdf, image_dir, transform, blank_size=(224, 224)):
        self.regions_gdf = regions_gdf
        self.image_dir = image_dir
        self.transform = transform
        self.blank_size = tuple(blank_size)
        self.missing_regions = []
        # Set mirror of missing_regions so repeated access stays O(1) and duplicate-free
        self._missing_seen = set()

    def __len__(self):
        """Number of regions (rows of ``regions_gdf``)."""
        return len(self.regions_gdf)

    def __getitem__(self, idx):
        """Return ``(image_tensor, region_id)`` for the region at position ``idx``."""
        region_id = self.regions_gdf.index[idx]
        image_path = os.path.join(self.image_dir, f"{region_id}.jpg")
        try:
            # Context manager releases the file handle as soon as the pixels are converted
            with Image.open(image_path) as source:
                image = source.convert('RGB')
        except FileNotFoundError:
            # Record each missing region once, even if the item is requested again
            if region_id not in self._missing_seen:
                self._missing_seen.add(region_id)
                self.missing_regions.append(region_id)
            return torch.zeros(3, *self.blank_size), region_id  # blank placeholder image
        return self.transform(image), region_id

# Function to generate embeddings
def generate_embeddings(model, regions_gdf, image_dir, device, batch_size):
    """Embed every region image with ``model`` and collect the vectors in a DataFrame.

    Images are resized to 224x224, converted to tensors and normalised with the
    mean/std constants below before being fed to the model in batches.

    Args:
        model: Callable mapping an image batch on ``device`` to a 2-D tensor
            with one row per sample. It is used as-is; putting it in
            evaluation mode is left to the caller.
        regions_gdf: Frame whose index holds the region ids.
        image_dir: Directory with one ``<region_id>.jpg`` per region.
        device: Torch device the batches are moved to.
        batch_size: Number of images per forward pass.

    Returns:
        Tuple ``(embeddings_df, missing_regions)``: a DataFrame indexed by
        region id containing one row per region that had an image, and the
        list of region ids for which no image file was found.
    """
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    dataset = RegionDataset(regions_gdf, image_dir, transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    embeddings = {}
    with torch.no_grad():
        for images, region_ids in tqdm(dataloader, desc="Generating embeddings"):
            # Numeric ids are collated into a tensor; turn them back into plain
            # Python scalars so they work as dictionary keys / index labels.
            if torch.is_tensor(region_ids):
                region_ids = region_ids.tolist()

            # An all-zero image is the dataset's placeholder for a missing file.
            # Decide this once per batch instead of once per sample.
            is_placeholder = (images.flatten(start_dim=1) == 0).all(dim=1).tolist()

            features = model(images.to(device)).cpu().numpy()
            for feature, region_id, skip in zip(features, region_ids, is_placeholder):
                if not skip:
                    embeddings[region_id] = feature

    return pd.DataFrame.from_dict(embeddings, orient='index'), dataset.missing_regions

In [None]:
# Function to perform spatial aggregation for missing regions
def spatial_aggregation(missing_regions, embeddings_df, regions_gdf):
    """Fill in embeddings for regions without an image from their direct neighbours.

    For every id in ``missing_regions`` the embeddings of its neighbours at
    distance 1 are averaged column-wise. Only rows already present in
    ``embeddings_df`` contribute, so the result does not depend on the order of
    ``missing_regions`` and imputed values never feed other imputations.

    Args:
        missing_regions: Iterable of region ids that need an embedding.
            Duplicates are ignored, as are ids that already have a row in
            ``embeddings_df``.
        embeddings_df: DataFrame of existing embeddings indexed by region id.
            It is not modified.
        regions_gdf: Regions passed to ``H3Neighbourhood`` for the neighbour lookup.

    Returns:
        A new DataFrame containing the rows of ``embeddings_df`` followed by one
        row per missing region that had at least one neighbour with an
        embedding. Missing regions without such a neighbour are left out.
    """
    # De-duplicate while keeping first-seen order; never overwrite a real embedding
    pending = [
        region_id
        for region_id in dict.fromkeys(missing_regions)
        if region_id not in embeddings_df.index
    ]
    if not pending:
        return embeddings_df.copy()

    neighborhood = H3Neighbourhood(regions_gdf)
    imputed = {}
    for region_id in pending:
        neighbors = neighborhood.get_neighbours_at_distance(region_id, 1)
        neighbor_embeddings = embeddings_df.loc[embeddings_df.index.isin(neighbors)]
        if not neighbor_embeddings.empty:
            imputed[region_id] = neighbor_embeddings.mean().values

    if not imputed:
        return embeddings_df.copy()

    # Build all new rows at once instead of enlarging the frame row by row
    imputed_df = pd.DataFrame.from_dict(imputed, orient='index', columns=embeddings_df.columns)
    return pd.concat([embeddings_df, imputed_df])

In [None]:
# Load your data and generate embeddings
# The region ids become the index; generate_embeddings looks up "<region_id>.jpg" by that index.
regions_gdf = gpd.read_file("selected_regions_10.geojson").set_index("region_id")
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
image_dir = r"D:\tu delft\Afstuderen\aerial_images_10"

print("Generating embeddings for central regions...")
# Returns the embeddings of regions that have an image plus the ids of those that do not.
embeddings_df, missing_regions = generate_embeddings(model, regions_gdf, image_dir, device, batch_size=128)

print(f"Number of missing regions: {len(missing_regions)}")
print("Performing spatial aggregation for missing regions...")
# Regions without an image get the mean embedding of their distance-1 neighbours.
embeddings_df = spatial_aggregation(missing_regions, embeddings_df, regions_gdf)

# Save the embeddings
# NOTE(review): absolute, machine-specific output location.
output_dir = r"D:\tu delft\Afstuderen\Phase 6 Experiments\embeddings"
os.makedirs(output_dir, exist_ok=True)  # no error if the directory already exists
output_file = os.path.join(output_dir, f"inferred_embeddings_res_10.csv")
# The region ids are written as the first CSV column (DataFrame index).
embeddings_df.to_csv(output_file)
print(f"Embeddings saved to {output_file}")

print("Inference completed!")

In [None]:
# from Plotting import pca_plot, cluster_plot
# import warnings
# warnings.filterwarnings("ignore")
# cluster_plot(embeddings_df, regions_gdf, 7)

In [None]:
# Target columns with their display name and bar colour, kept together in one
# table so the three lookups below cannot drift apart.
_TARGET_SPECS = [
    # (column, display name, hex colour)
    ('afw', 'Liveability', '#808080'),           # grey
    ('vrz', 'Amenities', '#FF4500'),             # orange red
    ('fys', 'Physical Environment', '#32CD32'),  # lime green
    ('soc', 'Social Cohesion', '#8A2BE2'),       # blue violet
    ('onv', 'Safety', '#1E90FF'),                # dodger blue
    ('won', 'Housing Stock', '#FFA500'),         # orange
]

# Column codes in plotting / evaluation order
target_columns = [code for code, _label, _hex in _TARGET_SPECS]
# Column code -> human-readable name
target_names = {code: label for code, label, _hex in _TARGET_SPECS}
# Column code -> colour used for that column's bar
colors = {code: hex_code for code, _label, hex_code in _TARGET_SPECS}

In [None]:
# Prepare PCA-reduced embeddings
X_full = embeddings_df.values
pca = PCA(n_components=30)
X_pca = pca.fit_transform(X_full)

# Rows of embeddings_df need not follow the order of regions_gdf: regions filled
# in by spatial aggregation are appended at the end and some regions may have no
# embedding at all. Reindex the targets by region id so that row i of X_pca and
# element i of every target refer to the same region.
targets_aligned = regions_gdf[target_columns].reindex(embeddings_df.index)

# Rows whose reduced embedding contains NaN are unusable for every target
invalid_features = np.isnan(X_pca).any(axis=1)

results = {}

# Perform linear regression for each target column
for column in target_columns:
    y = targets_aligned[column].to_numpy(dtype=float)

    # Remove NaN values
    mask = ~(np.isnan(y) | invalid_features)
    X_pca_valid = X_pca[mask]
    y_valid = y[mask]

    if len(y_valid) == 0:
        print(f"Warning: No valid data for {column} after removing NaN values.")
        results[column] = np.nan
        continue

    # PCA-reduced regression (fixed seed keeps the 70/30 split reproducible)
    X_train, X_test, y_train, y_test = train_test_split(X_pca_valid, y_valid, test_size=0.3, random_state=42)
    # Named `regressor` so the ConvNeXt `model` from the loading cell is not overwritten
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    y_pred = regressor.predict(X_test)
    results[column] = r2_score(y_test, y_pred)

In [None]:
# Plot results: one bar per Leefbaarometer score, drawn on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(12, 6))
bar_positions = np.arange(len(target_columns))
bar_width = 0.6

bar_heights = [results[col] for col in target_columns]
bar_colors = [colors[col] for col in target_columns]
ax.bar(bar_positions, bar_heights, bar_width, color=bar_colors)

ax.set_xlabel('Leefbaarometer Scores', fontsize=12)
ax.set_ylabel('R² Score', fontsize=12)
ax.set_title('Performance of PCA-reduced (30) Embeddings', fontsize=14)
ax.set_xticks(bar_positions)
ax.set_xticklabels([target_names[col] for col in target_columns], rotation=45, ha='right')
ax.grid(axis='y', linestyle='--', alpha=0.7)
fig.tight_layout()

# Write the figure to disk, then release it
output_dir = r"D:\tu delft\Afstuderen\Phase 6 Experiments\embeddings"
os.makedirs(output_dir, exist_ok=True)
plot_path = os.path.join(output_dir, 'pca_reduced_performance.png')
fig.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close(fig)

# Text summary of the same scores
print("PCA-reduced embedding analysis completed and plot saved.")
print("R² scores for each Leefbaarometer score:")
for col in target_columns:
    print(f"{target_names[col]}: {results[col]:.4f}")