##**Hybrid Neuro-Symbolic Deep Learning for Tropical Cyclone Intensity Prediction and GIS-Based Risk Visualization**

**Problem Statement:**

Traditional neural models for tropical cyclone intensity prediction can achieve good accuracy but may produce physically implausible forecasts (for example, negative wind speeds or unrealistic intensity jumps) and behave unreliably during extreme events. The goal of this project is to develop a hybrid neuro-symbolic model that predicts short-term cyclone intensity from historical track sequences (latitude, longitude, pressure, wind) while incorporating soft physical and domain constraints as differentiable penalties. This approach aims to improve both predictive performance and physical consistency compared to a neural-only baseline.

##**Install + imports**

In [None]:
!pip install pandas numpy scikit-learn torch


In [None]:
import math
import numpy as np
import pandas as pd

from dataclasses import dataclass
from typing import List, Dict, Tuple

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


What this block does:

Installs and imports everything we need: data handling, scaling, train split, and PyTorch.

##**Parse latitude/longitude strings**

In [None]:
def parse_latlon(s: str) -> float:
    s = str(s).strip()
    if not s:
        return np.nan
    hemi = s[-1].upper()       # N/S/E/W
    val = float(s[:-1])        # numeric part
    if hemi in ("S", "W"):
        return -val
    return val


What it does:

Converts "28.0N" → 28.0

Converts "94.8W" → -94.8

##**Distance between two track points (Haversine)**

We use this for a “sanity rule” (storms usually don’t teleport hundreds of km in 6 hours).

In [None]:
def haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(p1)*math.cos(p2)*math.sin(dlmb/2)**2
    return 2 * R * math.asin(min(1.0, math.sqrt(a)))


##**Load your atlantic + pacific CSVs and clean**

In [None]:
import numpy as np
import pandas as pd

def load_tracks(atl_path: str, pac_path: str) -> pd.DataFrame:
    atl = pd.read_csv("/content/atlantic.csv")
    # Use on_bad_lines='skip' to ignore rows that cause parsing errors
    pac = pd.read_csv("/content/pacific.csv", on_bad_lines='skip')

    atl["BASIN"] = "ATL"
    pac["BASIN"] = "PAC"
    df = pd.concat([atl, pac], ignore_index=True)

    # --- Clean string columns (optional but helps) ---
    for c in ["ID", "Name", "Event", "Status"]:
        if c in df.columns:
            df[c] = df[c].astype(str).str.strip()

    # --- Replace HURDAT-style missing values ---
    # Your files use -999 for missing radii/pressure etc.
    df = df.replace(-999, np.nan)

    # --- Parse coordinates safely ---
    def _parse_coord(v):
        # if already numeric, return as float
        if pd.isna(v):
            return np.nan
        if isinstance(v, (int, float, np.integer, np.floating)):
            return float(v)
        v = str(v).strip()
        return parse_latlon(v)  # uses your parse_latlon("28.0N") -> 28.0 etc.

    df["lat"] = df["Latitude"].apply(_parse_coord)
    df["lon"] = df["Longitude"].apply(_parse_coord)

    # --- Make Date/Time numeric safely ---
    df["Date"] = pd.to_numeric(df["Date"], errors="coerce")
    df["Time"] = pd.to_numeric(df["Time"], errors="coerce")

    # Create sortable key: YYYYMMDDHHMM (Time is 0/600/1200/1800 etc.)
    df["dt_key"] = df["Date"] * 10000 + df["Time"]

    # --- Ensure target is numeric and filter usable rows ---
    df["Maximum Wind"] = pd.to_numeric(df["Maximum Wind"], errors="coerce")

    # Keep only rows usable for training
    df = df.dropna(subset=["ID", "dt_key", "lat", "lon", "Maximum Wind"]).reset_index(drop=True)

    return df

In [None]:
df = load_tracks("/content/atlantic.csv", "/content/pacific.csv")
print(df.columns)
print(df.head())


##**Build sequences (examples) per storm**

Goal:

Input: last T=6 points

Output: next-step wind (regression)

In [None]:
from dataclasses import dataclass
import numpy as np

@dataclass
class SeqExample:
    x: np.ndarray   # shape: (T, F)
    y: float        # next-step Maximum Wind
    meta: dict      # extra info for symbolic rules / debugging


Build vocabularies for categorical fields:


In [None]:
import pandas as pd
from typing import Dict

def make_vocab(series: pd.Series) -> Dict[str, int]:
    series = series.fillna("UNK").astype(str).str.strip()
    uniq = sorted(series.unique().tolist())
    return {s: i for i, s in enumerate(uniq)}

def one_hot(idx: int, n: int) -> np.ndarray:
    v = np.zeros(n, dtype=np.float32)
    v[idx] = 1.0
    return v


In [None]:
import math

def haversine_km(lat1, lon1, lat2, lon2) -> float:
    R = 6371.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(p1)*math.cos(p2)*math.sin(dlmb/2)**2
    return 2 * R * math.asin(min(1.0, math.sqrt(a)))


In [None]:
from typing import List, Tuple

