In [None]:
import numpy as np

# Example: load feature and label arrays
X = np.load("path_to_feature_file.npy")
Y = np.load("path_to_label_file.npy")

# Reshape data into format (N, C, H, W)
X = X.reshape(X.shape[0], 1, X.shape[1], X.shape[2])
Y = Y.reshape(Y.shape[0])


Input Files
Feature file (.npy): A NumPy array storing the input data.
Shape: (N, H, W)

N: number of samples

H, W: spatial dimensions of each sample

Label file (.npy): A NumPy array storing the target values.
Shape: (N,) or (N, 1) depending on the task.

Both files should be created beforehand (e.g., from remote sensing .tif data).

In [None]:
import torch
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self, data_x, data_y, split=0):
        """
        Args:
            data_x: feature matrix (numpy array)
            data_y: labels (numpy array)
            split: 0 = training, 1 = validation/test
        """
        self.split = split
        self.x = data_x
        self.y = data_y
        self.x_train, self.x_test = self.x, self.x
        self.y_train, self.y_test = self.y, self.y

    def __getitem__(self, index):
        if self.split == 0:  # training
            x_item, y_item = self.x_train[index], self.y_train[index]
        else:                # validation/test
            x_item, y_item = self.x_test[index], self.y_test[index]

        return torch.tensor(x_item), torch.tensor(y_item)

    def __len__(self):
        return self.x_train.shape[0] if self.split == 0 else self.x_test.shape[0]


In [None]:
import torch.nn as nn
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
from torch import einsum

# --- Helper functions ---
def pair(t):
    return t if isinstance(t, tuple) else (t, t)

# --- Model components ---
class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn
    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)

class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim), 
            nn.Dropout(dropout)
        )
    def forward(self, x):
        return self.net(x)

class Attention(nn.Module):
    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = dim_head * heads
        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim=-1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        b, n, _, h = *x.shape, self.heads
        q, k, v = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=h), (q, k, v))

        dots = einsum('b h i d, b h j d -> b h i j', q, k) * self.scale
        attn = self.attend(dots)

        out = einsum('b h i j, b h j d -> b h i d', attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)

class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.ModuleList([
                PreNorm(dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout=dropout))
            ]) for _ in range(depth)
        ])
    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return x

class ViT(nn.Module):
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim,
                 pool='cls', channels=1, dim_head=64, dropout=0., emb_dropout=0.):
        super().__init__()
        image_height, image_width = pair(image_size)
        patch_height, patch_width = pair(patch_size)

        num_patches = (image_height // patch_height) * (image_width // patch_width)
        patch_dim = channels * patch_height * patch_width
        assert pool in {'cls', 'mean'}

        self.to_patch_embedding = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=patch_height, p2=patch_width),
            nn.Linear(patch_dim, dim)
        )

        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)
        self.pool = pool
        self.to_latent = nn.Identity()
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_classes)
        )

    def forward(self, img):
        x = self.to_patch_embedding(img)
        b, n, _ = x.shape

        cls_tokens = repeat(self.cls_token, '() n d -> b n d', b=b)
        x = torch.cat((cls_tokens, x), dim=1)
        x += self.pos_embedding[:, :(n + 1)]
        x = self.dropout(x)

        x = self.transformer(x)
        x = x.mean(dim=1) if self.pool == 'mean' else x[:, 0]
        return self.mlp_head(x)


In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Split into train/test
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.30, random_state=42)

train_dataset = MyDataset(x_train, y_train, split=0)
val_dataset   = MyDataset(x_test,  y_test,  split=1)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=128, shuffle=False)

# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ViT(
    image_size = 12,
    patch_size = 6,
    num_classes = 1,
    dim = 64,
    depth = 6,
    heads = 6,
    mlp_dim = 64,
    dropout = 0.1,
    emb_dropout = 0.1
).to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
train_loss, val_loss = [], []
epochs = 10

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.float().to(device), labels.float().to(device)
        labels = labels.view(-1, 1)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    train_loss.append(running_loss / len(train_loader))

    # validation
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.float().to(device), labels.float().to(device)
            labels = labels.view(-1, 1)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
    val_loss.append(running_loss / len(val_loader))

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss[-1]:.6f}, Val Loss: {val_loss[-1]:.6f}")

# Plot loss curves
plt.plot(train_loss, label="Train")
plt.plot(val_loss, label="Validation")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()


In [None]:
# Save model
torch.save(model, "vit_model.pth")

