In [None]:
# ============================================================
# 1. Import Libraries
# ============================================================
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import datetime
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import pickle


In [None]:
# ============================================================
# 2. Dataset Definition
# ============================================================
class InSARDataset(Dataset):
    """
    Per-pixel dataset over an InSAR time-series cube.

    Each pixel (i, j) of the cube is one sample. Its time series is rescaled
    to [-1, 1] with its own MinMaxScaler (kept in ``self.scalers`` so that
    predictions can be mapped back to the original units).

    Args:
        data (numpy.ndarray): InSAR cube, shape (H, W, T).
        dates (list[str]): Interferogram intervals, e.g. ["20150815_20150827", ...].
            Training mode (``is_prediction=False``) requires exactly T entries.
            Prediction mode requires more than T entries: the first T describe
            the observed steps and the last one is the interval to forecast.
        window_size (int): Stored on the instance only; the sampling logic of
            this class does not use it. Default: 20.
        is_prediction (bool): If True, builds dataset for forecasting the next step.

    Raises:
        ValueError: If ``data`` is not 3-D or ``dates`` has the wrong length
            for the selected mode.
    """
    def __init__(self, data, dates, window_size=20, is_prediction=False):
        if len(data.shape) != 3:
            raise ValueError(f"data must have shape (H, W, T), got {data.shape}")

        dates = list(dates)
        self.data = data
        self.height, self.width, self.time_steps = data.shape
        self.window_size = window_size
        self.is_prediction = is_prediction

        # Fail early: a wrong number of dates otherwise either crashes deep
        # inside the model or silently uses the wrong target interval.
        if is_prediction:
            if len(dates) <= self.time_steps:
                raise ValueError(
                    f"Prediction mode needs the {self.time_steps} observed intervals plus "
                    f"the interval to forecast, but only {len(dates)} dates were given."
                )
        else:
            if len(dates) != self.time_steps:
                raise ValueError(
                    f"Training mode needs one date interval per time step "
                    f"({self.time_steps}), but {len(dates)} dates were given."
                )
            if self.time_steps < 2:
                raise ValueError("Training mode needs at least 2 time steps (inputs + target).")

        # ---- Extract temporal features ----
        self.time_features = np.array([self._encode_interval(d) for d in dates])

        # ---- Normalize each pixel individually ----
        self.scalers = {}
        self.scaled_data = np.zeros_like(self.data, dtype=np.float32)

        for i in range(self.height):
            for j in range(self.width):
                pixel_ts = self.data[i, j, :]
                scaler = MinMaxScaler(feature_range=(-1, 1))
                self.scaled_data[i, j, :] = scaler.fit_transform(pixel_ts.reshape(-1, 1)).flatten()
                self.scalers[(i, j)] = scaler

        # ---- Build sample indices ----
        self.samples = [(i, j) for i in range(self.height) for j in range(self.width)]

    @staticmethod
    def _encode_interval(date_str):
        """Convert one "YYYYMMDD_YYYYMMDD" interval into the 7 temporal features.

        Features are sin/cos encodings of the start date's year, month and day,
        followed by the interval length in days (not rescaled).
        """
        start_date, end_date = date_str.split('_')
        start = datetime.datetime.strptime(start_date, '%Y%m%d')
        end = datetime.datetime.strptime(end_date, '%Y%m%d')

        year_sin = np.sin(2 * np.pi * start.year / 2100)
        year_cos = np.cos(2 * np.pi * start.year / 2100)
        month_sin = np.sin(2 * np.pi * start.month / 12)
        month_cos = np.cos(2 * np.pi * start.month / 12)
        day_sin = np.sin(2 * np.pi * start.day / 31)
        day_cos = np.cos(2 * np.pi * start.day / 31)
        interval = (end - start).days

        return [year_sin, year_cos, month_sin, month_cos, day_sin, day_cos, interval]

    def __len__(self):
        """Number of samples, i.e. H * W pixels."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the scaled series, time features and target for pixel ``idx``.

        Training mode: inputs are the first T-1 steps and ``y`` is the last
        step. Prediction mode: inputs are all T steps, ``y`` is a 0.0
        placeholder and the target features come from the last date entry.
        """
        i, j = self.samples[idx]
        pixel_ts = self.scaled_data[i, j, :]

        if not self.is_prediction:  # Training
            x = pixel_ts[:-1]  # first T-1 steps
            y = pixel_ts[-1]   # last step
            time_feat = self.time_features[:-1]
            target_time_feat = self.time_features[-1]
        else:  # Prediction
            x = pixel_ts
            y = 0.0  # placeholder
            time_feat = self.time_features[:self.time_steps]
            target_time_feat = self.time_features[-1]

        return {
            'pixel_coords': torch.tensor([i, j]),
            'x': torch.tensor(x, dtype=torch.float32),
            'time_features': torch.tensor(time_feat, dtype=torch.float32),
            'target_time_features': torch.tensor(target_time_feat, dtype=torch.float32),
            'y': torch.tensor(y, dtype=torch.float32)
        }


