<a href="https://colab.research.google.com/github/Raoina/Spectra-2-Image/blob/main/notebooks/Models/2D_CNN_CR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 2D-CNN pipeline (images ready)

In [None]:
# ====== Imports ======
import os, random
import numpy as np
import pandas as pd
from scipy.stats import iqr
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import mean_squared_error, r2_score

In [None]:
# ====== Colab drive mount ======
# Mount Google Drive at /content/drive; the image folder and the CSV used by
# this notebook are read from paths under that mount point.
# NOTE: `google.colab` is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ====== Paths ======
# Folder holding the PNG images (one file per CSV row, looked up by row position).
IMG_DIR = "/content/drive/MyDrive/CR_train_65x65"
# Table holding the regression targets.
CSV_PATH = "/content/drive/MyDrive/train_Moi_NDF_Starch.csv"

# Columns of the CSV that are predicted jointly.
TARGETS = ["Moi", "NDF", "Starch"]

# Side length (pixels) every image is resized to when it is loaded.
# NOTE(review): the original note pointed at 16 -- presumably an alternative
# size that was tried; confirm before changing.
IMG_SIZE = 65

In [None]:
# ====== Hyperparams ======
# Optimisation settings shared by the CV folds and the final training run.
epochs = 100
batch_size = 128
lr = 1e-4

# Number of cross-validation folds.
n_splits = 5

# Seed every random number generator used by the notebook (Python, NumPy, PyTorch)
# with the same value.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Use the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Device:", device)

Device: cpu


In [None]:
# ====== Load CSV ======
# Read the table that holds the target columns listed in TARGETS.
df = pd.read_csv(CSV_PATH)
# NOTE(review): NaN filtering is left disabled. Dropping rows and resetting the
# index would shift the row positions, and the dataset class uses the row
# position to pick the image file (CUT_{idx}.png) -- presumably why this line
# is commented out; confirm before re-enabling.
# df = df.dropna(subset=TARGETS).reset_index(drop=True)
print("Data shape:", df.shape)

Data shape: (784, 3)


In [None]:
# ====== Scale targets ======
# Min-max scale each target column. `y_scaler` is kept so that model outputs
# can be mapped back to the original units with `inverse_transform`.
# NOTE(review): the scaler is fitted on every row, i.e. before the train/test
# split performed later in the notebook, so the test rows influence the
# scaling range -- confirm this is acceptable for the reported test metrics.
y_scaler = MinMaxScaler()
targets_scaled = y_scaler.fit_transform(df[TARGETS].values.astype(np.float32))

In [None]:
# ====== Dataset class (images already ready) ======
class SoilImageDataset(Dataset):
    """Pair pre-rendered grayscale images with scaled regression targets.

    Sample ``idx`` is read from ``<img_dir>/CUT_<idx>.png``, so the row order of
    ``targets_scaled`` must match the numbering of the image files.

    Args:
        df: Table with one row per sample; only its length is used.
        img_dir: Directory containing the ``CUT_<idx>.png`` files.
        targets_scaled: Array-like with one row of target values per sample.
            It is copied and stored as float32.
        transform: Callable applied to the resized PIL image. Defaults to
            ``transforms.ToTensor()`` when ``None``.
        img_size: Side length the image is resized to. Defaults to the
            module-level ``IMG_SIZE`` when ``None``.

    Raises:
        ValueError: If ``df`` and ``targets_scaled`` have different lengths.
    """

    def __init__(self, df, img_dir, targets_scaled, transform=None, img_size=None):
        self.df = df
        self.img_dir = img_dir
        # np.array copies, like the original .astype(), and also accepts lists.
        self.targets = np.array(targets_scaled, dtype=np.float32)
        if len(self.targets) != len(df):
            raise ValueError(
                f"df has {len(df)} rows but targets_scaled has {len(self.targets)}"
            )
        # Explicit None checks: a valid but falsy transform must not be replaced.
        self.transform = transforms.ToTensor() if transform is None else transform
        self.img_size = IMG_SIZE if img_size is None else img_size

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at position ``idx``."""
        img_path = os.path.join(self.img_dir, f"CUT_{idx}.png")
        # The context manager closes the file handle as soon as the pixel data
        # has been converted; convert()/resize() return independent images.
        with Image.open(img_path) as raw:
            img = raw.convert("L").resize((self.img_size, self.img_size))  # grayscale
        img = self.transform(img)
        target = torch.from_numpy(self.targets[idx])
        return img, target

# Image preprocessing pipeline: a single conversion step from PIL image to tensor.
transform = transforms.Compose([
    transforms.ToTensor(),  # (H,W) -> (1,H,W), float in [0,1]
])

# One dataset over every CSV row; train/validation/test subsets are taken
# from it later by index.
dataset = SoilImageDataset(df, IMG_DIR, targets_scaled, transform=transform)
print("Dataset length:", len(dataset))

Dataset length: 784


In [None]:
# ====== CNN model (as in Table 2) ======
class CNN2D(nn.Module):
    """2D CNN regressor: eight 3x3 convolutions in five pooled stages, then two
    fully connected layers.

    Args:
        in_channels: Number of channels of the input image (1 for grayscale).
        num_outputs: Number of regression targets predicted per image.
        img_size: Side length of the square input images. It sizes the first
            fully connected layer; the default of 65 gives the original
            ``512 * 2 * 2`` features.

    Raises:
        ValueError: If ``img_size`` is smaller than 32, because the five 2x2
            poolings would then leave no spatial positions.
    """

    def __init__(self, in_channels=1, num_outputs=3, img_size=65):
        super().__init__()
        # Each of the five 2x2/stride-2 max-pools halves the side length
        # (rounding down), so the final feature map side is img_size // 32.
        feat_side = img_size // 32
        if feat_side < 1:
            raise ValueError(
                f"img_size must be at least 32 for five 2x2 poolings, got {img_size}"
            )

        self.conv1 = nn.Conv2d(in_channels, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3a = nn.Conv2d(128, 256, 3, padding=1)
        self.conv3b = nn.Conv2d(256, 256, 3, padding=1)
        self.conv4a = nn.Conv2d(256, 512, 3, padding=1)
        self.conv4b = nn.Conv2d(512, 512, 3, padding=1)
        self.conv5a = nn.Conv2d(512, 512, 3, padding=1)
        self.conv5b = nn.Conv2d(512, 512, 3, padding=1)

        self.flattened = 512 * feat_side * feat_side
        self.fc1 = nn.Linear(self.flattened, 128)
        self.fc2 = nn.Linear(128, num_outputs)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_outputs)`` prediction tensor."""
        # Stage 1 and 2: one convolution each, followed by pooling.
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        # Stages 3-5: two convolutions each, followed by pooling.
        x = self.relu(self.conv3a(x))
        x = self.pool(self.relu(self.conv3b(x)))
        x = self.relu(self.conv4a(x))
        x = self.pool(self.relu(self.conv4b(x)))
        x = self.relu(self.conv5a(x))
        x = self.pool(self.relu(self.conv5b(x)))
        # Flatten everything except the batch dimension for the dense head.
        x = x.flatten(1)
        x = self.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# ====== Metrics ======