# Example: predictions on test set
predictions = []
model.eval()
with torch.no_grad():
    for inputs, _ in val_loader:
        inputs = inputs.float().to(device)
        outputs = model(inputs)
        predictions.extend(outputs.cpu().numpy())

predictions = np.array(predictions)
print("Predictions shape:", predictions.shape)


In [None]:
# ============================================================
# 2. Function: Normalized Difference Calculation
# ============================================================
def calculate_difference(interferogram1, interferogram2, chunk_size=1024):
    """
    Compute the normalized difference between two interferograms
    with block-wise processing to reduce memory usage.
    
    Args:
        interferogram1 (np.ndarray): First interferogram (2D array).
        interferogram2 (np.ndarray): Second interferogram (2D array).
        chunk_size (int): Block size for processing to avoid memory overflow.
        
    Returns:
        np.ndarray: Difference map (float32), with NaN for invalid pixels.
    """
    if interferogram1.shape != interferogram2.shape:
        raise ValueError("Both interferograms must have the same shape.")
    
    rows, cols = interferogram1.shape
    difference = np.zeros((rows, cols), dtype=np.float32)

    chunk_rows = max(1, min(chunk_size, rows))
    chunk_cols = max(1, min(chunk_size, cols))

    for i in range(0, rows, chunk_rows):
        for j in range(0, cols, chunk_cols):
            end_i = min(i + chunk_rows, rows)
            end_j = min(j + chunk_cols, cols)

            if end_i <= i or end_j <= j:
                continue

            chunk1 = interferogram1[i:end_i, j:end_j]
            chunk2 = interferogram2[i:end_i, j:end_j]

            epsilon = 1e-8
            denominator = chunk1 + chunk2 + epsilon
            valid_mask = denominator != 0

            diff_chunk = np.full_like(chunk1, np.nan, dtype=np.float32)
            diff_chunk[valid_mask] = (chunk1[valid_mask] - chunk2[valid_mask]) / denominator[valid_mask]

            difference[i:end_i, j:end_j] = diff_chunk

    mask = np.isnan(interferogram1) | np.isnan(interferogram2) | (interferogram1 == 0) | (interferogram2 == 0)
    difference[mask] = np.nan

    return difference


In [None]:
# ============================================================
# 3. Input Data Description
# ============================================================

# Input files:
# - assumed co-disaster value (predicted by the LSTM model)
# - geninue co-disaster value (observed data)

# Both should be saved as `.npy` files with shape (H, W),
# where each element represents the interferometric phase
# at pixel (i, j).

# Example file structure:
# base_dir/
#   ├── geninue.npy     (geninue co-disaster value)
#   ├── predicted.npy (predicted co-disaster value)


In [None]:
# ============================================================
# 4. Load Input Data
# ============================================================

base_dir = "/your/path/to/"   # <-- Replace with your path

file1_path = os.path.join(base_dir, "geninue.npy")   # ground truth
file2_path = os.path.join(base_dir, "predicted.npy")  # model prediction

ifg_true = np.load(file1_path)
ifg_pred = np.load(file2_path)

print(f"Loaded file: {file1_path}, shape: {ifg_true.shape}")
print(f"Loaded file: {file2_path}, shape: {ifg_pred.shape}")


In [None]:
# ============================================================
# 5. Compute Normalized Difference Score
# ============================================================

phase_score = calculate_difference(ifg_true, ifg_pred, chunk_size=512)

# Apply masking for NaN/zeros
phase_score = np.where(np.isnan(ifg_true), np.nan,
                      np.where(ifg_true == 0, 0, phase_score))
phase_score = np.where(np.isnan(ifg_pred), np.nan,
                      np.where(ifg_pred == 0, 0, phase_score))

print("Score map computed successfully.")


In [None]:
# ============================================================
# 6. Save Results
# ============================================================

output_filename = "score.npy"
output_path = os.path.join(base_dir, output_filename)

np.save(output_path, phase_score)
print(f"Result saved as {output_filename}")


In [None]:
# ============================================================
# 7. Visualization Example
# ============================================================

plt.figure(figsize=(15,5))
plt.subplot(1,3,1)
plt.imshow(ifg_true, cmap="viridis")
plt.title("Geninue Co-Disaster Value")
plt.colorbar()

plt.subplot(1,3,2)
plt.imshow(ifg_pred, cmap="viridis")
plt.title("Predicted Co-Disaster Value")
plt.colorbar()

plt.subplot(1,3,3)
plt.imshow(phase_score, cmap="bwr", vmin=-1, vmax=1)
plt.title("Normalized Difference Score")
plt.colorbar()

plt.tight_layout()
plt.show()