In [None]:
# ============================================================
# 3. LSTM Model
# ============================================================
class InSARLSTM(nn.Module):
    """
    LSTM regressor that predicts one value per sequence.

    Each time step's scalar observation and its temporal features are embedded
    to ``hidden_dim`` and summed, then fed through a stacked LSTM. The final
    hidden state of the top layer is concatenated with an embedding of the
    target step's temporal features and mapped to a single output.

    Args:
        input_dim (int): Size of each per-step observation. ``forward`` adds a
            trailing axis of size 1 to ``src``, so it only works with 1.
        hidden_dim (int): Embedding and LSTM hidden size.
        num_layers (int): Number of stacked LSTM layers.
        dropout (float): Inter-layer LSTM dropout; forced to 0 for a single
            layer, where it would have no effect.
        time_feat_dim (int): Number of temporal features per step.
    """
    def __init__(self, input_dim=1, hidden_dim=64, num_layers=2, dropout=0.1, time_feat_dim=7):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.input_embedding = nn.Linear(input_dim, hidden_dim)
        self.time_embedding = nn.Linear(time_feat_dim, hidden_dim)

        self.lstm = nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )

        self.target_time_proj = nn.Linear(time_feat_dim, hidden_dim)

        # Consumes [sequence representation | target-time embedding].
        self.output_layer = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, src, time_features, target_time_features):
        """
        Args:
            src: Observed values, shape [B, T].
            time_features: Temporal features per observed step, shape [B, T, time_feat_dim].
            target_time_features: Temporal features of the step to predict,
                shape [B, time_feat_dim].

        Returns:
            Tensor of shape [B] with one prediction per sequence.
        """
        src = src.unsqueeze(-1)  # [B, T, 1]
        src = self.input_embedding(src)  # [B, T, H]
        time_embed = self.time_embedding(time_features)  # [B, T, H]

        combined_input = src + time_embed
        _, (h_n, _) = self.lstm(combined_input)
        seq_repr = h_n[-1]  # final hidden state of the top LSTM layer, [B, H]

        target_time_embed = self.target_time_proj(target_time_features)  # [B, H]
        combined = torch.cat([seq_repr, target_time_embed], dim=-1)  # [B, 2H]

        output = self.output_layer(combined)  # [B, 1]
        return output.squeeze(-1)