def build_examples(df: pd.DataFrame, seq_len: int = 6) -> Tuple[List[SeqExample], List[str]]:
    df = df.copy()

    # Required columns
    required = ["ID", "dt_key", "lat", "lon", "Maximum Wind", "BASIN"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Clean strings + missing markers
    df = df.replace(-999, np.nan)
    df["BASIN"] = df["BASIN"].fillna("UNK").astype(str).str.strip()
    if "Status" in df.columns:
        df["Status"] = df["Status"].fillna("UNK").astype(str).str.strip()
    else:
        df["Status"] = "UNK"

    # Coerce to numeric safely
    df["Maximum Wind"] = pd.to_numeric(df["Maximum Wind"], errors="coerce")
    df["Minimum Pressure"] = pd.to_numeric(df.get("Minimum Pressure", 0.0), errors="coerce").fillna(0.0)

    # Wind radii columns (optional)
    wind_radius_cols = [
        "Low Wind NE","Low Wind SE","Low Wind SW","Low Wind NW",
        "Moderate Wind NE","Moderate Wind SE","Moderate Wind SW","Moderate Wind NW",
        "High Wind NE","High Wind SE","High Wind SW","High Wind NW"
    ]
    for c in wind_radius_cols:
        if c not in df.columns:
            df[c] = 0.0
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0)

    # Vocabularies
    status_vocab = make_vocab(df["Status"])
    basin_vocab = {"ATL": 0, "PAC": 1, "UNK": 2}

    # Features per timestep (include current wind)
    numeric_in_cols = ["lat", "lon", "Minimum Pressure"] + wind_radius_cols + ["Maximum Wind"]

    feature_names = (
        numeric_in_cols
        + [f"basin_{k}" for k in basin_vocab.keys()]
        + [f"status_{k}" for k in status_vocab.keys()]
    )

    examples: List[SeqExample] = []

    for storm_id, g in df.groupby("ID"):
        # sort + remove duplicate timestamps
        g = g.sort_values("dt_key").drop_duplicates("dt_key").reset_index(drop=True)

        # Need seq_len history + 1 future point
        if len(g) < seq_len + 1:
            continue

        # Precompute step distances (km)
        lats = g["lat"].to_numpy(dtype=np.float32)
        lons = g["lon"].to_numpy(dtype=np.float32)
        step_km = np.zeros(len(g), dtype=np.float32)
        for i in range(1, len(g)):
            step_km[i] = haversine_km(float(lats[i-1]), float(lons[i-1]),
                                     float(lats[i]), float(lons[i]))

        # Sliding window: predict wind at t+1 from [t-seq_len+1 .. t]
        for t in range(seq_len - 1, len(g) - 1):
            hist = g.iloc[t-(seq_len-1):t+1]   # seq_len rows
            nxt = g.iloc[t+1]

            # Basin one-hot (from last timestep in hist)
            basin_key = str(hist["BASIN"].iloc[-1])
            if basin_key not in basin_vocab:
                basin_key = "UNK"
            basin_oh = one_hot(basin_vocab[basin_key], len(basin_vocab))

            X = []
            for i in range(seq_len):
                row = hist.iloc[i]

                num = row[numeric_in_cols].to_numpy(dtype=np.float32)

                st = str(row["Status"])
                if st not in status_vocab:
                    st = "UNK"
                st_oh = one_hot(status_vocab[st], len(status_vocab))

                feat = np.concatenate([num, basin_oh, st_oh]).astype(np.float32)
                X.append(feat)

            X = np.stack(X, axis=0)  # (T, F)
            y = float(nxt["Maximum Wind"])

            if not np.isfinite(y):
                continue

            meta = {
                "storm_id": storm_id,
                "pressure_hist": hist["Minimum Pressure"].to_numpy(dtype=np.float32),
                "step_km_hist": step_km[t-(seq_len-1):t+1].copy(),
            }

            examples.append(SeqExample(x=X, y=y, meta=meta))

    return examples, feature_names


In [None]:
examples, feature_names = build_examples(df, seq_len=6)
print("Examples:", len(examples))
print("Num features:", len(feature_names))

if examples:
    print("X shape:", examples[0].x.shape, "y:", examples[0].y)


##**Block 5: Dataset + scaling**

**Block 5: Dataset class (with optional scaler)**

In [None]:
from sklearn.preprocessing import StandardScaler
from torch.utils.data import Dataset, DataLoader
import torch
import numpy as np
from typing import List, Optional

class HurricaneSeqDataset(Dataset):
    def __init__(self, examples: List[SeqExample], scaler: Optional[StandardScaler] = None, fit_scaler: bool = False):
        """
        examples: list of SeqExample where ex.x is (T,F)
        scaler: StandardScaler instance to apply
        fit_scaler: if True, fit scaler using these examples (use ONLY on training set)
        """
        self.examples = examples
        self.scaler = scaler

        if self.scaler is not None and fit_scaler:
            # Fit on ALL timesteps from ALL sequences in training set
            X_all = np.concatenate([ex.x.reshape(-1, ex.x.shape[-1]) for ex in examples], axis=0)
            self.scaler.fit(X_all)

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        ex = self.examples[idx]
        x = ex.x  # (T, F)

        if self.scaler is not None:
            T, F = x.shape
            x = self.scaler.transform(x.reshape(-1, F)).reshape(T, F).astype(np.float32)

        y = np.float32(ex.y)

        return torch.from_numpy(x), torch.tensor(y), ex.meta


**Block 5B: Collate function (keeps metas as list of dicts)**

In [None]:
def collate_batch(batch):
    xs, ys, metas = zip(*batch)   # tuples length B
    xs = torch.stack(xs, dim=0)   # (B, T, F)
    ys = torch.stack(ys, dim=0)   # (B,)
    return xs, ys, list(metas)


**Block 5C: Train/val split + scaler fit on train only**

In [None]:
from sklearn.model_selection import train_test_split
import numpy as np

# Split indices
idx = np.arange(len(examples))
train_idx, val_idx = train_test_split(idx, test_size=0.2, random_state=42, shuffle=True)

train_examples = [examples[i] for i in train_idx]
val_examples   = [examples[i] for i in val_idx]

# Create scaler and FIT only on training data
scaler = StandardScaler()
train_ds = HurricaneSeqDataset(train_examples, scaler=scaler, fit_scaler=True)

# Reuse the same fitted scaler for val
val_ds = HurricaneSeqDataset(val_examples, scaler=scaler, fit_scaler=False)


**Block 5D: DataLoaders**

In [None]:
train_loader = DataLoader(
    train_ds,
    batch_size=256,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_batch
)

val_loader = DataLoader(
    val_ds,
    batch_size=256,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_batch
)


**Block 5E: Quick sanity check**

In [None]:
x_batch, y_batch, metas = next(iter(train_loader))
print("x_batch:", x_batch.shape)   # (B, T, F)
print("y_batch:", y_batch.shape)   # (B,)
print("meta keys:", metas[0].keys())


