In [None]:
import os
import pickle as pkl
import pandas as pd
import geopandas as gpd
import numpy as np
import random
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.nn as nn
import torch.optim as optim

# Set a fixed random seed for reproducibility
random.seed(19)

# Path to save the final DataFrame
output_pkl_path = "../data/data_final.pkl"
with open(output_pkl_path, "rb") as f:
    gdf = pkl.load(f)
    
to_pred_gdf = gdf.loc[gdf['date'] >= pd.to_datetime('2017-01-01 00:00:00')].reset_index(drop=True)
gdf = gdf.loc[gdf['date'] < pd.to_datetime('2017-01-01 00:00:00')].reset_index(drop=True)

In [None]:
tiles = gdf['tile_index'].unique().tolist()
random.shuffle(tiles)

train_ratio = 0.8
split_index = int(train_ratio * len(tiles))
train_tiles = tiles[:split_index] 
test_tiles = tiles[split_index:]  

train_gdf = gdf.loc[gdf['tile_index'].isin(train_tiles)]
test_gdf = gdf.loc[gdf['tile_index'].isin(test_tiles)]

X_cols = [col for col in gdf.columns if col.startswith('feature')]
y_col = 'urban_imperviousness'

X_train = train_gdf[X_cols]
y_train = np.stack(train_gdf[y_col].values).reshape(-1, 3, 3) # The target is a 3x3 array (originally a raster tile)

X_test = test_gdf[X_cols]
y_test = np.stack(test_gdf[y_col].values).reshape(-1, 3, 3)

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train.values, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.reshape(-1, 9), dtype=torch.float32)  # Flattened 3x3 grids

X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.reshape(-1, 9), dtype=torch.float32)

In [17]:
print(X_train.values.shape)
print(y_train.shape)

(14289, 768)
(14289, 3, 3)


In [None]:
# Baseline Model

# Calculate the mean urban imperviousness for each cell in the 3x3 grid across the training set
baseline_prediction = np.mean(y_train, axis=0)
print("Baseline prediction (3x3 grid):\n", baseline_prediction)

# Flatten the true and predicted 3x3 grids to apply the metrics for the test set
y_test_flat = y_test.reshape(-1, 9)  # Flatten each 3x3 grid in y_test to a 9-element vector

# Repeat the baseline prediction for each sample in y_test for comparison
baseline_predictions_test = np.tile(baseline_prediction.flatten(), (y_test_flat.shape[0], 1))

# Calculate baseline metrics on the test set
mse_baseline_test = mean_squared_error(y_test_flat, baseline_predictions_test)
mae_baseline_test = mean_absolute_error(y_test_flat, baseline_predictions_test)

print(f"Baseline MSE on Test Set: {mse_baseline_test}")
print(f"Baseline MAE on Test Set: {mae_baseline_test}")


Baseline prediction (3x3 grid):
 [[0.03153893 0.03276088 0.03203874]
 [0.03237038 0.03245637 0.03228426]
 [0.03272095 0.03256148 0.03270835]]
Baseline MSE on Test Set: 0.004747930448502302
Baseline MAE on Test Set: 0.038547333329916


In [30]:
def train_model(model, X_train_tensor, y_train_tensor, epochs=100, 
                criterion=nn.MSELoss(), optimizer=None, lr=0.001):
    # Default to Adam optimizer if none is provided
    if optimizer is None:
        optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        # Forward pass
        outputs = model(X_train_tensor)
        loss = criterion(outputs, y_train_tensor)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Print loss every 10 epochs
        if (epoch+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')
    return model

In [None]:
# Simple fully connected (dense) neural network 

class SimpleNN(nn.Module):
    def __init__(self, input_size=768, output_size=9, hidden_units=128):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_units, output_size)  # Output layer matches the size of the 3x3 grid
    
    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Train the model using the train_model function
simple_nn_model = SimpleNN(input_size=768, output_size=9, hidden_units=128)
trained_simple_nn_model = train_model(
    model=simple_nn_model,  
    X_train_tensor=X_train_tensor,
    y_train_tensor=y_train_tensor,
    epochs=100,  
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(simple_nn_model.parameters(), lr=0.001)
)