def compute_metrics_orig(y_true, y_pred):
    """Compute RMSE, R2 and RPIQ separately for every target column.

    Args:
        y_true: 2-D array of reference values, one column per target.
        y_pred: 2-D array of predictions with the same layout as ``y_true``.

    Returns:
        A list with one dict per column of ``y_true``, each holding the keys
        ``"RMSE"``, ``"R2"`` and ``"RPIQ"``.
    """

    def _column_metrics(col):
        observed = y_true[:, col]
        predicted = y_pred[:, col]
        rmse = np.sqrt(mean_squared_error(observed, predicted))
        r2 = r2_score(observed, predicted)
        # RPIQ = interquartile range of the reference values divided by RMSE;
        # reported as infinity when the RMSE is (numerically) zero.
        if rmse > 1e-8:
            rpiq = float(iqr(observed) / rmse)
        else:
            rpiq = float("inf")
        return {"RMSE": rmse, "R2": r2, "RPIQ": rpiq}

    return [_column_metrics(col) for col in range(y_true.shape[1])]

In [None]:
# ====== Split train/test ======
# Hold out 30% of the sample positions as a test set. The remaining positions
# (`trainval_idx`) are used for cross-validation and for the final training run.
# The split is reproducible through `random_state=seed`.
indices = np.arange(len(dataset))
trainval_idx, test_idx = train_test_split(indices, test_size=0.3, random_state=seed)