That’s Block 5 done.

If you want, I can also show a variant where:

1. numeric features are scaled, but one-hot (basin/status) are left unscaled, which some people prefer.

##**Block 6: Neural model (GRU)**

In [None]:
import torch
import torch.nn as nn

class GRURegressor(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int = 128, num_layers: int = 1, dropout: float = 0.0):
        super().__init__()
        self.gru = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        self.head = nn.Sequential(
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (B, T, F)
        out, _ = self.gru(x)          # out: (B, T, H)
        h_last = out[:, -1, :]        # (B, H)
        y_pred = self.head(h_last).squeeze(-1)  # (B,)
        return y_pred


##**Block 7: Symbolic rules (soft constraints)**

Uses the meta you already create in build_examples():

pressure_hist

step_km_hist

In [None]:
import numpy as np
import torch

def symbolic_penalty(
    y_pred: torch.Tensor,   # (B,)
    metas: list,            # list of dicts, length B
    lambda_nonneg: float = 0.01,
    lambda_pressure: float = 0.05,
    lambda_motion: float = 0.02,
) -> torch.Tensor:

    device = y_pred.device

    # Rule 1: wind should be non-negative
    nonneg = torch.relu(-y_pred).mean()

    # Rule 2: low pressure should not imply extremely low wind (rough heuristic baseline)
    a, b = 200.0, 0.12
    baseline_list = []
    for m in metas:
        p_last = float(m["pressure_hist"][-1]) if "pressure_hist" in m else 0.0
        if (not np.isfinite(p_last)) or p_last <= 0:
            base = 0.0
        else:
            base = max(0.0, a - b * p_last)
        baseline_list.append(base)

    baseline = torch.tensor(baseline_list, dtype=torch.float32, device=device)
    pressure_pen = torch.relu((baseline - y_pred) - 5.0).mean()   # 5 kt slack

    # Rule 3: motion sanity (penalize big jumps in history)
    motion_viol = []
    for m in metas:
        step = m.get("step_km_hist", [])
        mx = float(np.max(step)) if len(step) else 0.0
        motion_viol.append(max(0.0, mx - 400.0))  # km threshold

    motion_pen = torch.tensor(motion_viol, dtype=torch.float32, device=device).mean() / 400.0

    return (
        lambda_nonneg * nonneg
        + lambda_pressure * pressure_pen
        + lambda_motion * motion_pen
    )


##**Block 8: Training loop (neural loss + symbolic loss)**

In [None]:
import numpy as np
import torch
import torch.nn as nn

def train_model(
    model: nn.Module,
    train_loader,
    val_loader,
    epochs: int = 8,
    lr: float = 1e-3,
    lambda_sym: float = 1.0,
    device: str | None = None
):
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    model.to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr)
    loss_fn = nn.SmoothL1Loss()

    for ep in range(1, epochs + 1):
        # ---- Train ----
        model.train()
        tr_total, tr_neural, tr_sym = [], [], []

        for x, y, metas in train_loader:
            x = x.to(device)
            y = y.to(device)

            y_pred = model(x)

            neural_loss = loss_fn(y_pred, y)
            sym_loss = symbolic_penalty(y_pred, metas)
            loss = neural_loss + lambda_sym * sym_loss

            opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()

            tr_total.append(loss.item())
            tr_neural.append(neural_loss.item())
            tr_sym.append(sym_loss.item())

        # ---- Val ----
        model.eval()
        va_total, va_neural, va_sym = [], [], []

        with torch.no_grad():
            for x, y, metas in val_loader:
                x = x.to(device)
                y = y.to(device)

                y_pred = model(x)

                neural_loss = loss_fn(y_pred, y)
                sym_loss = symbolic_penalty(y_pred, metas)
                loss = neural_loss + lambda_sym * sym_loss

                va_total.append(loss.item())
                va_neural.append(neural_loss.item())
                va_sym.append(sym_loss.item())

        print(
            f"Epoch {ep:02d} | "
            f"train={np.mean(tr_total):.4f} (neural={np.mean(tr_neural):.4f}, sym={np.mean(tr_sym):.4f}) | "
            f"val={np.mean(va_total):.4f} (neural={np.mean(va_neural):.4f}, sym={np.mean(va_sym):.4f})"
        )

    return model


Connect them (same style as before)

In [None]:
input_dim = examples[0].x.shape[-1]
model = GRURegressor(input_dim=input_dim, hidden_dim=128)

model = train_model(
    model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=8,
    lr=1e-3,
    lambda_sym=1.0
)


##**Block 9: Run everything (end-to-end)t**

**Block 9A: Build examples (from cleaned df)**

In [None]:
SEQ_LEN = 6

examples, feature_names = build_examples(df, seq_len=SEQ_LEN)

print("Examples:", len(examples))
print("Features per timestep:", len(feature_names))

if len(examples) == 0:
    raise RuntimeError("No training examples created. Fix data preprocessing first.")


**Block 9B: Train / validation split + loaders**

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

idx = np.arange(len(examples))
train_idx, val_idx = train_test_split(idx, test_size=0.2, random_state=42, shuffle=True)

train_examples = [examples[i] for i in train_idx]
val_examples   = [examples[i] for i in val_idx]

scaler = StandardScaler()

train_ds = HurricaneSeqDataset(
    train_examples,
    scaler=scaler,
    fit_scaler=True
)

val_ds = HurricaneSeqDataset(
    val_examples,
    scaler=scaler,
    fit_scaler=False
)

train_loader = DataLoader(
    train_ds,
    batch_size=256,
    shuffle=True,
    collate_fn=collate_batch
)

val_loader = DataLoader(
    val_ds,
    batch_size=256,
    shuffle=False,
    collate_fn=collate_batch
)


**Block 9C: Initialize model**

In [None]:
input_dim = train_examples[0].x.shape[-1]

model = GRURegressor(
    input_dim=input_dim,
    hidden_dim=128,
    num_layers=1
)


