In [7]:
# === IMPORTS ===
import torch
import torch.nn as nn
import torch.autograd as autograd
import pandas as pd
import numpy as np
import libarchive
import pydot
import cartopy
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score
import joblib
import warnings
import pickle
import io
import os
from PIL import Image
import torchvision.transforms as transforms
warnings.filterwarnings("ignore")

In [8]:
# === DEFINE CNN MODEL CLASS ===
class FireFeatureCNN(nn.Module):
    def __init__(self):
        super(FireFeatureCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1),
            nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(), nn.AdaptiveAvgPool2d((1, 1))
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64, 4)
        )

In [9]:
def forward(self, x):
        x = self.features(x)
        return self.classifier(x)

In [11]:
# === LOAD DATASET ===
df = pd.read_excel(r'/content/PT-FireSprd_L2_FireBehavior_FULL_set.ods', engine="odf")
df = df.dropna(subset=["ros_p,N,12,6", "Longitude", "Latitude"])
df = df.rename(columns={"ros_p,N,12,6": "spread_rate", "Longitude": "x", "Latitude": "y"})

In [12]:
scaler = MinMaxScaler()
df[['x', 'y']] = scaler.fit_transform(df[['x', 'y']])

In [13]:
# === LOAD CNN MODEL ===
class CustomUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        if name == 'FireFeatureCNN':
            return FireFeatureCNN
        return super().find_class(module, name)

In [16]:
try:
    with open(r'/content/fire_model_full.pth', 'rb') as f:
        cnn_model = CustomUnpickler(f).load()
    print("Model loaded with CustomUnpickler")
except Exception as e1:
    print(f"First loading attempt failed: {e1}")
    try:
        temp_model = FireFeatureCNN()
        state_dict = torch.load(r'/content/fire_model_full.pth', map_location="cpu")
        if isinstance(state_dict, dict) and 'state_dict' in state_dict:
            state_dict = state_dict['state_dict']
        elif not all(isinstance(k, str) for k in state_dict.keys()):
            state_dict = state_dict.state_dict()
        temp_model.load_state_dict(state_dict)
        cnn_model = temp_model
        print("Model loaded via state_dict")
    except Exception as e2:
        print(f"Second loading attempt failed: {e2}")
        cnn_model = FireFeatureCNN()
        def get_dummy_cnn_features(lon, lat):
            import hashlib
            seed = hashlib.md5(f"{lon}_{lat}".encode()).hexdigest()
            seed_val = int(seed[:8], 16) / (2**32)
            np.random.seed(int(seed_val * 1000000))
            return np.random.rand(4).tolist()
        def get_actual_cnn_features(image_path, model, transform, device='cpu'):
            import re
            try:
                matches = re.search(r'lon_(.+)_lat_(.+)\.jpg', image_path)
                if matches:
                    lon, lat = float(matches.group(1)), float(matches.group(2))
                    return get_dummy_cnn_features(lon, lat)
            except:
                pass
            return np.random.rand(4).tolist()
        print("Using dummy CNN feature extraction")

First loading attempt failed: A load persistent id instruction was encountered,
but no persistent_load function was specified.
Second loading attempt failed: Weights only load failed. This file can still be loaded, to do so you have two options, [1mdo those steps only if you trust the source of the checkpoint[0m. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL __main__.FireFeatureCNN was not an allowed global by default. Please use `torch.serialization.add_safe_globals([FireFeatureCNN])` or the `torch.serialization.safe_globals([FireFeatureCNN])` context manager to

In [17]:
cnn_model.eval()

FireFeatureCNN(
  (features): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): AdaptiveAvgPool2d(output_size=(1, 1))
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=64, out_features=4, bias=True)
  )
)

In [18]:
# === IMAGE PREPROCESSING ===
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [19]:
def get_actual_cnn_features(image_path, model, transform, device='cpu'):
    model.eval()
    image = Image.open(image_path).convert("RGB")
    image = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(image).cpu().numpy()[0]
    return output.tolist()

In [20]:
def get_image_path(lon, lat):
    return f"/content/images/lon_{lon:.6f}_lat_{lat:.6f}.jpg"

In [21]:
def get_slope_and_elevation(lon, lat, tif_path):
    import rasterio
    with rasterio.open(tif_path) as src:
        elev = src.read(1)
        res_x, res_y = src.res
        dy, dx = np.gradient(elev, res_y, res_x)
        slope = np.sqrt(dx**2 + dy**2)
        try:
            row, col = src.index(lon, lat)
            if 0 <= row < elev.shape[0] and 0 <= col < elev.shape[1]:
                return float(slope[row, col]), float(elev[row, col])
        except Exception as e:
            print(f"Error for coordinates ({lon}, {lat}): {e}")
    return 0.1, 100.0

In [22]:
X_data, y_data = [], []

In [23]:

for idx, row in df.iterrows():
    try:
        img_path = get_image_path(row['x'], row['y'])
        try:
            cnn_out = get_actual_cnn_features(img_path, cnn_model, transform)
        except:
            cnn_out = np.random.rand(4).tolist()

        slope, elev = get_slope_and_elevation(row['x'], row['y'], r'/TIFF_input.zip')

        # Check for valid values before appending
        if cnn_out is not None and len(cnn_out) == 4 and slope is not None and elev is not None:
            X_data.append([row['x'], row['y'], slope, elev] + cnn_out)
            y_data.append(row['spread_rate'])

        if idx % 100 == 0:
            print(f"Processed {idx} rows...")

    except Exception as e:
        print(f"Error at row {idx}: {e}")
        continue