# Prediction on test data
with torch.no_grad():
    y_test_pred = trained_simple_nn_model(X_test_tensor)

# Calculate metrics
mse_test = nn.MSELoss()(y_test_pred, y_test_tensor).item()
mae_test = nn.L1Loss()(y_test_pred, y_test_tensor).item()

print(f"SimpleNN MSE on Test Set: {mse_test}")
print(f"SimpleNN MAE on Test Set: {mae_test}")

Epoch [10/100], Loss: 0.0043
Epoch [20/100], Loss: 0.0032
Epoch [30/100], Loss: 0.0028
Epoch [40/100], Loss: 0.0026
Epoch [50/100], Loss: 0.0024
Epoch [60/100], Loss: 0.0023
Epoch [70/100], Loss: 0.0021
Epoch [80/100], Loss: 0.0020
Epoch [90/100], Loss: 0.0020
Epoch [100/100], Loss: 0.0019
SimpleNN MSE on Test Set: 0.0022218564990907907
SimpleNN MAE on Test Set: 0.024465523660182953


In [None]:
# Deep Fully Connected Neural Network

class DeepNN(nn.Module):
    def __init__(self, input_size=768, output_size=9, hidden_units=128):
        super(DeepNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_size)  # Output layer matches 3x3 grid size

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        return x


# Train the model using the train_model function
deep_nn_model = DeepNN(input_size=768, output_size=9, hidden_units=128)
trained_deep_nn_model = train_model(
    model=deep_nn_model,  
    X_train_tensor=X_train_tensor,
    y_train_tensor=y_train_tensor,
    epochs=100, 
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(deep_nn_model.parameters(), lr=0.001)
)

# Prediction on test data
with torch.no_grad():
    y_test_pred = trained_deep_nn_model(X_test_tensor)

# Calculate metrics
mse_test = nn.MSELoss()(y_test_pred, y_test_tensor).item()
mae_test = nn.L1Loss()(y_test_pred, y_test_tensor).item()

print(f"DeepNN MSE on Test Set: {mse_test}")
print(f"DeepNN MAE on Test Set: {mae_test}")


Epoch [10/100], Loss: 0.0046
Epoch [20/100], Loss: 0.0032
Epoch [30/100], Loss: 0.0028
Epoch [40/100], Loss: 0.0026
Epoch [50/100], Loss: 0.0024
Epoch [60/100], Loss: 0.0023
Epoch [70/100], Loss: 0.0022
Epoch [80/100], Loss: 0.0020
Epoch [90/100], Loss: 0.0019
Epoch [100/100], Loss: 0.0018
SimpleNN MSE on Test Set: 0.002130069537088275
SimpleNN MAE on Test Set: 0.02332741767168045


In [None]:
class CNNModel(nn.Module):
    def __init__(self, input_size=768, output_size=9, hidden_units=128):
        super(CNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.relu = nn.ReLU()
        self.hidden_units = hidden_units  # Store hidden_units as a class attribute
        self.output_size = output_size  # Store output_size as a class attribute

        # Define a Conv1d layer
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, stride=1)
        
        # Second fully connected layer to project to the output size
        conv_out_size = (hidden_units - 3 + 1) * 16  # Calculating the output size of conv layer
        self.fc2 = nn.Linear(conv_out_size, output_size)

    def forward(self, x):
        # Fully connected layer followed by ReLU activation
        x = self.fc1(x)
        x = self.relu(x)
        
        # Reshape to match Conv1d input shape (batch_size, channels, sequence_length)
        x = x.view(-1, 1, self.hidden_units)
        
        # Convolutional layer followed by flattening
        x = self.conv1(x)
        x = x.view(x.size(0), -1)  # Flatten before passing to the output layer
        
        # Output layer
        x = self.fc2(x)
        return x


# Train the model using the train_model function
cnn_model = CNNModel(input_size=768, output_size=9, hidden_units=128)
trained_cnn_model = train_model(
    model=cnn_model,  
    X_train_tensor=X_train_tensor,
    y_train_tensor=y_train_tensor,
    epochs=100,
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(cnn_model.parameters(), lr=0.001)
)