**Block 9D: Train model (neural + symbolic)**

In [None]:
model = train_model(
    model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=8,
    lr=1e-3,
    lambda_sym=1.0
)


**Block 9E: Quick sanity prediction**

In [None]:
model.eval()

x_batch, y_batch, metas = next(iter(val_loader))
device = "cuda" if torch.cuda.is_available() else "cpu"

with torch.no_grad():
    preds = model(x_batch.to(device)).cpu().numpy()

print("True wind (first 10):", y_batch[:10].numpy())
print("Pred wind (first 10):", preds[:10])


In [None]:
import numpy as np

y_true = y_batch.cpu().numpy() if hasattr(y_batch, "cpu") else np.array(y_batch)
y_pred = np.array(preds)

mae = np.mean(np.abs(y_pred - y_true))
rmse = np.sqrt(np.mean((y_pred - y_true) ** 2))

# R²
ss_res = np.sum((y_true - y_pred) ** 2)
ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
r2 = 1 - ss_res / ss_tot

print(f"MAE  : {mae:.2f} kt")
print(f"RMSE : {rmse:.2f} kt")
print(f"R²   : {r2:.3f}")


**Block B: Accuracy within ±X knots**

In [None]:
def accuracy_within(y_true, y_pred, tol):
    return np.mean(np.abs(y_pred - y_true) <= tol)

for tol in [5, 10, 15]:
    acc = accuracy_within(y_true, y_pred, tol)
    print(f"Accuracy ±{tol} kt: {acc*100:.2f}%")


In [None]:
def wind_to_category(w):
    if w < 34: return 0      # TD
    if w < 64: return 1      # TS
    if w < 83: return 2      # Cat 1
    if w < 96: return 3      # Cat 2
    if w < 113: return 4     # Cat 3
    if w < 137: return 5     # Cat 4
    return 6                 # Cat 5

y_true_cat = np.array([wind_to_category(w) for w in y_true])
y_pred_cat = np.array([wind_to_category(w) for w in y_pred])

cat_acc = np.mean(y_true_cat == y_pred_cat)
print(f"Category accuracy: {cat_acc*100:.2f}%")


##**Graphs**

**True vs Predicted (scatter)**

In [None]:
import matplotlib.pyplot as plt
import numpy as np

y_true = y_batch.cpu().numpy() if hasattr(y_batch, "cpu") else np.array(y_batch)
y_pred = np.array(preds)

plt.figure()
plt.scatter(y_true, y_pred, s=12)
plt.xlabel("True Maximum Wind")
plt.ylabel("Predicted Maximum Wind")
plt.title("True vs Predicted Wind")

# y = x reference line
mn = min(y_true.min(), y_pred.min())
mx = max(y_true.max(), y_pred.max())
plt.plot([mn, mx], [mn, mx])

plt.show()


**First 50 samples (line plot)**

In [None]:
import matplotlib.pyplot as plt
import numpy as np

y_true = y_batch.cpu().numpy() if hasattr(y_batch, "cpu") else np.array(y_batch)
y_pred = np.array(preds)

n = 50
plt.figure()
plt.plot(y_true[:n], label="True")
plt.plot(y_pred[:n], label="Predicted")
plt.xlabel("Sample index (in batch)")
plt.ylabel("Maximum Wind")
plt.title("Batch Slice: True vs Predicted")
plt.legend()
plt.show()


**Absolute Error Distribution (Histogram)**

In [None]:
import matplotlib.pyplot as plt
import numpy as np

abs_error = np.abs(y_pred - y_true)

plt.figure()
plt.hist(abs_error, bins=30)
plt.xlabel("Absolute Error (kt)")
plt.ylabel("Frequency")
plt.title("Absolute Error Distribution")
plt.show()


In [None]:
plt.figure()
plt.scatter(y_true, abs_error, s=12)
plt.xlabel("True Maximum Wind (kt)")
plt.ylabel("Absolute Error (kt)")
plt.title("Error vs True Wind Speed")
plt.show()


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

def wind_to_category(w):
    if w < 34: return 0
    if w < 64: return 1
    if w < 83: return 2
    if w < 96: return 3
    if w < 113: return 4
    if w < 137: return 5
    return 6

y_true_cat = np.array([wind_to_category(w) for w in y_true])
y_pred_cat = np.array([wind_to_category(w) for w in y_pred])

cm = confusion_matrix(y_true_cat, y_pred_cat)
disp = ConfusionMatrixDisplay(cm)
disp.plot()
plt.title("Wind Category Confusion Matrix")
plt.show()


##'**Compare Neural vs Neuro-Symbolic curves**

**1A) Neural-only model (baseline)**

In [None]:
model_neural = GRURegressor(input_dim=input_dim, hidden_dim=128)

model_neural = train_model(
    model_neural,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=8,
    lr=1e-3,
    lambda_sym=0.0   # <<< IMPORTANT: no symbolic loss
)



**1B) Neuro-symbolic model**

In [None]:
model_neurosym = GRURegressor(input_dim=input_dim, hidden_dim=128)

model_neurosym = train_model(
    model_neurosym,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=8,
    lr=1e-3,
    lambda_sym=1.0   # <<< symbolic constraints ON
)


**Step 2: Get predictions from BOTH models**

In [None]:
import numpy as np
import torch

def collect_predictions(model, loader):
    model.eval()
    preds, trues = [], []

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    with torch.no_grad():
        for x, y, metas in loader:
            x = x.to(device)
            y_hat = model(x).cpu().numpy()
            preds.append(y_hat)
            trues.append(y.numpy())

    return np.concatenate(trues), np.concatenate(preds)

y_true, y_pred_neural   = collect_predictions(model_neural, val_loader)
_,      y_pred_neurosym = collect_predictions(model_neurosym, val_loader)


**Step 3: Accuracy-within-tolerance CURVE (most important)**

In [None]:
import matplotlib.pyplot as plt

abs_err_neural   = np.abs(y_pred_neural - y_true)
abs_err_neurosym = np.abs(y_pred_neurosym - y_true)