In [None]:
# ====== 5-Fold CV ======
# K-fold cross-validation over the train/validation positions. A fresh model is
# trained for every fold, evaluated on the held-out fold in original target
# units, and the per-fold metrics are collected in `fold_results` so an average
# can be reported once all folds have run.
kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
fold_results = []  # one entry per fold: list of per-target metric dicts
for fold, (t_idx, v_idx) in enumerate(kf.split(trainval_idx)):
    print(f"\n--- Fold {fold+1}/{n_splits} ---")
    # kf.split yields positions *into* trainval_idx, so map them back to
    # dataset indices before building the subsets.
    train_loader = DataLoader(Subset(dataset, trainval_idx[t_idx]), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(Subset(dataset, trainval_idx[v_idx]), batch_size=batch_size, shuffle=False)

    # New model and optimizer per fold so no weights leak between folds.
    model = CNN2D().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward()
            optimizer.step()
        if (epoch+1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}")

    # Validation
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for xb, yb in val_loader:
            preds = model(xb.to(device)).cpu().numpy()
            yb_np = yb.cpu().numpy()
            # Undo the min-max scaling so metrics are in original target units.
            preds_orig = y_scaler.inverse_transform(preds)
            yb_orig = y_scaler.inverse_transform(yb_np)
            y_true.append(yb_orig); y_pred.append(preds_orig)
    y_true, y_pred = np.vstack(y_true), np.vstack(y_pred)
    metrics = compute_metrics_orig(y_true, y_pred)
    fold_results.append(metrics)
    for i, t in enumerate(TARGETS):
        print(f"{t}: RMSE={metrics[i]['RMSE']:.3f}, R2={metrics[i]['R2']:.3f}, RPIQ={metrics[i]['RPIQ']:.3f}")

# Average every metric across the folds, per target.
cv_mean_metrics = [
    {key: float(np.mean([fold_metrics[i][key] for fold_metrics in fold_results]))
     for key in ("RMSE", "R2", "RPIQ")}
    for i in range(len(TARGETS))
]
print(f"\n== CV mean metrics over {len(fold_results)} folds ==")
for i, t in enumerate(TARGETS):
    print(f"{t}: RMSE={cv_mean_metrics[i]['RMSE']:.3f}, R2={cv_mean_metrics[i]['R2']:.3f}, RPIQ={cv_mean_metrics[i]['RPIQ']:.3f}")


--- Fold 1/5 ---


In [None]:

# ====== Train final on all trainval + test evaluation ======
# Train one model on the complete train/validation portion, then score it once
# on the held-out test portion.
final_loader = DataLoader(
    Subset(dataset, trainval_idx),
    batch_size=batch_size,
    shuffle=True,
)
test_loader = DataLoader(
    Subset(dataset, test_idx),
    batch_size=batch_size,
    shuffle=False,
)

final_model = CNN2D().to(device)
optimizer = torch.optim.Adam(final_model.parameters(), lr=lr)
criterion = nn.MSELoss()

for epoch in range(epochs):
    final_model.train()
    for batch_images, batch_targets in final_loader:
        batch_images = batch_images.to(device)
        batch_targets = batch_targets.to(device)
        optimizer.zero_grad()
        loss = criterion(final_model(batch_images), batch_targets)
        loss.backward()
        optimizer.step()

# Test
final_model.eval()
true_chunks = []
pred_chunks = []
with torch.no_grad():
    for batch_images, batch_targets in test_loader:
        scaled_preds = final_model(batch_images.to(device)).cpu().numpy()
        scaled_truth = batch_targets.cpu().numpy()
        # Map both back to the original target units before scoring.
        pred_chunks.append(y_scaler.inverse_transform(scaled_preds))
        true_chunks.append(y_scaler.inverse_transform(scaled_truth))
y_true = np.vstack(true_chunks)
y_pred = np.vstack(pred_chunks)
test_metrics = compute_metrics_orig(y_true, y_pred)

print("\n== Test metrics ==")
for i, t in enumerate(TARGETS):
    scores = test_metrics[i]
    print(f"{t}: RMSE={scores['RMSE']:.3f}, R2={scores['R2']:.3f}, RPIQ={scores['RPIQ']:.3f}")

# Persist only the learned weights (state dict), not the full module object.
torch.save(final_model.state_dict(), "/content/2d_cnn_images_ready.pth")
print("Saved model to /content/2d_cnn_images_ready.pth")

In [None]:
# ====== Test-set metrics table ======
# This notebook has no separate calibration set and no `calibration_results`
# variable. The `metrics` variable left over from the CV loop (cell
# 'QgTF6fYoQGF3') only holds the values of the last fold, so it is not used here.
# Instead, this cell reports the final evaluation: the `test_metrics` computed
# in cell 'kYtpCczZQTrS', extended with the per-target bias.

# Calculate bias for test set
def compute_bias(y_true, y_pred):
    """Return the mean signed error (prediction minus reference) per column.

    Args:
        y_true: Array of reference values, samples along axis 0.
        y_pred: Array of predictions with the same shape as ``y_true``.

    Returns:
        The average of ``y_pred - y_true`` taken over axis 0. Positive values
        mean the predictions are too high on average.
    """
    signed_error = y_pred - y_true
    return np.mean(signed_error, axis=0)

test_bias = compute_bias(y_true, y_pred)


# One table row per target, with the columns laid out as
# Property / R² / RMSE / Bias.
test_metrics_df = pd.DataFrame(
    [
        {
            'Property': col,
            'RMSE': test_metrics[i]['RMSE'],
            'R²': test_metrics[i]['R2'],
            'Bias': test_bias[i],
        }
        for i, col in enumerate(TARGETS)
    ],
    columns=['Property', 'R²', 'RMSE', 'Bias'],
)

print("\n=== Test Set Metrics ===")
display(test_metrics_df)

# Note: The provided notebook structure performs k-fold cross-validation
# and then a final training on the combined train+validation set followed by test evaluation.
# There isn't a separate "calibration" set or explicit calculation of
# average CV metrics across all folds stored in a single variable like `fold_results`.
# If you need average CV metrics, we would need to modify the CV loop
# to store the metrics from each fold.