In [None]:
# Block 1: Imports and Data Loading

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Masking, LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.sequence import pad_sequences
import matplotlib.pyplot as plt
import seaborn as sns
import math
import os
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, confusion_matrix, classification_report)

from google.colab import drive
drive.mount('/content/drive')

# Set matplotlib font sizes
plt.rcParams.update({
    'font.size': 14,
    'axes.titlesize': 18,
    'axes.labelsize': 16,
    'xtick.labelsize': 14,
    'ytick.labelsize': 14,
    'legend.fontsize': 14
})

# Load CSV from Google Drive
csv_path = "/content/drive/MyDrive/Colab Notebooks/Dataset.csv"
df = pd.read_csv(csv_path)
print("DataFrame loaded. Shape:", df.shape)
df.head()

In [None]:
# Block 2: Preparing Data – Grouping by Pixel

non_feature_cols = ["Pixel_ID", "GDD", "Label"]
feature_cols = [c for c in df.columns if c not in non_feature_cols]

# Group by Pixel_ID and sort each group by GDD
X_list = []   # List of 2D arrays, each array is (time_steps, num_features)
y_list = []   # Label for each pixel (0 for Infected, 1 for Non-infected)
pixel_ids = []  # Store pixel IDs

for pixel_id, group in df.groupby("Pixel_ID", sort=False):
    group_sorted = group.sort_values("GDD")
    label_str = group_sorted["Label"].iloc[0]
    label_num = 0 if label_str == "Infected" else 1
    X_seq = group_sorted[feature_cols].to_numpy(dtype=np.float32)
    X_list.append(X_seq)
    y_list.append(label_num)
    pixel_ids.append(pixel_id)

y_list = np.array(y_list, dtype=np.int32)
print("Number of pixel sequences:", len(X_list))

In [None]:
# Block 3: Stratified Train/Val/Test Split (One Run)

def train_val_test_split(X, y, train_ratio=0.65, val_ratio=0.15, test_ratio=0.20):
    # First split: train vs. temp (train_ratio vs. remaining)
    sss1 = StratifiedShuffleSplit(n_splits=1, test_size=(1 - train_ratio), random_state=42)
    for train_idx, temp_idx in sss1.split(np.zeros(len(y)), y):
        pass
    # Second split: split temp into validation and test
    X_temp = [X[i] for i in temp_idx]
    y_temp = y[temp_idx]
    relative_val = val_ratio / (val_ratio + test_ratio)
    sss2 = StratifiedShuffleSplit(n_splits=1, test_size=(1 - relative_val), random_state=42)
    for val_idx, test_idx in sss2.split(np.zeros(len(y_temp)), y_temp):
        pass
    # Map temp indices back to original indices
    val_idx = np.array(temp_idx)[val_idx]
    test_idx = np.array(temp_idx)[test_idx]
    return train_idx, val_idx, test_idx

tr_idx, va_idx, te_idx = train_val_test_split(X_list, y_list, 0.65, 0.15, 0.20)
print(f"Train: {len(tr_idx)}, Val: {len(va_idx)}, Test: {len(te_idx)}")

In [None]:
# Block 3: LSTM Model Definition
def build_lstm_model(num_features, lstm_units1=64, lstm_units2=32, dropout_rate=0.2, dense_units=32, learning_rate=1e-3):
    model = Sequential()

    # Input: variable-length sequences with num_features features
    model.add(Masking(mask_value=0.0, input_shape=(None, num_features)))

    # First LSTM layer with lstm_units1 and return sequences for stacking
    model.add(LSTM(lstm_units1, return_sequences=True))
    model.add(Dropout(dropout_rate))

    # Second LSTM layer with lstm_units2
    model.add(LSTM(lstm_units2))
    model.add(Dropout(dropout_rate))

    # Dense layer for further processing
    model.add(Dense(dense_units, activation='relu'))

    # Final output layer for binary classification with sigmoid activation
    model.add(Dense(1, activation='sigmoid'))

    # Compile the model
    model.compile(loss='binary_crossentropy',
                  optimizer=Adam(learning_rate=learning_rate),
                  metrics=['accuracy'])
    return model

# Test the model summary using the number of features from feature_cols
model_test = build_lstm_model(num_features=len(feature_cols))
model_test.summary()

In [None]:
# Block 4: Training Loop, Model Saving, and Plotting

# Directories for saving models and plots
model_save_dir = "/content/drive/MyDrive/Colab Notebooks/models"
os.makedirs(model_save_dir, exist_ok=True)
plots_dir = "/content/drive/MyDrive/Colab Notebooks/plots"
os.makedirs(plots_dir, exist_ok=True)

num_epochs = 100
batch_size = 128

# Extract training, validation, and test sequences and labels
X_train_seqs = [X_list[i] for i in tr_idx]
y_train = y_list[tr_idx]
X_val_seqs = [X_list[i] for i in va_idx]
y_val = y_list[va_idx]
X_test_seqs = [X_list[i] for i in te_idx]
y_test = y_list[te_idx]

# Determine the maximum sequence length from the training set
max_len_train = max(seq.shape[0] for seq in X_train_seqs)
print(f"Max training sequence length: {max_len_train}")

# Pad sequences (pad with zeros at the end)
X_train_padded = pad_sequences(X_train_seqs, maxlen=max_len_train, dtype='float32', padding='post')
X_val_padded = pad_sequences(X_val_seqs, maxlen=max_len_train, dtype='float32', padding='post')
X_test_padded = pad_sequences(X_test_seqs, maxlen=max_len_train, dtype='float32', padding='post')

# Build the model using the architecture
model = build_lstm_model(num_features=len(feature_cols), lstm_units1=64, lstm_units2=32, dropout_rate=0.2, dense_units=32, learning_rate=1e-3)