tols = np.arange(0, 31, 1)

acc_neural   = [np.mean(abs_err_neural   <= t) for t in tols]
acc_neurosym = [np.mean(abs_err_neurosym <= t) for t in tols]

plt.figure()
plt.plot(tols, acc_neural,   label="Neural only")
plt.plot(tols, acc_neurosym, label="Neuro-symbolic")
plt.xlabel("Tolerance (± kt)")
plt.ylabel("Accuracy")
plt.title("Accuracy vs Tolerance: Neural vs Neuro-Symbolic")
plt.legend()
plt.show()

**Step 5: Residual comparison (bias check)**

In [None]:
plt.figure()
plt.scatter(y_true, y_pred_neural - y_true, s=10, label="Neural only")
plt.scatter(y_true, y_pred_neurosym - y_true, s=10, label="Neuro-symbolic")
plt.axhline(0)
plt.xlabel("True Wind (kt)")
plt.ylabel("Residual (Pred − True)")
plt.title("Residuals: Neural vs Neuro-Symbolic")
plt.legend()
plt.show()


In [None]:
def summarize(y_true, y_pred, name):
    mae = np.mean(np.abs(y_pred - y_true))
    rmse = np.sqrt(np.mean((y_pred - y_true)**2))
    acc10 = np.mean(np.abs(y_pred - y_true) <= 10)
    print(f"{name}: MAE={mae:.2f}, RMSE={rmse:.2f}, Acc±10kt={acc10*100:.2f}%")

summarize(y_true, y_pred_neural,   "Neural only")
summarize(y_true, y_pred_neurosym, "Neuro-symbolic")


##**GIS Block: Tracks + Risk Grid Map (Atlantic/Pacific)**

**1) Install + imports**

In [None]:
!pip install geopandas shapely folium

import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString, box
import folium


**2) Load Kaggle files + parse lat/lon**

In [None]:
def parse_latlon(v):
    # handles formats like '28.0N', '94.8W'
    if pd.isna(v):
        return np.nan
    s = str(v).strip()
    if len(s) < 2:
        return np.nan
    hemi = s[-1].upper()
    try:
        val = float(s[:-1])
    except:
        return np.nan
    if hemi in ("S", "W"):
        val = -val
    return val

def load_kaggle_hurdat(atl_path: str, pac_path: str) -> pd.DataFrame:
    atl = pd.read_csv(atl_path)
    pac = pd.read_csv(pac_path)
    atl["BASIN"] = "ATL"
    pac["BASIN"] = "PAC"
    df = pd.concat([atl, pac], ignore_index=True)

    df = df.replace(-999, np.nan)

    df["lat"] = df["Latitude"].apply(parse_latlon)
    df["lon"] = df["Longitude"].apply(parse_latlon)

    df["Date"] = pd.to_numeric(df["Date"], errors="coerce")
    df["Time"] = pd.to_numeric(df["Time"], errors="coerce")
    df["dt_key"] = df["Date"] * 10000 + df["Time"]

    df["Maximum Wind"] = pd.to_numeric(df["Maximum Wind"], errors="coerce")

    # keep usable rows
    df["BASIN"] = df["BASIN"].fillna("UNK").astype(str).str.strip()
    df["ID"] = df["ID"].astype(str).str.strip()
    df = df.dropna(subset=["ID", "dt_key", "lat", "lon", "Maximum Wind"]).reset_index(drop=True)
    return df


**3) Convert to GeoDataFrame (GIS points)**

In [None]:
def to_points_gdf(df: pd.DataFrame) -> gpd.GeoDataFrame:
    d = df.copy()
    d["geometry"] = [Point(float(x), float(y)) for x, y in zip(d["lon"], d["lat"])]
    return gpd.GeoDataFrame(d, geometry="geometry", crs="EPSG:4326")


**4) Build track lines (GIS polylines per storm)**

In [None]:
def build_track_lines(points_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    rows = []
    for storm_id, g in points_gdf.sort_values("dt_key").groupby("ID"):
        if len(g) < 2:
            continue
        line = LineString(list(g.geometry.values))
        rows.append({
            "ID": storm_id,
            "BASIN": str(g["BASIN"].iloc[0]),
            "n_points": int(len(g)),
            "max_wind": float(np.nanmax(g["Maximum Wind"].values)),
            "geometry": line
        })
    return gpd.GeoDataFrame(rows, geometry="geometry", crs="EPSG:4326")


**5) Build a GIS “risk grid” (max wind per 1° cell)**

In [None]:
def make_risk_grid(points_gdf: gpd.GeoDataFrame, cell_deg: float = 1.0) -> gpd.GeoDataFrame:
    d = points_gdf.copy()

    # cell id by flooring lon/lat to grid
    d["gx"] = np.floor(d.geometry.x / cell_deg) * cell_deg
    d["gy"] = np.floor(d.geometry.y / cell_deg) * cell_deg

    agg = d.groupby(["gx", "gy"], as_index=False)["Maximum Wind"].max()
    polys = [box(x, y, x + cell_deg, y + cell_deg) for x, y in zip(agg["gx"], agg["gy"])]

    grid = gpd.GeoDataFrame(agg, geometry=polys, crs="EPSG:4326")
    grid = grid.rename(columns={"Maximum Wind": "wind_max"})
    return grid


**6) Interactive GIS map (Folium) + save HTML**

