## **Homework 3**
### Engineering Image Analysis - Combustion Images
#### Step 1 - Train 2D CNN for Classifying Flames
##### Using PyTorch instead of Tensorflow

In [1]:
import h5py
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [2]:
# Load the data from the .mat file
with h5py.File('combustion_img_13.mat', 'r') as f:
    x_train = np.array(f['train_set_x'])
    y_train = np.array(f['train_set_y'])
    x_test = np.array(f['test_set_x'])
    y_test = np.array(f['test_set_y'])
    x_val = np.array(f['valid_set_x'])
    y_val = np.array(f['valid_set_y'])

In [3]:
# print(x_train.shape)
# print(y_train.shape)
# print(x_test.shape)
# print(y_test.shape)
# print(x_val.shape)
# print(y_val.shape)

In [4]:
class CombustionDataset(Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return torch.Tensor(self.x[idx]), torch.Tensor([self.y[idx]])

In [5]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, 5)
        self.fc1 = nn.Linear(32 * 122 * 47, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.fc2(x)
        x = nn.functional.sigmoid(x)
        return x

In [6]:
# Set the device to use for training
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Create the dataset and data loaders
train_dataset = CombustionDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = CombustionDataset(x_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Define the model and optimizer
model = Net()
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define the loss function
criterion = nn.BCEWithLogitsLoss()

In [59]:
# Train the model
best_val_acc = 0
num_epochs = 10

for epoch in range(num_epochs):
    train_loss = 0
    train_correct = 0
    model.train()
    for x_batch, y_batch in train_loader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        x_batch = x_batch.unsqueeze(1) # Add channel dimension
        x_batch = x_batch.transpose(-1, -2) # Swap last two dimensions
        x_batch = x_batch.view(x_batch.size(0), 1, x_batch.size(-2), x_batch.size(-1)) # Reshape input tensor
        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch.float())
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * x_batch.size(0)
        train_correct += (torch.sigmoid(outputs) >= 0.5).sum().item()

    train_loss /= len(train_loader.dataset)
    train_accuracy = train_correct / len(train_loader.dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    val_loss = 0
    val_correct = 0
    model.eval()
    with torch.no_grad():
        for x_batch, y_batch in val_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            x_batch = x_batch.unsqueeze(1) # Add channel dimension
            x_batch = x_batch.transpose(-1, -2) # Swap last two dimensions
            x_batch = x_batch.view(x_batch.size(0), 1, x_batch.size(-2), x_batch.size(-1)) # Reshape input tensor
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch.float())
            val_loss += loss.item() * x_batch.size(0)
            val_correct += (torch.sigmoid(outputs) >= 0.5).sum().item()

    val_loss /= len(val_dataset)
    val_accuracy = val_correct / len(val_dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
    
    if val_accuracy > best_val_acc:
        best_val_acc = val_accuracy
        torch.save(model.state_dict(), 'best_model.pth')


RuntimeError: Calculated padded input size per channel: (54000 x 1). Kernel size: (5 x 5). Kernel size can't be greater than actual input size

In [None]:
print(f'Epoch {epoch+1}/{10}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

# Save the best model
if val_acc > best_val_acc:
    best_val_acc = val_acc
    torch.save(model.state_dict(), 'best_model.pt')

#### Step 2 - Train an autoencoder to allow a sequential model to operate

In [1]:
import h5py
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [2]:
# Load the data from the .mat file
with h5py.File('combustion_img_13.mat', 'r') as f:
    train_x = np.array(f['train_set_x'])
    train_y = np.array(f['train_set_y'])
    test_x = np.array(f['test_set_x'])
    test_y = np.array(f['test_set_y'])
    val_x = np.array(f['valid_set_x'])
    val_y = np.array(f['valid_set_y'])

In [6]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),           
            nn.ReLU(),
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=3, stride=1, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


In [7]:
gpu = True
device = torch.device("cuda:0" if gpu and torch.cuda.is_available() else "cpu")

autoencoder = Autoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)

for epoch in range(10):
    train_loss = 0
    for x_batch, _ in train_loader:
        x_batch = x_batch.view(x_batch.size(0), -1, 28, 28).to(device)  # Reshape to 4D tensor
        optimizer.zero_grad()
        outputs = autoencoder(x_batch)
        loss = criterion(outputs, x_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * x_batch.size(0)

    train_loss /= len(train_dataset)
    print(f'Epoch {epoch+1}/{10}, Train Loss: {train_loss:.4f}')

torch.save(autoencoder.state_dict(), 'autoencoder.pt')


NameError: name 'train_loader' is not defined

In [8]:
#Using autoencoder to extract compressed latents from the combustion images and use them as input to a sequential model:
class SequentialModel(nn.Module):
    def __init__(self):
        super(SequentialModel, self).__init__()
        self.rnn = nn.LSTM(input_size=128, hidden_size=64, num_layers=2, batch_first=True)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        x, _ = self.rnn(x)
        x = self.fc(x[:, -1, :])
        return x


In [9]:
autoencoder.load_state_dict(torch.load('autoencoder.pt'))
sequential_model = SequentialModel().to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(sequential_model.parameters(), lr=0.001)

for epoch in range(10):
    train_loss = 0
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        with torch.no_grad():
            x_batch = autoencoder.encoder(x_batch)
        optimizer.zero_grad()
        outputs = sequential_model(x_batch)
        loss = criterion(outputs.squeeze(), y_batch.float())
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * x_batch.size(0)

    train_loss /= len(train_dataset)
    print(f'Epoch {epoch+1}/{10}, Train Loss: {train_loss:.4f}')

torch.save(sequential_model.state_dict(), 'sequential_model.pt')


FileNotFoundError: [Errno 2] No such file or directory: 'autoencoder.pt'

In [None]:
autoencoder.load_state_dict(torch.load('autoencoder.pt'))
sequential_model.load_state_dict(torch.load('sequential_model.pt'))
test_loss = 0
y_true = []
y_pred = []
with torch.no_grad():
    for x_batch, y_batch in test_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_true += y_batch.cpu().tolist()
        x_batch = autoencoder.encoder(x_batch)
        outputs = sequential_model(x_batch)
        loss = criterion(outputs.squeeze(), y_batch.float())
        test_loss += loss.item() * x_batch.size(0)
        y_pred += (torch.sigmoid(outputs) > 0.5).cpu().tolist()

test_loss /= len(test_dataset)
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

print(f'Test Loss: {test_loss:.4f}')
print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
