In [21]:
import sys
import os

# Add particle_detection to sys.path
sys.path.append(os.path.abspath(".."))

In [25]:
import os
import torch
from sklearn.model_selection import train_test_split
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import numpy as np
import cv2
from particle_detection.autoencoder.model import create_autoencoder

class ImageDataset(Dataset):
    def __init__(self, image_files, data_dir, transform=None):
        """
        Simple Dataset class for loading images.

        :param image_files: List of image file names.
        :param data_dir: Path to the directory containing images.
        :param transform: Transform to apply to the images.
        """
        self.image_files = image_files
        self.data_dir = data_dir
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_path = os.path.join(self.data_dir, self.image_files[idx])
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image


def preprocess_image(image):
    """
    Normalize and convert the image to RGB format.

    :param image: Input PIL image.
    :return: Preprocessed PIL image.
    """
    sample = np.array(image)
    sample = cv2.normalize(sample, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    processed_image = Image.fromarray(sample).convert('RGB')
    return processed_image


def get_transforms(image_size=(224, 224), is_train=True):
    """
    Simple function to get transformations for training or testing.

    :param image_size: Tuple of desired image dimensions (height, width).
    :param is_train: Whether to include augmentation (True for training, False for testing).
    :return: A torchvision.transforms.Compose object.
    """
    transform_list = [
        transforms.Lambda(preprocess_image),  # Integrate preprocessing here
        transforms.Resize(image_size),
        transforms.ToTensor()
    ]

    if is_train:
        # Add augmentations only for training
        transform_list.extend([
            transforms.RandomRotation(degrees=(-30, 30)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.ColorJitter(brightness=0.2, contrast=0.2)
        ])

    return transforms.Compose(transform_list)


def load_image_file_paths(data_dir, extensions=(".tif", ".TIF")):
    """
    List all image file paths in the directory with the given extensions.

    :param data_dir: Path to the directory containing images.
    :param extensions: Tuple of allowed image file extensions.
    :return: List of image file names.
    """
    return [f for f in os.listdir(data_dir) if f.endswith(extensions)]


def split_dataset(image_files, test_size=0.2, random_seed=42):
    """
    Split image file names into training and testing sets.

    :param image_files: List of image file names.
    :param test_size: Proportion of the dataset to use for testing.
    :param random_seed: Random seed for reproducibility.
    :return: Tuple of (train_files, test_files).
    """
    return train_test_split(image_files, test_size=test_size, random_state=random_seed)


def create_dataloaders(data_dir, transform, batch_size, test_size=0.2):
    """
    Create PyTorch DataLoader objects for training and testing.

    :param data_dir: Path to the directory containing images.
    :param transform: Transform to apply to the images.
    :param batch_size: Batch size for DataLoaders.
    :param test_size: Proportion of the dataset to use for testing.
    :return: Tuple of (train_loader, test_loader).
    """
    # Step 1: Load all image file paths
    image_files = load_image_file_paths(data_dir)

    # Step 2: Split into train and test sets
    train_files, test_files = split_dataset(image_files, test_size=test_size)

    # Step 3: Create Dataset objects
    train_dataset = ImageDataset(train_files, data_dir, transform=transform)
    test_dataset = ImageDataset(test_files, data_dir, transform=transform)

    # Step 4: Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader

In [16]:
data_dir = "/Users/blah_m4/Desktop/nanoparticle/images"
transform = get_transforms(image_size=(224, 224), is_train=True)
batch_size = 16

train_loader, test_loader = create_dataloaders(data_dir, transform, batch_size)

print(f"[INFO] Number of training batches: {len(train_loader)}")
print(f"[INFO] Number of testing batches: {len(test_loader)}")

[INFO] Number of training batches: 1
[INFO] Number of testing batches: 1


In [17]:
for images in train_loader:
    print(images.shape)  # Prints the shape of the first batch of images
    break

torch.Size([8, 3, 224, 224])


In [29]:
num_epochs=1
# Setup device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[INFO] Using device: {device}")

# Get transforms
transform = get_transforms(image_size=(1024, 1024), is_train=True)

# Create dataloaders
train_loader, test_loader = create_dataloaders(data_dir, transform, batch_size)

# Validate batch size
if batch_size > len(train_loader.dataset):
    print(f"[WARNING] Batch size ({batch_size}) exceeds dataset size ({len(train_loader.dataset)}). Adjusting batch size.")
    batch_size = len(train_loader.dataset)

model = create_autoencoder()
model = nn.DataParallel(model)
#model.to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for batch in train_loader:
        batch = batch.to(device)

        # Forward pass
        outputs = model(batch)
        loss = criterion(outputs, batch)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    print(f"[INFO] Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss / len(train_loader):.4f}")

[INFO] Using device: cpu


AttributeError: 'tuple' object has no attribute 'size'

In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class VAE(nn.Module):
    def __init__(self, in_channels=3, latent_dim=128):
        super(VAE, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=4, stride=2, padding=1),  # 1024 -> 512
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),  # 512 -> 256
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),  # 256 -> 128
            nn.ReLU()
        )

        # Latent space
        self.fc_mu = nn.Linear(256 * 128 * 128, latent_dim)
        self.fc_logvar = nn.Linear(256 * 128 * 128, latent_dim)

        # Decoder
        self.fc_decoder = nn.Linear(latent_dim, 256 * 128 * 128)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),  # 128 -> 256
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),  # 256 -> 512
            nn.ReLU(),
            nn.ConvTranspose2d(64, in_channels, kernel_size=4, stride=2, padding=1),  # 512 -> 1024
            nn.Sigmoid()  # Output normalized to [0, 1]
        )

    def reparameterize(self, mu, logvar):
        """
        Reparameterization trick: z = mu + std * epsilon
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        # Encode
        encoded = self.encoder(x)
        encoded_flat = encoded.view(encoded.size(0), -1)
        
        # Latent variables
        mu = self.fc_mu(encoded_flat)
        logvar = self.fc_logvar(encoded_flat)
        z = self.reparameterize(mu, logvar)

        # Decode
        decoded_flat = self.fc_decoder(z)
        decoded = decoded_flat.view(-1, 256, 128, 128)
        x_reconstructed = self.decoder(decoded)

        return x_reconstructed, mu, logvar

# Loss function for VAE
def vae_loss(recon_x, x, mu, logvar):
    """
    Computes the VAE loss as the sum of reconstruction loss and KL divergence.
    :param recon_x: Reconstructed input.
    :param x: Original input.
    :param mu: Mean of the latent distribution.
    :param logvar: Log variance of the latent distribution.
    """
    recon_loss = F.mse_loss(recon_x, x, reduction='sum')
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_div

def create_vae():
    """
    Creates and returns a Variational Autoencoder (VAE) instance.
    """
    return VAE()

In [34]:
import torch
import torch.nn as nn

# Parameters
num_epochs = 20
batch_size = 8
data_dir = "/Users/blah_m4/Desktop/nanoparticle/images"  # Replace with your dataset path

# Setup device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[INFO] Using device: {device}")

# Get transforms
transform = get_transforms(image_size=(1024,1024), is_train=True)

# Create dataloaders
train_loader, test_loader = create_dataloaders(data_dir, transform, batch_size)

# Validate batch size
if batch_size > len(train_loader.dataset):
    print(f"[WARNING] Batch size ({batch_size}) exceeds dataset size ({len(train_loader.dataset)}). Adjusting batch size.")
    batch_size = len(train_loader.dataset)

# Create VAE model
model = create_vae()
model = nn.DataParallel(model)
model.to(device)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for batch in train_loader:
        batch = batch.to(device)

        # Forward pass
        reconstructed, mu, logvar = model(batch)
        loss = vae_loss(reconstructed, batch, mu, logvar)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    print(f"[INFO] Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss / len(train_loader):.4f}")

print("[INFO] Testing complete!")

[INFO] Using device: cpu
[INFO] Epoch [1/20], Loss: 5899865.0000
[INFO] Epoch [2/20], Loss: 7429085.0000
[INFO] Epoch [3/20], Loss: 5658522.5000
[INFO] Epoch [4/20], Loss: 5403704.5000
[INFO] Epoch [5/20], Loss: 5466216.0000
[INFO] Epoch [6/20], Loss: 4563363.0000
[INFO] Epoch [7/20], Loss: 4068160.7500
[INFO] Epoch [8/20], Loss: 5280898.5000
[INFO] Epoch [9/20], Loss: 5148251.0000
[INFO] Epoch [10/20], Loss: 4681949.5000
[INFO] Epoch [11/20], Loss: 5505831.5000
[INFO] Epoch [12/20], Loss: 4605000.0000
[INFO] Epoch [13/20], Loss: 4297038.0000
[INFO] Epoch [14/20], Loss: 4288450.5000
[INFO] Epoch [15/20], Loss: 3508215.5000
[INFO] Epoch [16/20], Loss: 4004722.7500
[INFO] Epoch [17/20], Loss: 3237115.7500
[INFO] Epoch [18/20], Loss: 2965999.0000
[INFO] Epoch [19/20], Loss: 2811727.2500
[INFO] Epoch [20/20], Loss: 2576518.7500
[INFO] Testing complete!


In [35]:
num_epochs = 2

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    recon_loss_total = 0.0
    kl_div_total = 0.0

    for batch in train_loader:
        batch = batch.to(device)

        # Forward pass
        reconstructed, mu, logvar = model(batch)
        
        # Compute losses
        recon_loss = nn.functional.mse_loss(reconstructed, batch, reduction='sum')  # Reconstruction loss
        kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())  # KL Divergence
        loss = recon_loss + kl_div

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        recon_loss_total += recon_loss.item()
        kl_div_total += kl_div.item()

    print(f"[INFO] Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss / len(train_loader):.4f}, "
          f"Reconstruction Loss: {recon_loss_total / len(train_loader):.4f}, "
          f"KL Divergence: {kl_div_total / len(train_loader):.4f}")

print("[INFO] Testing complete!")


[INFO] Epoch [1/2], Loss: 2085031.5000, Reconstruction Loss: 1928827.1250, KL Divergence: 156204.3281
[INFO] Epoch [2/2], Loss: 2441944.5000, Reconstruction Loss: 2314545.7500, KL Divergence: 127398.7578
[INFO] Testing complete!