In [None]:
def folium_gis_map(points_gdf, lines_gdf, grid_gdf, out_html="hurdat_gis_map.html",
                   max_tracks=200, max_points=5000):

    center = [float(points_gdf["lat"].mean()), float(points_gdf["lon"].mean())]
    m = folium.Map(location=center, zoom_start=3, tiles="CartoDB positron")

    # Risk grid layer
    if grid_gdf is not None and len(grid_gdf) > 0:
        folium.GeoJson(
            grid_gdf.to_json(),
            name="Risk Grid (max wind)",
            style_function=lambda feat: {"weight": 0.2, "fillOpacity": 0.35},
            tooltip=folium.GeoJsonTooltip(fields=["wind_max"], aliases=["Max wind:"])
        ).add_to(m)

    # Tracks layer (subset)
    if lines_gdf is not None and len(lines_gdf) > 0:
        sample_lines = lines_gdf.sample(min(len(lines_gdf), max_tracks), random_state=42)
        for _, r in sample_lines.iterrows():
            coords = [(y, x) for x, y in list(r.geometry.coords)]  # lat,lon
            folium.PolyLine(coords, weight=2, opacity=0.7,
                            tooltip=f"ID={r['ID']} | Basin={r['BASIN']} | MaxWind={r['max_wind']}").add_to(m)

    # Points layer (subset)
    sample_pts = points_gdf.sample(min(len(points_gdf), max_points), random_state=42)
    for _, r in sample_pts.iterrows():
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=2,
            tooltip=f"Wind={float(r['Maximum Wind'])}",
            fill=True,
            fill_opacity=0.7
        ).add_to(m)

    folium.LayerControl().add_to(m)
    m.save(out_html)
    return m, out_html


In [None]:
def wind_category(w):
    if w < 34:
        return "TD"
    elif w < 64:
        return "TS"
    elif w < 83:
        return "C1"
    elif w < 96:
        return "C2"
    elif w < 113:
        return "C3"
    elif w < 137:
        return "C4"
    else:
        return "C5"


def wind_color(w):
    if w < 34:
        return "blue"
    elif w < 64:
        return "green"
    elif w < 83:
        return "yellow"
    elif w < 96:
        return "orange"
    elif w < 113:
        return "red"
    elif w < 137:
        return "darkred"
    else:
        return "purple"


In [None]:
df_gis = load_kaggle_hurdat("/content/atlantic.csv", "/content/pacific.csv")
points_gdf = to_points_gdf(df_gis)
lines_gdf = build_track_lines(points_gdf)

#Add wind type + color to GeoDataFrame
points_gdf["wind_type"] = points_gdf["Maximum Wind"].apply(wind_category)
points_gdf["wind_color"] = points_gdf["Maximum Wind"].apply(wind_color)

In [None]:
def folium_wind_category_map(points_gdf, lines_gdf, out_html="wind_category_map.html",
                             max_points=6000, max_tracks=200):

    center = [float(points_gdf["lat"].mean()), float(points_gdf["lon"].mean())]
    m = folium.Map(location=center, zoom_start=3, tiles="CartoDB positron")

    # ---- Storm tracks (neutral color) ----
    if lines_gdf is not None:
        for _, r in lines_gdf.sample(min(len(lines_gdf), max_tracks), random_state=42).iterrows():
            coords = [(y, x) for x, y in list(r.geometry.coords)]
            folium.PolyLine(
                coords,
                color="black",
                weight=2,
                opacity=0.6,
                tooltip=f"Storm ID: {r['ID']}"
            ).add_to(m)

    # ---- Colored wind points ----
    pts = points_gdf.sample(min(len(points_gdf), max_points), random_state=42)
    for _, r in pts.iterrows():
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=3,
            color=r["wind_color"],
            fill=True,
            fill_color=r["wind_color"],
            fill_opacity=0.8,
            tooltip=f"Wind: {int(r['Maximum Wind'])} kt ({r['wind_type']})"
        ).add_to(m)

    # ---- Legend (HTML) ----
    legend_html = """
    <div style="
        position: fixed;
        bottom: 50px; left: 50px; width: 210px;
        background-color: white; z-index:9999;
        padding: 10px; border: 2px solid grey;
        font-size: 13px;">
    <b>Wind Categories</b><br>
    <i style="color:blue">●</i> TD (&lt;34 kt)<br>
    <i style="color:green">●</i> TS (34–63 kt)<br>
    <i style="color:yellow">●</i> Cat 1 (64–82 kt)<br>
    <i style="color:orange">●</i> Cat 2 (83–95 kt)<br>
    <i style="color:red">●</i> Cat 3 (96–112 kt)<br>
    <i style="color:darkred">●</i> Cat 4 (113–136 kt)<br>
    <i style="color:purple">●</i> Cat 5 (≥137 kt)
    </div>
    """
    m.get_root().html.add_child(folium.Element(legend_html))

    m.save(out_html)
    return m, out_html


In [None]:
m, html_file = folium_wind_category_map(
    points_gdf,
    lines_gdf,
    out_html="hurricane_wind_categories.html"
)

print("Saved GIS wind-category map to:", html_file)
m


In [None]:
AREAS = {
    "Bay of Bengal":  {"lat_min": 5,  "lat_max": 25, "lon_min": 75,  "lon_max": 100},
    "Arabian Sea":    {"lat_min": 5,  "lat_max": 25, "lon_min": 55,  "lon_max": 75},
    "India":          {"lat_min": 6,  "lat_max": 36, "lon_min": 68,  "lon_max": 98},
    "USA East Coast": {"lat_min": 20, "lat_max": 50, "lon_min": -90, "lon_max": -60},
    "Caribbean":      {"lat_min": 10, "lat_max": 30, "lon_min": -90, "lon_max": -60},
}


In [None]:
def filter_by_bbox(points_gdf, lat_min, lat_max, lon_min, lon_max):
    return points_gdf[
        (points_gdf["lat"] >= lat_min) & (points_gdf["lat"] <= lat_max) &
        (points_gdf["lon"] >= lon_min) & (points_gdf["lon"] <= lon_max)
    ].copy()


In [None]:
import folium

