In [13]:
import os
import shutil
import random

source_dir = "Preprocessed_Data"
target_dir = "Split_Data"
splits = ['train', 'val', 'test']
split_ratios = {'train': 0.6, 'val': 0.2, 'test': 0.2}

# For each time folder (e.g., "0 hr", "24 hr", ...)
for time_folder in os.listdir(source_dir):
    time_path = os.path.join(source_dir, time_folder)
    if not os.path.isdir(time_path):
        continue

    # For each condition folder inside each time step
    for cond_folder in os.listdir(time_path):
        cond_path = os.path.join(time_path, cond_folder)
        if not os.path.isdir(cond_path):
            continue

        # List and shuffle images
        images = [f for f in os.listdir(cond_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        random.seed(42)
        random.shuffle(images)

        # Calculate split indices
        n = len(images)
        n_train = int(split_ratios['train'] * n)
        n_val = int(split_ratios['val'] * n)
        n_test = n - n_train - n_val

        split_files = {
            'train': images[:n_train],
            'val': images[n_train:n_train + n_val],
            'test': images[n_train + n_val:]
        }

        # Copy files to target structure
        for split in splits:
            dest_folder = os.path.join(target_dir, split, time_folder, cond_folder)
            os.makedirs(dest_folder, exist_ok=True)

            for img in split_files[split]:
                src_img_path = os.path.join(cond_path, img)
                dest_img_path = os.path.join(dest_folder, img)
                shutil.copy2(src_img_path, dest_img_path)

print("✅ Split complete: Each split contains all folders from all time steps, with different images.")


✅ Split complete: Each split contains all folders from all time steps, with different images.


In [1]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image

In [3]:
class HydrogelSequenceDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        self.samples = {}  # condition: list of (time, path)
        
        # Iterate over the time folders
        time_folders = sorted(os.listdir(root_dir), key=lambda x: int(x.split()[0]))

        for time_folder in time_folders:
            t = int(time_folder.split()[0].replace("hr", "").strip())  # Get the time (e.g., 0, 5)
            time_path = os.path.join(root_dir, time_folder)
            
            # Iterate over condition folders inside each time folder
            for cond_folder in os.listdir(time_path):
                cond_path = os.path.join(time_path, cond_folder)
                if not os.path.isdir(cond_path):
                    continue

                # Make the condition key based on the folder name
                key = cond_folder.strip().lower()
                
                # Add images to the corresponding condition
                for img_file in os.listdir(cond_path):
                    if img_file.endswith(('.JPG', '.jpeg', '.png')):
                        path = os.path.join(cond_path, img_file)
                        if key not in self.samples:
                            self.samples[key] = []
                        self.samples[key].append((t, path))

        # Create a list of sequences (sorted by time)
        self.sequence_data = []
        for cond, lst in self.samples.items():
            lst.sort()  # sort by time
            imgs = [path for _, path in lst]
            self.sequence_data.append((cond, imgs))

    def __len__(self):
        return len(self.sequence_data)

    def __getitem__(self, idx):
        cond, paths = self.sequence_data[idx]
        imgs = []
        for path in paths:
            img = Image.open(path).convert("RGB")
            if self.transform:
                img = self.transform(img)
            imgs.append(img)

        images = torch.stack(imgs)  # shape: [seq_len, 3, H, W]

        # extract labels from cond
        parts = cond.split()
        pH = float(parts[0].replace("ph", ""))  # Example: "pH5" -> 5.0
        remaining_time = 140 - (len(images) - 1) * 5  # Assuming 5hr steps for remaining time

        return images, torch.tensor(pH), torch.tensor(remaining_time)

In [4]:
class ResNetLSTM(nn.Module):
    def __init__(self, hidden_dim=128, num_layers=1):
        super().__init__()

        resnet = models.resnet18(pretrained=True)
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-1])  # remove final FC
        self.feature_dim = resnet.fc.in_features

        self.lstm = nn.LSTM(self.feature_dim, hidden_dim, num_layers, batch_first=True)

        self.ph_head = nn.Linear(hidden_dim, 1)      # Predict pH
        self.time_head = nn.Linear(hidden_dim, 1)    # Predict remaining time

    def forward(self, x_seq):  # x_seq: [B, T, 3, H, W]
        B, T, C, H, W = x_seq.size()
        x_seq = x_seq.view(B * T, C, H, W)

        # Extract features for each frame using ResNet
        with torch.no_grad():  # Freeze ResNet during LSTM training (optional)
            features = self.feature_extractor(x_seq)  # [B*T, feat_dim, 1, 1]
            features = features.view(B, T, -1)        # [B, T, feat_dim]

        # Feed sequence of features into LSTM
        lstm_out, _ = self.lstm(features)             # [B, T, hidden_dim]

        # Take the last time step's output
        last_hidden = lstm_out[:, -1, :]              # [B, hidden_dim]

        # Predict pH and time-to-degradation
        ph_pred = self.ph_head(last_hidden)           # [B, 1]
        time_pred = self.time_head(last_hidden)       # [B, 1]

        return ph_pred.squeeze(1), time_pred.squeeze(1)

In [14]:
from torchvision import transforms

transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to ResNet input size
    transforms.ToTensor(),          # Convert PIL Image to tensor
    transforms.Normalize(           # Normalize as per ImageNet
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
])