# Set up callbacks: ModelCheckpoint to save best model and EarlyStopping
model_filename = os.path.join(model_save_dir, "model_run1.h5")
checkpoint_cb = ModelCheckpoint(model_filename, monitor='val_loss', save_best_only=True, verbose=1)
earlystop_cb = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the model
history = model.fit(
    X_train_padded, y_train,
    validation_data=(X_val_padded, y_val),
    epochs=num_epochs,
    batch_size=batch_size,
    callbacks=[checkpoint_cb, earlystop_cb],
    verbose=1
)

# Plot training history: Loss and Accuracy
plt.figure(figsize=(12, 5))

# Loss plot
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], marker='o', label="Train Loss")
plt.plot(history.history['val_loss'], marker='o', label="Val Loss")
plt.title("Training and Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()

# Accuracy plot
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], marker='o', label="Train Accuracy")
plt.plot(history.history['val_accuracy'], marker='o', label="Val Accuracy")
plt.title("Training and Validation Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()

plt.tight_layout()
plot_path = os.path.join(plots_dir, "training_run1.png")
plt.savefig(plot_path)
plt.show()

# Evaluate on test set
test_loss, test_acc = model.evaluate(X_test_padded, y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

# Predict on test set
y_pred_prob = model.predict(X_test_padded)
y_pred = (y_pred_prob >= 0.5).astype(int).reshape(-1)

# Calculate evaluation metrics
acc = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred)
rec = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)
report = classification_report(y_test, y_pred)

print("Test Set Metrics:")
print("Accuracy:", acc)
print("Precision:", prec)
print("Recall:", rec)
print("F1 Score:", f1)
print("Confusion Matrix:\n", cm)
print("Classification Report:\n", report)

In [None]:
# Block 5: LSTM Feature Importance

def permutation_importance(model, X, y, baseline_score, feature_names, n_repeats=5):
    """
    Compute permutation feature importance.

    Parameters:
      model: Trained Keras model.
      X: Test data (padded sequences), shape (n_samples, time_steps, n_features).
      y: True labels for the test set.
      baseline_score: Accuracy score of the model on X (without shuffling).
      feature_names: List of feature names corresponding to the last axis of X.
      n_repeats: Number of times to repeat the shuffling for averaging.

    Returns:
      A dictionary mapping feature names to the average drop in accuracy.
    """
    importances = {name: [] for name in feature_names}

    for _ in range(n_repeats):
        for j, feature in enumerate(feature_names):
            X_permuted = np.copy(X)
            # Permute the entire time-series for feature j across samples
            permuted_indices = np.random.permutation(X.shape[0])
            X_permuted[:, :, j] = X_permuted[permuted_indices, :, j]
            score = model.evaluate(X_permuted, y, verbose=0)[1]  # use accuracy as metric
            importance = baseline_score - score
            importances[feature].append(importance)

    # Average the importance scores over the repeats
    avg_importances = {feature: np.mean(imp_list) for feature, imp_list in importances.items()}
    return avg_importances

# Compute baseline accuracy on test data
baseline_score = model.evaluate(X_test_padded, y_test, verbose=0)[1]

# Get feature importances (the higher the drop, the more important the feature)
feature_importances = permutation_importance(model, X_test_padded, y_test, baseline_score, feature_cols, n_repeats=5)

print("Permutation Feature Importances (average drop in accuracy):")
for feature, importance in feature_importances.items():
    print(f"{feature}: {importance:.4f}")

In [None]:
# Block 6: KDE Histogram on Most Important Features

# Read CSV file
csv_path = "/content/drive/MyDrive/Colab Notebooks/Dataset.csv"
df = pd.read_csv(csv_path)

# Filter rows where Adjusted GDD == 0 (correspond to the vegetation density peak stage)
df_gdd0 = df[df["GDD"] == 0]

# Define the features for which we need KDE plots
features = ["NDMI", "CCC", "FAPAR", "CHL-RED-EDGE"]

# Color mapping (Non-infected = darker, Infected = lighter)
color_mapping = {
    "NDMI": ("#0D3B66", "#66A5AD"),       # Non-infected: dark blue; Infected: light blue
    "CCC": ("#004D40", "#26A69A"),        # Non-infected: dark teal; Infected: light teal
    "FAPAR": ("#4A148C", "#BA68C8"),      # Non-infected: dark purple; Infected: light purple
    "CHL-RED-EDGE": ("#006400", "#90EE90")  # Non-infected: dark green; Infected: light green
}

# Loop over each feature and create a KDE plot
for feature in features:
    plt.figure(figsize=(12, 8))

    # KDE for Non-infected (Healthy) samples, now labeled as "Non-infected"
    sns.kdeplot(
        data=df_gdd0[df_gdd0["Label"] == "Healthy"],
        x=feature,
        bw_adjust=3,
        fill=True,
        common_norm=False,
        alpha=0.7,
        linewidth=2,
        color=color_mapping[feature][0],
        label="Non-infected"
    )

    # KDE for Infected samples
    sns.kdeplot(
        data=df_gdd0[df_gdd0["Label"] == "Infected"],
        x=feature,
        bw_adjust=3,
        fill=True,
        common_norm=False,
        alpha=0.7,
        linewidth=2,
        color=color_mapping[feature][1],
        label="Infected"
    )

    plt.title(f"KDE Plot for {feature} (Vegetation Density Peak Stage)", fontsize=20)
    plt.xlabel(feature, fontsize=16)
    plt.ylabel("Density", fontsize=16)
    plt.legend(fontsize=14)
    plt.tight_layout()

    # Save the figure with 300 dpi and transparent background
    plt.savefig(f"{feature}_kde.png", dpi=300, transparent=True)
    plt.show()