def area_map(points_gdf, area_name="USA East Coast", out_html="area_map.html", max_points=8000):
    area = AREAS[area_name]

    sub = filter_by_bbox(points_gdf, **area)
    if len(sub) == 0:
        raise ValueError(f"No points found in selected area: {area_name}")

    center = [float((area["lat_min"] + area["lat_max"]) / 2),
              float((area["lon_min"] + area["lon_max"]) / 2)]

    m = folium.Map(location=center, zoom_start=5, tiles="CartoDB positron")

    # add points (colored)
    pts = sub.sample(min(len(sub), max_points), random_state=42)
    for _, r in pts.iterrows():
        w = float(r["Maximum Wind"])
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=3,
            color=wind_color(w),
            fill=True,
            fill_color=wind_color(w),
            fill_opacity=0.85,
            tooltip=f"Wind: {int(w)} kt ({wind_category(w)}) | ID: {r['ID']}"
        ).add_to(m)

    # draw bounding box on map
    bounds = [[area["lat_min"], area["lon_min"]], [area["lat_max"], area["lon_max"]]]
    folium.Rectangle(bounds=bounds, color="black", fill=False).add_to(m)

    m.save(out_html)
    return m, out_html


In [None]:
m, html_file = area_map(points_gdf, area_name="USA East Coast", out_html="usa_east_coast_map.html")
print("Saved:", html_file)
m


**Carribean**

In [None]:
CARIBBEAN = {
    "lat_min": 10,
    "lat_max": 30,
    "lon_min": -90,
    "lon_max": -60
}


In [None]:
def wind_category(w):
    if w < 34: return "TD"
    elif w < 64: return "TS"
    elif w < 83: return "C1"
    elif w < 96: return "C2"
    elif w < 113: return "C3"
    elif w < 137: return "C4"
    else: return "C5"

def wind_color(w):
    if w < 34: return "blue"
    elif w < 64: return "green"
    elif w < 83: return "yellow"
    elif w < 96: return "orange"
    elif w < 113: return "red"
    elif w < 137: return "darkred"
    else: return "purple"


In [None]:
from IPython.display import display, HTML
import folium

def show_folium_2x2(maps, titles=None, width="48%", height=360):
    """
    maps: list of 4 folium.Map objects
    titles: list of 4 strings (optional)
    """
    if titles is None:
        titles = ["Map 1", "Map 2", "Map 3", "Map 4"]

    html_blocks = []
    for m, t in zip(maps, titles):
        m_html = m.get_root().render()
        block = f"""
        <div style="width:{width}; padding:6px; box-sizing:border-box;">
            <div style="font-weight:600; margin:4px 0 6px 2px;">{t}</div>
            <div style="border:1px solid #ddd; border-radius:6px; overflow:hidden;">
                {m_html}
            </div>
        </div>
        """
        html_blocks.append(block)

    grid_html = f"""
    <div style="display:flex; flex-wrap:wrap; justify-content:space-between;">
        {''.join(html_blocks)}
    </div>
    <style>
      .folium-map {{ height: {height}px !important; }}
    </style>
    """

    display(HTML(grid_html))

# --- Start of moved code from r6Ojbix4Eej6 to define Atlantic Region maps ---
# --- Atlantic bounding box (Caribbean + Gulf + MDR) ---
ATLANTIC = {
    "lat_min": 5,
    "lat_max": 40,
    "lon_min": -100,
    "lon_max": -10
}

# --- Wind category ---
def wind_category(w):
    if w < 34: return "TD"
    elif w < 64: return "TS"
    elif w < 83: return "C1"
    elif w < 96: return "C2"
    elif w < 113: return "C3"
    elif w < 137: return "C4"
    else: return "C5"

def wind_color(cat):
    return {"TD":"blue", "TS":"green", "C1":"yellow", "C2":"orange"}.get(cat, "gray")

# --- Filter Atlantic points ---
atl_pts = points_gdf[
    (points_gdf["lat"] >= ATLANTIC["lat_min"]) &
    (points_gdf["lat"] <= ATLANTIC["lat_max"]) &
    (points_gdf["lon"] >= ATLANTIC["lon_min"]) &
    (points_gdf["lon"] <= ATLANTIC["lon_max"])
].copy()

atl_pts["wind_type"] = atl_pts["Maximum Wind"].apply(lambda x: wind_category(float(x)))

# --- Map builder (one category) ---
def make_atlantic_map(df, category, title, max_points=6000):
    center_lat = (ATLANTIC["lat_min"] + ATLANTIC["lat_max"]) / 2
    center_lon = (ATLANTIC["lon_min"] + ATLANTIC["lon_max"]) / 2

    m = folium.Map(location=[center_lat, center_lon], zoom_start=4, tiles="CartoDB positron")

    sub = df[df["wind_type"] == category].copy()
    print(f"{category} points:", len(sub))

    if len(sub) == 0:
        return m

    sub = sub.sample(min(len(sub), max_points), random_state=42)

    for _, r in sub.iterrows():
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=3,
            color=wind_color(category),
            fill=True,
            fill_color=wind_color(category),
            fill_opacity=0.85,
            tooltip=f"{title} | Wind {int(r['Maximum Wind'])} kt"
        ).add_to(m)

    return m

# --- Create 4 Atlantic maps ---
map_TD = make_atlantic_map(atl_pts, "TD", "Atlantic: Tropical Depression")
map_TS = make_atlantic_map(atl_pts, "TS", "Atlantic: Tropical Storm")
map_C1 = make_atlantic_map(atl_pts, "C1", "Atlantic: Category 1 Hurricane")
map_C2 = make_atlantic_map(atl_pts, "C2", "Atlantic: Category 2 Hurricane")
# --- End of moved code ---

# --- Show your 4 category maps in a 2×2 grid ---
show_folium_2x2(
    maps=[map_TD, map_TS, map_C1, map_C2],
    titles=["TD (Tropical Depression)", "TS (Tropical Storm)", "C1 (Category 1)", "C2 (Category 2)"],
    height=360
)

**Atlantic Region**

In [None]:
import folium
from IPython.display import display, HTML

# --- Atlantic bounding box (Caribbean + Gulf + MDR) ---
ATLANTIC = {
    "lat_min": 5,
    "lat_max": 40,
    "lon_min": -100,
    "lon_max": -10
}