# Prediction on test data
with torch.no_grad():
    y_test_pred = trained_cnn_model(X_test_tensor)

# Calculate metrics
mse_test = nn.MSELoss()(y_test_pred, y_test_tensor).item()
mae_test = nn.L1Loss()(y_test_pred, y_test_tensor).item()

print(f"CNN MSE on Test Set: {mse_test}")
print(f"CNN MAE on Test Set: {mae_test}")


Epoch [10/200], Loss: 0.0567
Epoch [20/200], Loss: 0.0196
Epoch [30/200], Loss: 0.0078
Epoch [40/200], Loss: 0.0045
Epoch [50/200], Loss: 0.0037
Epoch [60/200], Loss: 0.0033
Epoch [70/200], Loss: 0.0030
Epoch [80/200], Loss: 0.0029
Epoch [90/200], Loss: 0.0028
Epoch [100/200], Loss: 0.0027
Epoch [110/200], Loss: 0.0027
Epoch [120/200], Loss: 0.0026
Epoch [130/200], Loss: 0.0026
Epoch [140/200], Loss: 0.0025
Epoch [150/200], Loss: 0.0025
Epoch [160/200], Loss: 0.0024
Epoch [170/200], Loss: 0.0024
Epoch [180/200], Loss: 0.0023
Epoch [190/200], Loss: 0.0023
Epoch [200/200], Loss: 0.0023
SimpleNN MSE on Test Set: 0.0023209117352962494
SimpleNN MAE on Test Set: 0.02437349036335945


In [39]:
class LSTMModel(nn.Module):
    def __init__(self, input_size=768, output_size=9, hidden_units=128, lstm_layers=1):
        super(LSTMModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.relu = nn.ReLU()
        self.lstm = nn.LSTM(input_size=hidden_units, hidden_size=hidden_units, num_layers=lstm_layers, batch_first=True)
        self.fc2 = nn.Linear(hidden_units, output_size)

    def forward(self, x):
        # First fully connected layer
        x = self.fc1(x)
        x = self.relu(x)
        
        # Reshape for LSTM: (batch_size, sequence_length, input_size)
        x = x.unsqueeze(1)  # Adding a sequence dimension, shape becomes (batch_size, 1, hidden_units)
        
        # LSTM layer
        lstm_out, _ = self.lstm(x)
        
        # Take the output of the last LSTM cell
        x = lstm_out[:, -1, :]
        
        # Final fully connected layer
        x = self.fc2(x)
        return x

# Initialize the LSTM model
lstm_model = LSTMModel(input_size=768, output_size=9, hidden_units=128, lstm_layers=1)

# Train the model using the `train_model` function
trained_lstm_model = train_model(
    model=lstm_model,  
    X_train_tensor=X_train_tensor,
    y_train_tensor=y_train_tensor,
    epochs=100,
    criterion=nn.MSELoss(),
    optimizer=optim.Adam(lstm_model.parameters(), lr=0.001)
)

# Prediction on test data
with torch.no_grad():
    y_test_pred = trained_lstm_model(X_test_tensor)

# Calculate metrics
mse_test = nn.MSELoss()(y_test_pred, y_test_tensor).item()
mae_test = nn.L1Loss()(y_test_pred, y_test_tensor).item()

print(f"LSTM MSE on Test Set: {mse_test}")
print(f"LSTM MAE on Test Set: {mae_test}")


Epoch [10/100], Loss: 0.0054
Epoch [20/100], Loss: 0.0040
Epoch [30/100], Loss: 0.0031
Epoch [40/100], Loss: 0.0028
Epoch [50/100], Loss: 0.0026
Epoch [60/100], Loss: 0.0025
Epoch [70/100], Loss: 0.0024
Epoch [80/100], Loss: 0.0023
Epoch [90/100], Loss: 0.0022
Epoch [100/100], Loss: 0.0021
LSTM MSE on Test Set: 0.0021767104044556618
LSTM MAE on Test Set: 0.024041717872023582
