In [1]:
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image

In [2]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [5]:
class ImportOwnDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.labels = []
        self.classes = os.listdir(root_dir)
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}

        for cls in self.classes:
            class_dir = os.path.join(root_dir, cls)
            for img_name in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_name)
                try:
                    img = Image.open(img_path)
                    # Check if the image is not truncated
                    img.verify()
                    self.images.append(img_path)
                    self.labels.append(self.class_to_idx[cls])
                except Exception as e:
                    print(f"Skipping {img_path}: {e}")

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_path = self.images[idx]
        label = self.labels[idx]
        img = Image.open(img_path).convert("RGB")  # Ensure RGB mode
        if self.transform:
            img = self.transform(img)
        return img, label

In [6]:
# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to a fixed size
    transforms.ToTensor()
])

In [None]:
# Load datasets
train_dataset = ImportOwnDataset(root_dir='../Dataset/images/train', transform=transform)
val_dataset = ImportOwnDataset(root_dir='../Dataset/images/validation', transform=transform)


In [None]:
# Data loaders
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True).to(device)
val_loader = DataLoader(val_dataset, batch_size=128).to(device)

In [None]:
# Autoencoder model
class Autoencoder(nn.Module):
    def __init__(self, encoding_dim):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Linear(3 * 224 * 224, encoding_dim)
        self.decoder = nn.Linear(encoding_dim, 3 * 224 * 224)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = F.relu(self.encoder(x))
        x = torch.sigmoid(self.decoder(x))
        x = x.view(x.size(0), 3, 224, 224)
        return x

In [None]:
# Initialize model and send to device
encoding_dim = 32
autoencoder = Autoencoder(encoding_dim).to(device)


In [None]:
# Check if CUDA (GPU) is available
if torch.cuda.is_available():
    # Move model to GPU
    autoencoder.cuda()

    # Move data tensors to GPU
    #data_tensor = data_tensor.cuda()

    # Check the device of model parameters
    for name, param in autoencoder.named_parameters():
        print(f"Parameter {name} is on device: {param.device}")

    # Check the device of data tensor
    print(f"loader_train is on device: {train_loader.device}")
    print(f"loader_val is on device: {val_loader.device}")

else:
    print("CUDA (GPU) is not available.")


In [None]:
# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.001)


In [None]:
from datetime import datetime
current_datetime = datetime.now()
date_time = str(current_datetime)[:-7].replace('-','').replace(':','').replace(' ','_')
model_name = 'Autoencoder'


In [None]:
def output_log_writer(s, end='\r'):
    with open(f'model_result/log/output_{model_name}_{date_time}.txt', 'a') as output_file:
        output_file.write(s+'\n')
        print(s,end,flush=True)


In [None]:
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    output_log_writer(f'-------------------------------[Epoch {epoch+1}]---------------------------------')
    output_log_writer(f'[Epoch {epoch+1}] Training...', end='')
    autoencoder.train()
    running_loss = 0.0
    for images, _ in train_loader:
        print('=', end='')
        images = images.to(device)
        optimizer.zero_grad()
        outputs = autoencoder(images)
        
        output_log_writer(f'[Epoch {epoch+1}] Computing Train Loss...')
        loss = criterion(outputs, images)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")
    
    # save model checkpoint
    ckpt_save_path = f'model_result/model_checkpoint/MODEL_CKPT_{epoch}_{model_name}_{date_time}.pt'
    torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': losses[-2],
            }, ckpt_save_path)
    output_log_writer(f'[Epoch {epoch+1}] Model checkpoint is saved.')

# Save model
torch.save(autoencoder, f'model_result/model_pth/MODEL_{model_name}_{date_time}.pth')
print('Autoencoder training complete.')