# --- Wind category ---
def wind_category(w):
    if w < 34: return "TD"
    elif w < 64: return "TS"
    elif w < 83: return "C1"
    elif w < 96: return "C2"
    elif w < 113: return "C3"
    elif w < 137: return "C4"
    else: return "C5"

def wind_color(cat):
    return {"TD":"blue", "TS":"green", "C1":"yellow", "C2":"orange"}.get(cat, "gray")

# --- Filter Atlantic points ---
atl_pts = points_gdf[
    (points_gdf["lat"] >= ATLANTIC["lat_min"]) &
    (points_gdf["lat"] <= ATLANTIC["lat_max"]) &
    (points_gdf["lon"] >= ATLANTIC["lon_min"]) &
    (points_gdf["lon"] <= ATLANTIC["lon_max"])
].copy()

atl_pts["wind_type"] = atl_pts["Maximum Wind"].apply(lambda x: wind_category(float(x)))

# --- Map builder (one category) ---
def make_atlantic_map(df, category, title, max_points=6000):
    center_lat = (ATLANTIC["lat_min"] + ATLANTIC["lat_max"]) / 2
    center_lon = (ATLANTIC["lon_min"] + ATLANTIC["lon_max"]) / 2

    m = folium.Map(location=[center_lat, center_lon], zoom_start=4, tiles="CartoDB positron")

    sub = df[df["wind_type"] == category].copy()
    print(f"{category} points:", len(sub))

    if len(sub) == 0:
        return m

    sub = sub.sample(min(len(sub), max_points), random_state=42)

    for _, r in sub.iterrows():
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=3,
            color=wind_color(category),
            fill=True,
            fill_color=wind_color(category),
            fill_opacity=0.85,
            tooltip=f"{title} | Wind {int(r['Maximum Wind'])} kt"
        ).add_to(m)

    return m

# --- Create 4 Atlantic maps ---
map_TD = make_atlantic_map(atl_pts, "TD", "Atlantic: Tropical Depression")
map_TS = make_atlantic_map(atl_pts, "TS", "Atlantic: Tropical Storm")
map_C1 = make_atlantic_map(atl_pts, "C1", "Atlantic: Category 1 Hurricane")
map_C2 = make_atlantic_map(atl_pts, "C2", "Atlantic: Category 2 Hurricane")

# --- 2×2 layout display ---
def show_2x2(maps, titles, height=360):
    blocks = ""
    for m, t in zip(maps, titles):
        blocks += f"""
        <div style="width:48%; padding:6px; box-sizing:border-box;">
            <b>{t}</b>
            {m.get_root().render()}
        </div>
        """
    display(HTML(f"""
    <div style="display:flex; flex-wrap:wrap; justify-content:space-between;">
        {blocks}
    </div>
    <style>.folium-map {{ height:{height}px !important; }}</style>
    """))

show_2x2([map_TD, map_TS, map_C1, map_C2], ["TD", "TS", "C1", "C2"], height=360)


**Pacific Region**

In [None]:
PACIFIC = {
    "lat_min": 0,
    "lat_max": 30,
    "lon_min": -140,
    "lon_max": -90
}


In [None]:
import folium
from IPython.display import display, HTML

# --- Wind category ---
def wind_category(w):
    if w < 34: return "TD"
    elif w < 64: return "TS"
    elif w < 83: return "C1"
    elif w < 96: return "C2"
    elif w < 113: return "C3"
    elif w < 137: return "C4"
    else: return "C5"

def wind_color(cat):
    return {"TD":"blue", "TS":"green", "C1":"yellow", "C2":"orange"}.get(cat, "gray")

# --- Filter Pacific points ---
pacific_pts = points_gdf[
    (points_gdf["lat"] >= PACIFIC["lat_min"]) &
    (points_gdf["lat"] <= PACIFIC["lat_max"]) &
    (points_gdf["lon"] >= PACIFIC["lon_min"]) &
    (points_gdf["lon"] <= PACIFIC["lon_max"])
].copy()

pacific_pts["wind_type"] = pacific_pts["Maximum Wind"].apply(lambda x: wind_category(float(x)))

# --- Map builder for one category ---
def make_pacific_map(df, category, title, max_points=6000):
    center_lat = (PACIFIC["lat_min"] + PACIFIC["lat_max"]) / 2
    center_lon = (PACIFIC["lon_min"] + PACIFIC["lon_max"]) / 2

    m = folium.Map(
        location=[center_lat, center_lon],
        zoom_start=4,
        tiles="CartoDB positron"
    )

    sub = df[df["wind_type"] == category]
    print(f"{category} points:", len(sub))

    sub = sub.sample(min(len(sub), max_points), random_state=42)

    for _, r in sub.iterrows():
        folium.CircleMarker(
            location=[float(r["lat"]), float(r["lon"])],
            radius=3,
            color=wind_color(category),
            fill=True,
            fill_color=wind_color(category),
            fill_opacity=0.85,
            tooltip=f"{title} | Wind {int(r['Maximum Wind'])} kt"
        ).add_to(m)

    return m

# --- Create maps ---
map_TD = make_pacific_map(pacific_pts, "TD", "Pacific: Tropical Depression")
map_TS = make_pacific_map(pacific_pts, "TS", "Pacific: Tropical Storm")
map_C1 = make_pacific_map(pacific_pts, "C1", "Pacific: Category 1 Hurricane")
map_C2 = make_pacific_map(pacific_pts, "C2", "Pacific: Category 2 Hurricane")

# --- 2×2 layout ---
def show_2x2(maps, titles, height=360):
    html = ""
    for m, t in zip(maps, titles):
        html += f"""
        <div style="width:48%; padding:6px;">
            <b>{t}</b>
            {m.get_root().render()}
        </div>
        """
    display(HTML(f"""
    <div style="display:flex; flex-wrap:wrap; justify-content:space-between;">
        {html}
    </div>
    <style>.folium-map {{ height:{height}px !important; }}</style>
    """))

show_2x2(
    [map_TD, map_TS, map_C1, map_C2],
    ["TD", "TS", "C1", "C2"]
)