train_dataset = HydrogelSequenceDataset("Split_Data/train", transform=transform)
val_dataset = HydrogelSequenceDataset("Split_Data/val", transform=transform)
test_dataset = HydrogelSequenceDataset("Split_Data/test", transform=transform)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# Set device: use GPU if available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model, loss function, and optimizer
model = ResNetLSTM(hidden_dim=128, num_layers=1).to(device)  # Move model to device

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Set the number of epochs for training
num_epochs = 10

for epoch in range(num_epochs):
    model.train()  # Training mode
    running_loss = 0.0  # Track loss

    # Iterate through the training data
    for images, ph_labels, time_labels in train_loader:
        images = images.to(device)  # Move data to GPU or CPU
        ph_labels = ph_labels.to(device)
        time_labels = time_labels.to(device)

        optimizer.zero_grad()  # Zero gradients

        # Forward pass
        ph_pred, time_pred = model(images)

        # Calculate losses for pH and time
        ph_loss = criterion(ph_pred, ph_labels)
        time_loss = criterion(time_pred, time_labels)
        loss = ph_loss + time_loss

        # Backward pass
        loss.backward()
        optimizer.step()  # Update model weights

        running_loss += loss.item()

    # Print average loss for this epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}")

# Optional: Save the trained model
torch.save(model.state_dict(), 'resnet_lstm_model.pth')



RuntimeError: stack expects each tensor to be equal size, but got [294, 3, 224, 224] at entry 0 and [287, 3, 224, 224] at entry 1

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ======== Custom collate function for LSTM ========
def collate_fn_lstm(batch):
    sequences, ph_labels, time_labels = zip(*batch)

    # Stack image sequences [seq_len, 3, 224, 224]
    # sequences = [torch.stack(seq) for seq in sequences]  # List of tensors

    # Pad sequences to same length
    padded_seqs = pad_sequence(sequences, batch_first=True)  # [B, T, 3, 224, 224]

    # Labels
    ph_labels = torch.tensor(ph_labels, dtype=torch.float32)
    time_labels = torch.tensor(time_labels, dtype=torch.float32)

    return padded_seqs, ph_labels, time_labels

# ======== Loaders (example usage) ========
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn_lstm)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn_lstm)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn_lstm)

# ======== Model definition example (ResNet + LSTM) ========
class ResNetLSTM(nn.Module):
    def __init__(self, hidden_dim=128, num_layers=1):
        super(ResNetLSTM, self).__init__()
        from torchvision.models import resnet18
        resnet = resnet18(pretrained=True)
        self.cnn = nn.Sequential(*list(resnet.children())[:-1])  # Remove FC
        self.feature_dim = 512  # ResNet18 final feature dim

        self.lstm = nn.LSTM(input_size=self.feature_dim, hidden_size=hidden_dim,
                            num_layers=num_layers, batch_first=True)

        self.fc_ph = nn.Linear(hidden_dim, 1)
        self.fc_time = nn.Linear(hidden_dim, 1)

    def forward(self, x):  # x: [B, T, 3, 224, 224]
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        with torch.no_grad():  # Freeze CNN if desired
            cnn_feats = self.cnn(x).view(B, T, -1)  # [B, T, F]
        lstm_out, _ = self.lstm(cnn_feats)  # [B, T, hidden_dim]
        last_hidden = lstm_out[:, -1, :]  # Use last time step
        ph_pred = self.fc_ph(last_hidden).squeeze(1)
        time_pred = self.fc_time(last_hidden).squeeze(1)
        return ph_pred, time_pred

# ======== Training Setup ========
model = ResNetLSTM(hidden_dim=128, num_layers=1).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 20

# ======== Training Loop ========
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, ph_labels, time_labels in train_loader:
        images = images.to(device)
        ph_labels = ph_labels.to(device)
        time_labels = time_labels.to(device)

        optimizer.zero_grad()
        ph_pred, time_pred = model(images)

        ph_loss = criterion(ph_pred, ph_labels)
        time_loss = criterion(time_pred, time_labels)
        loss = ph_loss + time_loss

        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

# ======== Save the Model ========
torch.save(model.state_dict(), 'resnet_lstm_model.pth')


Epoch [1/20], Loss: 1672259.1250
Epoch [2/20], Loss: 1670240.6250
Epoch [3/20], Loss: 1668567.1250
Epoch [4/20], Loss: 1667194.2500
Epoch [5/20], Loss: 1666039.2500
Epoch [6/20], Loss: 1665021.7500
Epoch [7/20], Loss: 1664086.7500
Epoch [8/20], Loss: 1663196.7500
Epoch [9/20], Loss: 1662329.2500
Epoch [10/20], Loss: 1661487.2500
Epoch [11/20], Loss: 1660671.6250
Epoch [12/20], Loss: 1659863.6250
Epoch [13/20], Loss: 1659062.5000
Epoch [14/20], Loss: 1658294.2500
Epoch [15/20], Loss: 1657542.7500
Epoch [16/20], Loss: 1656789.8750
Epoch [17/20], Loss: 1656037.3750
Epoch [18/20], Loss: 1655320.0000
Epoch [19/20], Loss: 1654662.1250
Epoch [20/20], Loss: 1654060.3750