In [None]:
# ============================================================
# 4. Training and Prediction Utilities
# ============================================================
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the sample-weighted mean loss.

    When ``optimizer`` is given the model weights are updated after every
    batch; otherwise the pass is evaluation-only.
    """
    total_loss = 0.0
    n_samples = 0
    for batch in loader:
        x = batch['x'].to(device)
        time_features = batch['time_features'].to(device)
        target_time_features = batch['target_time_features'].to(device)
        y = batch['y'].to(device)

        if optimizer is not None:
            optimizer.zero_grad()
        outputs = model(x, time_features, target_time_features)
        loss = criterion(outputs, y)
        if optimizer is not None:
            loss.backward()
            optimizer.step()

        total_loss += loss.item() * x.size(0)
        n_samples += x.size(0)

    if n_samples == 0:
        raise ValueError("DataLoader yielded no samples; cannot compute a mean loss.")
    # Divide by the samples actually seen so the mean stays correct when the
    # loader drops the last batch or uses a sampler.
    return total_loss / n_samples


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50, device='cuda',
                checkpoint_path='best_model.pth'):
    """
    Train ``model`` and checkpoint the weights with the lowest validation loss.

    Args:
        model: Module called as ``model(x, time_features, target_time_features)``.
        train_loader: Iterable of dict batches with keys 'x', 'time_features',
            'target_time_features' and 'y'.
        val_loader: Same batch format, used for validation after every epoch.
        criterion: Loss callable taking ``(outputs, y)``.
        optimizer: Optimizer over the model's parameters.
        num_epochs (int): Number of epochs to run.
        device: Device the model and batches are moved to.
        checkpoint_path (str): Where the best ``state_dict`` is written.

    Returns:
        The model with the weights of the LAST epoch. The best weights are
        only in ``checkpoint_path``; load them explicitly if needed.

    Raises:
        ValueError: If either loader yields no samples.
    """
    model.to(device)
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()
        train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)

        model.eval()
        with torch.no_grad():
            val_loss = _run_epoch(model, val_loader, criterion, device)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)

    return model


def predict_future(model, dataset, device='cuda', batch_size=256):
    """
    Predict one value per pixel and map it back to the original data units.

    Args:
        model: Module called as ``model(x, time_features, target_time_features)``
            that returns one value per sample.
        dataset: Dataset exposing ``height``, ``width`` and a ``scalers`` dict
            keyed by ``(i, j)``, and yielding dict items with keys
            'pixel_coords', 'x', 'time_features' and 'target_time_features'.
        device: Device used for inference; the model is moved there.
        batch_size (int): Inference batch size.

    Returns:
        numpy.ndarray of shape (height, width). Pixels the dataset never
        yields keep the initial value 0.
    """
    # Move the model explicitly so the function also works with a model that
    # was created or loaded on a different device than `device`.
    model.to(device)
    model.eval()
    predictions = np.zeros((dataset.height, dataset.width))
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    with torch.no_grad():
        for batch in dataloader:
            coords = batch['pixel_coords'].numpy()
            x = batch['x'].to(device)
            time_features = batch['time_features'].to(device)
            target_time_features = batch['target_time_features'].to(device)

            outputs = model(x, time_features, target_time_features)
            # One device-to-host copy per batch instead of one per pixel.
            scaled_values = outputs.detach().cpu().numpy().reshape(-1)

            for (pixel_i, pixel_j), value in zip(coords, scaled_values):
                key = (int(pixel_i), int(pixel_j))
                scaler = dataset.scalers[key]
                # Each pixel has its own scaler, so the inverse transform is per pixel.
                predictions[key] = scaler.inverse_transform(np.array([[value]]))[0, 0]

    return predictions


In [None]:
# ============================================================
# 5. Example Usage
# ============================================================

# ---- Reproducibility ----
# Seed the RNGs so the train/val split, weight initialisation and batch
# shuffling are the same on every Restart & Run All.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# ---- Device ----
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# ============================================================
# Load Data and Dates
# ============================================================

# ---- Data ----
# data_path should point to your InSAR data file (.npy format).
# The .npy file must contain a 3D NumPy array of shape (H, W, T):
#   H: image height (number of pixels in vertical dimension)
#   W: image width  (number of pixels in horizontal dimension)
#   T: number of temporal observations (e.g., 20)
#
# Each element data[i, j, t] represents the displacement (or wrapped phase, etc.)
# for pixel (i, j) at time index t.
#
# Example:
# data_path = "/your/path/to/data.npy"
# data = np.load(data_path)

data_path = "/your/path/to/insar_data.npy"   # <-- replace with your file path
data = np.load(data_path)
print(f"Data shape: {data.shape}")   # should be (H, W, T)

# ---- Dates ----
# `dates` must be a list of acquisition intervals in the form "YYYYMMDD_YYYYMMDD".
# Each string represents the start and end acquisition dates of an interferogram.
#
# Example:
# dates = ["20150815_20150827", "20150827_20150920", ... ]
#
# The length of dates should match the temporal dimension T of your data array.

dates = [...]  # <-- replace with your list of interferogram date intervals

# Fail here with a clear message rather than later inside the model.
if data.ndim != 3 or len(dates) != data.shape[2]:
    raise ValueError(
        f"Expected data of shape (H, W, T) and one date interval per time step; "
        f"got data shape {data.shape} and {len(dates)} dates."
    )

# ---- Dataset ----
# The split is over pixels: 80% of the pixels train, 20% validate.
full_dataset = InSARDataset(data, dates)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

# ---- Model ----
model = InSARLSTM(input_dim=1, hidden_dim=64, num_layers=2, dropout=0.1, time_feat_dim=7)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# ---- Train ----
model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=15, device=device)

# ---- Load Best Model ----
# train_model returns the last-epoch weights; the best ones are in the checkpoint.
# map_location lets a checkpoint written on GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load('best_model.pth', map_location=device))

# ---- Predict ----
# The extra interval appended here is the one being forecast.
next_date = "20160821_20160902"
all_dates = dates + [next_date]
predict_dataset = InSARDataset(data, all_dates, is_prediction=True)

future_predictions = predict_future(model, predict_dataset, device)
np.save('future_predictions.npy', future_predictions)

# ---- Visualization ----
fig, (ax_last, ax_pred) = plt.subplots(1, 2, figsize=(12, 5))

im_last = ax_last.imshow(data[:, :, -1], cmap='viridis')
fig.colorbar(im_last, ax=ax_last)
ax_last.set_title('Last Available Time Point')

im_pred = ax_pred.imshow(future_predictions, cmap='viridis')
fig.colorbar(im_pred, ax=ax_pred)
ax_pred.set_title('Predicted Future Time Point (LSTM)')

fig.tight_layout()
fig.savefig('prediction.png')
plt.show()


In [None]:
# ============================================================
# 6. Save Dataset for Reuse
# ============================================================
# Serialise the whole prediction dataset (data, scalers, time features) so a
# later session can reuse it without refitting the per-pixel scalers.
# Only unpickle this file from a source you trust.
with open('predict_dataset.pkl', mode='wb') as pkl_file:
    pickle.dump(predict_dataset, pkl_file)

print("Prediction dataset saved as 'predict_dataset.pkl'")


In [None]:
# ============================================================
# 1. Import Libraries
# ============================================================
import numpy as np
import os
import matplotlib.pyplot as plt


In [None]:
# ============================================================
# 2. Function: Normalized Difference Calculation
# ============================================================
def calculate_difference(interferogram1, interferogram2, chunk_size=1024, epsilon=1e-8):
    """
    Compute the normalized difference (a - b) / (a + b) between two
    interferograms, processed block by block to limit temporary memory.

    Args:
        interferogram1 (np.ndarray): First interferogram (2D array).
        interferogram2 (np.ndarray): Second interferogram (2D array).
        chunk_size (int): Block edge length; values below 1 are treated as 1.
        epsilon (float): A pixel whose |a + b| is not greater than this is
            treated as having a zero denominator and set to NaN.

    Returns:
        np.ndarray: Difference map (float32). A pixel is NaN when either input
        is NaN, infinite or exactly 0, or when the denominator is (near) zero.

    Raises:
        ValueError: If the inputs differ in shape or are not 2D.
    """
    interferogram1 = np.asarray(interferogram1)
    interferogram2 = np.asarray(interferogram2)

    if interferogram1.shape != interferogram2.shape:
        raise ValueError("Both interferograms must have the same shape.")
    if interferogram1.ndim != 2:
        raise ValueError("Both interferograms must be 2D arrays.")

    rows, cols = interferogram1.shape
    difference = np.full((rows, cols), np.nan, dtype=np.float32)

    chunk_rows = max(1, min(chunk_size, rows))
    chunk_cols = max(1, min(chunk_size, cols))

    for i in range(0, rows, chunk_rows):
        end_i = min(i + chunk_rows, rows)
        for j in range(0, cols, chunk_cols):
            end_j = min(j + chunk_cols, cols)

            chunk1 = interferogram1[i:end_i, j:end_j]
            chunk2 = interferogram2[i:end_i, j:end_j]

            # Keep the denominator unbiased. Adding epsilon to it would turn
            # an exact zero into a tiny divisor and produce huge values
            # instead of NaN.
            denominator = chunk1 + chunk2
            with np.errstate(invalid='ignore'):
                valid_mask = (
                    np.isfinite(denominator)
                    & (np.abs(denominator) > epsilon)
                    & (chunk1 != 0)
                    & (chunk2 != 0)
                )

            # Masking is done per block so no full-size temporaries are built.
            diff_chunk = np.full(chunk1.shape, np.nan, dtype=np.float32)
            diff_chunk[valid_mask] = (chunk1[valid_mask] - chunk2[valid_mask]) / denominator[valid_mask]

            difference[i:end_i, j:end_j] = diff_chunk

    return difference


In [None]:
# ============================================================
# 3. Input Data Description
# ============================================================

# Input files:
# - assumed co-disaster value (predicted by the LSTM model)
# - genuine co-disaster value (observed data)

# Both should be saved as `.npy` files with shape (H, W),
# where each element represents the interferometric phase
# at pixel (i, j).

# Example file structure:
# base_dir/
#   ├── geninue.npy     (genuine co-disaster value; file name as loaded by the code below)
#   ├── predicted.npy (predicted co-disaster value)


In [None]:
# ============================================================
# 4. Load Input Data
# ============================================================

# Directory holding both input arrays; results are written here as well.
base_dir = "/your/path/to/"   # <-- Replace with your path

# Observed (ground-truth) map first, model prediction second.
file1_path, file2_path = (
    os.path.join(base_dir, "geninue.npy"),
    os.path.join(base_dir, "predicted.npy"),
)

ifg_true, ifg_pred = np.load(file1_path), np.load(file2_path)

# Report what was loaded so a shape mismatch is visible before scoring.
print(f"Loaded file: {file1_path}, shape: {ifg_true.shape}")
print(f"Loaded file: {file2_path}, shape: {ifg_pred.shape}")


In [None]:
# ============================================================
# 5. Compute Normalized Difference Score
# ============================================================

phase_score = calculate_difference(ifg_true, ifg_pred, chunk_size=512)

# Apply masking for NaN/zeros.
# NaN in either input always wins and gives NaN. Otherwise an exact zero in
# either input gives a score of 0. Building both masks first keeps the rule
# symmetric: with two nested np.where passes, a pixel that is NaN in ifg_true
# but 0 in ifg_pred would end up 0 instead of NaN.
nan_mask = np.isnan(ifg_true) | np.isnan(ifg_pred)
zero_mask = (ifg_true == 0) | (ifg_pred == 0)
phase_score = np.where(nan_mask, np.nan,
                       np.where(zero_mask, 0, phase_score))

print("Score map computed successfully.")


In [None]:
# ============================================================
# 6. Save Results
# ============================================================

# The score map is written next to the input files in base_dir.
output_filename = "score.npy"
output_path = os.path.join(base_dir, output_filename)

np.save(file=output_path, arr=phase_score)
print(f"Result saved as {output_filename}")


In [None]:
# ============================================================
# 7. Visualization Example
# ============================================================

# Three panels side by side: observed map, predicted map, and their score.
fig, (ax_true, ax_pred, ax_score) = plt.subplots(1, 3, figsize=(15, 5))

im_true = ax_true.imshow(ifg_true, cmap="viridis")
ax_true.set_title("Genuine Co-Disaster Value")
fig.colorbar(im_true, ax=ax_true)

im_pred = ax_pred.imshow(ifg_pred, cmap="viridis")
ax_pred.set_title("Predicted Co-Disaster Value")
fig.colorbar(im_pred, ax=ax_pred)

# Diverging colormap clipped to [-1, 1] so that 0 (no difference) is white.
im_score = ax_score.imshow(phase_score, cmap="bwr", vmin=-1, vmax=1)
ax_score.set_title("Normalized Difference Score")
fig.colorbar(im_score, ax=ax_score)

fig.tight_layout()
plt.show()