print("X_data length:", len(X_data))  # Debug print to verify non-empty input


Error at row 0: No module named 'rasterio'
Error at row 1: No module named 'rasterio'
Error at row 2: No module named 'rasterio'
Error at row 3: No module named 'rasterio'
Error at row 4: No module named 'rasterio'
Error at row 5: No module named 'rasterio'
Error at row 6: No module named 'rasterio'
Error at row 7: No module named 'rasterio'
Error at row 8: No module named 'rasterio'
Error at row 9: No module named 'rasterio'
Error at row 10: No module named 'rasterio'
Error at row 11: No module named 'rasterio'
Error at row 12: No module named 'rasterio'
Error at row 13: No module named 'rasterio'
Error at row 14: No module named 'rasterio'
Error at row 15: No module named 'rasterio'
Error at row 16: No module named 'rasterio'
Error at row 17: No module named 'rasterio'
Error at row 18: No module named 'rasterio'
Error at row 19: No module named 'rasterio'
Error at row 20: No module named 'rasterio'
Error at row 21: No module named 'rasterio'
Error at row 22: No module named 'rasterio

In [None]:
X = torch.tensor(X_data, dtype=torch.float32, requires_grad=True)
y = torch.tensor(y_data, dtype=torch.float32).view(-1, 1)

In [None]:
# === DEFINE PINN ===
class PINN(nn.Module):
    def __init__(self, in_dim=8, hidden_dim=64, out_dim=1):
        super(PINN, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, out_dim)
        )
    def forward(self, x):
        return self.net(x)

In [None]:
def physics_residual(model, inputs):
    x = inputs[:, 0:1].clone().detach().requires_grad_(True)
    y = inputs[:, 1:2].clone().detach().requires_grad_(True)
    slope = inputs[:, 2:3]
    elev = inputs[:, 3:4]
    cnn_feats = inputs[:, 4:]
    input_var = torch.cat([x, y, slope, elev, cnn_feats], dim=1).requires_grad_(True)
    u = model(input_var)
    grad_u = autograd.grad(u, input_var, grad_outputs=torch.ones_like(u), create_graph=True)[0]
    du_dx = grad_u[:, 0:1]
    du_dy = grad_u[:, 1:2]
    d2u_dx2 = autograd.grad(du_dx, input_var, grad_outputs=torch.ones_like(du_dx), create_graph=True)[0][:, 0:1]
    d2u_dy2 = autograd.grad(du_dy, input_var, grad_outputs=torch.ones_like(du_dy), create_graph=True)[0][:, 1:2]
    laplacian_u = d2u_dx2 + d2u_dy2
    D = cnn_feats[:, 0:1]
    vx = cnn_feats[:, 3:4] * slope
    vy = cnn_feats[:, 3:4] * slope
    advection = vx * du_dx + vy * du_dy
    diffusion = D * laplacian_u
    residual = -(diffusion + advection)
    return torch.mean(residual**2)

In [None]:
# === TRAINING ===
model = PINN()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
mse_loss = nn.MSELoss()
data_losses, physics_losses, total_losses = [], [], []

In [None]:
for epoch in range(1000):
    optimizer.zero_grad()
    pred = model(X)
    data_loss = mse_loss(pred, y)
    physics_loss = physics_residual(model, X)
    loss = data_loss + 0.5 * physics_loss
    loss.backward()
    optimizer.step()
    data_losses.append(data_loss.item())
    physics_losses.append(physics_loss.item())
    total_losses.append(loss.item())
    if epoch % 100 == 0:
        print(f"Epoch {epoch} | Data Loss: {data_loss.item():.6f} | Physics Loss: {physics_loss.item():.6f} | Total: {loss.item():.6f}")

In [None]:
# === EVALUATION ===
with torch.no_grad():
    y_pred = model(X).numpy()

In [None]:
r2 = r2_score(y.numpy(), y_pred)
print(f"R² Score: {r2:.4f}")

In [None]:
# === PLOTS ===
plt.figure(figsize=(12, 10))
plt.subplot(2, 2, 1)
plt.scatter(y.numpy(), y_pred, alpha=0.6)
plt.plot([y.min(), y.max()], [y.min(), y.max()], 'r--')
plt.xlabel("True Spread Rate")
plt.ylabel("Predicted Spread Rate")
plt.title(f"PINN Spread Rate Prediction (R² = {r2:.4f})")
plt.grid(True)

In [None]:
plt.subplot(2, 2, 2)
plt.semilogy(data_losses, label='Data Loss')
plt.semilogy(physics_losses, label='Physics Loss')
plt.semilogy(total_losses, label='Total Loss')
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.title("Training Losses")

In [None]:
plt.subplot(2, 2, 3)
residuals = y.numpy() - y_pred
plt.scatter(y_pred, residuals, alpha=0.6)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel("Predicted Spread Rate")
plt.ylabel("Residual")
plt.title("Residual Plot")
plt.grid(True)

In [None]:
plt.subplot(2, 2, 4)
plt.hist(residuals, bins=30)
plt.xlabel("Residual Value")
plt.ylabel("Frequency")
plt.title("Residual Histogram")
plt.grid(True)

In [None]:
plt.tight_layout()
plt.savefig("pinn_results.png")
plt.show()

In [None]:
torch.save(model.state_dict(), 'pinn_fire_spread_model.pth')
print("Model saved successfully.")