In [None]:
from PIL import Image
import os
import torch
import numpy as np

## Using 14-70 filter

In [None]:
# Load the EEG dataset file from Google Drive.
# Later cells read it as a dict with a 'dataset' list whose entries carry an 'eeg' signal.
# NOTE(review): the path assumes Drive is mounted at /content/drive (Colab) — confirm.
eeg_data_path = "/content/drive/MyDrive/Data/eeg_14_70_std.pth"
eeg_data = torch.load(eeg_data_path)

In [None]:
#### Don't re-run (one-off step: this cell resizes every image and rewrites it under output_dir)
# Source images, organised as one sub-folder per class.
input_dir = "/content/drive/MyDrive/Data/Subset"
# Destination root for the resized copies.
output_dir = "/content/drive/MyDrive/Data/Subset_128x128"

# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)

def preprocess_images(input_dir, output_dir, size=(128, 128)):
    """Resize every image under ``input_dir`` and mirror it into ``output_dir``.

    ``input_dir`` is expected to contain one sub-folder per class. Each image
    file is converted to RGB, resized to ``size`` with Lanczos resampling and
    saved under the same sub-folder and file name inside ``output_dir``.

    Entries of ``input_dir`` that are not directories, and entries of a class
    folder that are not regular files, are skipped instead of crashing the run.

    Args:
        input_dir: Directory holding the class sub-folders.
        output_dir: Destination root; sub-folders are created as needed.
        size: Target ``(width, height)`` handed to ``Image.resize``.
    """
    # sorted() only makes the processing order deterministic.
    for folder_name in sorted(os.listdir(input_dir)):
        input_folder_path = os.path.join(input_dir, folder_name)

        # A stray file next to the class folders (e.g. a CSV or .DS_Store) used
        # to reach os.listdir() below and raise NotADirectoryError.
        if not os.path.isdir(input_folder_path):
            continue

        output_folder_path = os.path.join(output_dir, folder_name)
        os.makedirs(output_folder_path, exist_ok=True)  # Create output subfolder

        for file_name in sorted(os.listdir(input_folder_path)):
            input_file_path = os.path.join(input_folder_path, file_name)

            # Nested directories cannot be opened as images; skip them.
            if not os.path.isfile(input_file_path):
                continue

            output_file_path = os.path.join(output_folder_path, file_name)

            # Open, resize, and save the image
            with Image.open(input_file_path) as img:
                img = img.convert("RGB")  # Ensure consistent format
                img = img.resize(size, Image.Resampling.LANCZOS)
                img.save(output_file_path)

    print(f"All images resized to {size} and saved in {output_dir}")

# Run preprocessing on the module-level input_dir/output_dir with the default 128x128 size
preprocess_images(input_dir, output_dir)


In [None]:
### Don't re-run: this cell trims the EEG signals and writes the result to the preprocessed file.

# Reload the original EEG file so the trimming below starts from the stored data.
eeg_data_path = "/content/drive/MyDrive/Data/eeg_14_70_std.pth"
eeg_data = torch.load(eeg_data_path)

def preprocess_eeg_signals(eeg_data):
    """Trim every EEG recording in ``eeg_data['dataset']`` in place.

    The first 20 and the last 40 positions along the second axis of each
    entry's ``'eeg'`` signal are dropped. The entries are modified in place,
    so calling this twice on the same object trims twice.

    Args:
        eeg_data: Mapping with a ``'dataset'`` iterable of dicts, each holding
            a 2-D ``'eeg'`` signal (the notebook expects shape [128, 500]).

    Returns:
        The same ``eeg_data`` object, with trimmed signals.
    """
    keep = slice(20, -40)  # [128, 500] -> [128, 440]

    for record in eeg_data['dataset']:
        full_signal = record['eeg']
        record['eeg'] = full_signal[:, keep]

    print("EEG signals preprocessed to shape [128, 440]")
    return eeg_data

# Preprocess EEG
# preprocess_eeg_signals edits the entries in place and returns the same object,
# so `eeg_data` itself is trimmed after this line as well.
eeg_data_preprocessed = preprocess_eeg_signals(eeg_data)

# Save the preprocessed EEG data
preprocessed_path = "/content/drive/MyDrive/Data/eeg_14_70_preprocessed.pth"
torch.save(eeg_data_preprocessed, preprocessed_path)
print(f"Preprocessed EEG data saved at {preprocessed_path}")

In [None]:
# Re-declare the paths and reload the trimmed EEG data from disk, so the cells
# below can run without executing the "don't re-run" cells above.
input_dir = "/content/drive/MyDrive/Data/Subset"
output_dir = "/content/drive/MyDrive/Data/Subset_128x128"
preprocessed_path = "/content/drive/MyDrive/Data/eeg_14_70_preprocessed.pth"
eeg_data_preprocessed = torch.load(preprocessed_path)

In [None]:
def normalize_image(img):
    """Rescale 8-bit pixel values to the range [-1, 1].

    Args:
        img: Anything ``np.array`` accepts (e.g. a PIL image or nested list)
            holding values in [0, 255].

    Returns:
        A float ``numpy.ndarray`` where 0 maps to -1 and 255 maps to 1.
    """
    pixels = np.array(img)
    scaled = pixels / 127.5  # [0, 255] -> [0, 2]
    return scaled - 1        # [0, 2]   -> [-1, 1]

def preprocess_images_with_normalization(input_dir, output_dir, size=(128, 128)):
    """Resize each image, pass it through ``normalize_image`` and save it.

    ``input_dir`` is expected to contain one sub-folder per class. Every image
    file is converted to RGB, resized to ``size`` with Lanczos resampling,
    normalized to [-1, 1] and then mapped back to 8-bit pixels so it can be
    written as an ordinary image file under the mirrored path in ``output_dir``.

    Entries of ``input_dir`` that are not directories, and entries of a class
    folder that are not regular files, are skipped instead of crashing the run.

    Args:
        input_dir: Directory holding the class sub-folders.
        output_dir: Destination root; sub-folders are created as needed.
        size: Target ``(width, height)`` handed to ``Image.resize``.
    """
    # sorted() only makes the processing order deterministic.
    for folder_name in sorted(os.listdir(input_dir)):
        input_folder_path = os.path.join(input_dir, folder_name)

        # A stray file next to the class folders used to reach os.listdir()
        # below and raise NotADirectoryError.
        if not os.path.isdir(input_folder_path):
            continue

        output_folder_path = os.path.join(output_dir, folder_name)
        os.makedirs(output_folder_path, exist_ok=True)  # Create output subfolder

        for file_name in sorted(os.listdir(input_folder_path)):
            input_file_path = os.path.join(input_folder_path, file_name)

            # Nested directories cannot be opened as images; skip them.
            if not os.path.isfile(input_file_path):
                continue

            output_file_path = os.path.join(output_folder_path, file_name)

            # Open, resize and normalize the image
            with Image.open(input_file_path) as img:
                img = img.convert("RGB")  # Ensure consistent format
                img = img.resize(size, Image.Resampling.LANCZOS)
                normalized = normalize_image(img)

            # Denormalize for saving. The float round trip can land a hair
            # below the original integer (e.g. 0.9999999 instead of 1), and a
            # bare uint8 cast truncates, silently darkening those pixels by one
            # level. Round to nearest and clip before casting.
            pixels = np.clip(np.rint((normalized + 1) * 127.5), 0, 255).astype(np.uint8)
            Image.fromarray(pixels).save(output_file_path)

    print(f"All images resized and normalized to {size} and saved in {output_dir}")

# Run image preprocessing with normalization
# (reads from input_dir and writes the 8-bit result files into output_dir)
preprocess_images_with_normalization(input_dir, output_dir)


In [None]:
def normalize_eeg_signal(eeg_signal, eps=1e-8):
    """Standardize a signal to zero mean and unit variance.

    The mean and standard deviation are taken over the whole signal (all
    elements at once), not per row.

    Args:
        eeg_signal: Array-like object exposing ``.mean()`` and ``.std()``
            (the notebook passes the per-entry ``'eeg'`` signal).
        eps: Smallest standard deviation that is still divided by. A constant
            signal has zero deviation and would otherwise turn into NaN/inf.

    Returns:
        ``(eeg_signal - mean) / std``, or just the centered signal when the
        standard deviation is not greater than ``eps`` (including NaN).
    """
    centered = eeg_signal - eeg_signal.mean()
    std = eeg_signal.std()

    # `not (std > eps)` is also true for NaN, which a plain `std <= eps` would miss.
    if not (std > eps):
        return centered

    return centered / std

def preprocess_eeg_signals_with_normalization(eeg_data):
    """Normalize every EEG recording in ``eeg_data['dataset']`` in place.

    Each entry's ``'eeg'`` signal is replaced by the result of
    ``normalize_eeg_signal`` applied to it.

    Args:
        eeg_data: Mapping with a ``'dataset'`` iterable of dicts, each holding
            an ``'eeg'`` signal (the notebook expects shape [128, 440]).

    Returns:
        The same ``eeg_data`` object, with normalized signals.
    """
    for record in eeg_data['dataset']:
        record['eeg'] = normalize_eeg_signal(record['eeg'])

    print("EEG signals normalized to zero mean and unit variance")
    return eeg_data

# Preprocess EEG with normalization.
# Feed the trimmed data reloaded from disk (eeg_data_preprocessed), not `eeg_data`:
# when the "don't re-run" trimming cell is skipped, `eeg_data` still holds the
# untrimmed signals loaded at the top of the notebook, and those would be
# normalized and saved instead.
# Note that the entries are normalized in place, so eeg_data_preprocessed is
# modified too.
eeg_data_normalized = preprocess_eeg_signals_with_normalization(eeg_data_preprocessed)

# Save the normalized EEG data
# NOTE: this rebinds preprocessed_path to the *normalized* file.
preprocessed_path = "/content/drive/MyDrive/Data/eeg_14_70_normalized.pth"
torch.save(eeg_data_normalized, preprocessed_path)
print(f"Preprocessed EEG data saved at {preprocessed_path}")


In [None]:
# Reload the normalized EEG data from Drive (the same file the normalization cell saves).
eeg_data_normalized = torch.load("/content/drive/MyDrive/Data/eeg_14_70_normalized.pth")

In [None]:
# Sanity check: display the shape of the first entry's EEG signal.
eeg_data_normalized['dataset'][0]['eeg'].shape

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import spectral_norm  # Import for spectral normalization

# Earlier ResBlock draft with Spectral Normalization (disabled; the active ResBlock is defined further down in this cell)
# class ResBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, downsample=False):
#         super(ResBlock, self).__init__()

#         # Convolution layers
#         self.conv1 = spectral_norm(nn.Conv2d(in_channels, out_channels, 3, padding=1))
#         self.bn1 = nn.BatchNorm2d(out_channels)
#         self.conv2 = spectral_norm(nn.Conv2d(out_channels, out_channels, 3, padding=1))
#         self.bn2 = nn.BatchNorm2d(out_channels)

#         # Downsampling or upsampling
#         self.downsample = nn.Conv2d(in_channels, out_channels, 1, stride=2, padding=0) if downsample else None

#     def forward(self, x):
#         residual = x
#         x = F.relu(self.bn1(self.conv1(x)))
#         x = self.bn2(self.conv2(x))

#         # If downsample is needed (for reducing the spatial dimension)
#         if self.downsample:
#             residual = self.downsample(residual)

#         x += residual  # Add the residual connection
        # return F.relu(x)

# Generator model with Spectral Normalization
class Generator(nn.Module):
    """Conditional generator mapping a noise vector and a condition to an RGB image.

    The concatenated ``[z, y]`` vector is projected to a ``(1024, 4, 4)``
    feature map and passed through five residual stages. Each stage first
    doubles the spatial size (4 -> 8 -> 16 -> 32 -> 64 -> 128) and then applies
    a ``ResBlock``, so the final convolution emits a ``(3, 128, 128)`` image
    squashed to ``[-1, 1]`` by ``tanh``.

    Args:
        z_dim: Length of the noise vector ``z``.
        y_dim: Length of the condition vector ``y``; also the number of classes
            used when ``y`` is given as integer class indices.
    """

    def __init__(self, z_dim=100, y_dim=128):
        super(Generator, self).__init__()

        self.z_dim = z_dim
        self.y_dim = y_dim

        # Input: z + y
        self.input_dim = z_dim + y_dim

        # Dense layer
        self.dense = nn.Linear(self.input_dim, 4 * 4 * 1024)

        # ResBlocks; the x2 upsampling that precedes each of them lives in forward()
        self.resblock1 = ResBlock(1024, 1024)
        self.resblock2 = ResBlock(1024, 512)
        self.resblock3 = ResBlock(512, 256)
        self.resblock4 = ResBlock(256, 128)
        self.resblock5 = ResBlock(128, 64)

        # Final convolution to produce an image of shape (3, 128, 128)
        self.conv = spectral_norm(nn.Conv2d(64, 3, 3, padding=1))
        self.tanh = nn.Tanh()

    def forward(self, z, y):
        """Generate a batch of images.

        Args:
            z: Noise tensor of shape ``(batch, z_dim)``.
            y: Either a ``(batch, y_dim)`` condition tensor (e.g. one-hot
                labels) or a 1-D tensor of ``batch`` class indices in
                ``[0, y_dim)``, which is one-hot encoded here.

        Returns:
            Tensor of shape ``(batch, 3, 128, 128)`` with values in ``[-1, 1]``.
        """
        # A 1-D tensor of class indices cannot be concatenated with the 2-D
        # noise; expand it to the (batch, y_dim) vector the dense layer expects.
        if y.dim() == 1:
            y = F.one_hot(y.long(), num_classes=self.y_dim)
        y = y.to(dtype=z.dtype)

        # Concatenate noise vector z and conditional vector y
        x = torch.cat([z, y], dim=1)

        # Pass through the dense layer
        x = self.dense(x)
        x = x.view(-1, 1024, 4, 4)  # Reshape to (batch, 1024, 4, 4)

        # ResBlock keeps the spatial size, so without an explicit upsampling
        # step the output would stay 4x4. Double the resolution before each
        # block: five stages take 4x4 up to 128x128.
        stages = (
            self.resblock1,
            self.resblock2,
            self.resblock3,
            self.resblock4,
            self.resblock5,
        )
        for block in stages:
            x = F.interpolate(x, scale_factor=2, mode="nearest")
            x = block(x)

        # Final convolution and Tanh activation
        x = self.conv(x)
        x = self.tanh(x)  # Output image range [-1, 1]
        return x

# Shared ResBlock, followed by the Discriminator model with Spectral Normalization

class ResBlock(nn.Module):
    """Residual block made of two spectrally-normalized 3x3 convolutions.

    Both convolutions use ``padding=1``, so the spatial size of the input is
    preserved. When the channel count changes, a 1x1 convolution (stored as
    ``downsample``) projects the skip path to ``out_channels``; it only
    adjusts channels and does not change the resolution.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels of the output feature map.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        # Main path: conv -> BN -> ReLU -> conv -> BN
        self.conv1 = spectral_norm(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = spectral_norm(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: identity unless the channel count has to be matched
        if in_channels != out_channels:
            self.downsample = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        else:
            self.downsample = None

    def forward(self, x):
        """Return ``relu(main_path(x) + skip_path(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        out = self.conv1(x)
        out = F.relu(self.bn1(out))
        out = self.conv2(out)
        out = self.bn2(out)

        out += shortcut  # Add residual connection
        return F.relu(out)


class Discriminator(nn.Module):
    """Conditional discriminator scoring an image together with its class label.

    The image goes through four ``ResBlock`` stages, each followed by a 2x2
    average pooling, and is then globally average-pooled into a 512-d feature
    vector. The label is embedded into another 512-d vector; both are
    concatenated (1024-d) and mapped to a single probability.

    Args:
        y_dim: Number of distinct class labels the embedding table can hold.
    """

    def __init__(self, y_dim=128):
        super().__init__()

        # ResBlocks with spectral normalization applied within the blocks
        self.resblock1 = ResBlock(3, 64)  # Input: 3 channels (RGB), output: 64 channels
        self.resblock2 = ResBlock(64, 128)
        self.resblock3 = ResBlock(128, 256)
        self.resblock4 = ResBlock(256, 512)

        # Conditional embedding layer for labels: 512 label features + 512
        # pooled image features = the 1024 inputs of the final layer.
        self.embed = nn.Embedding(y_dim, 512)

        # Final fully connected layer to output probability
        self.fc = nn.Linear(1024, 1)

    def forward(self, x, y):
        """Score a batch of images.

        Args:
            x: Image tensor of shape ``(batch, 3, H, W)``; ``H`` and ``W`` must
                be at least 16 so that four 2x2 poolings are possible.
            y: Integer tensor of ``batch`` class indices in ``[0, y_dim)``.

        Returns:
            Tensor of shape ``(batch, 1)`` with probabilities in ``(0, 1)``.
        """
        # ResBlock keeps the spatial size, so flattening its raw output would
        # give 512 * H * W features, which cannot feed Linear(1024, 1). Halve
        # the resolution after every block instead.
        stages = (self.resblock1, self.resblock2, self.resblock3, self.resblock4)
        for block in stages:
            x = block(x)
            x = F.avg_pool2d(x, kernel_size=2)

        # Collapse the remaining spatial grid: (batch, 512, h, w) -> (batch, 512)
        x = F.adaptive_avg_pool2d(x, 1).flatten(1)

        # Embed the conditional label vector y: (batch,) -> (batch, 512)
        y_embedded = self.embed(y)
        x = torch.cat([x, y_embedded], dim=1)  # Concatenate with the conditional label vector

        # Final fully connected layer to get probability
        x = self.fc(x)
        return torch.sigmoid(x)

In [None]:
# Discriminator loss
def discriminator_loss(D_real, D_fake, epsilon=1e-8):
    """Binary cross-entropy style loss for the discriminator.

    Args:
        D_real: Discriminator probabilities for real images.
        D_fake: Discriminator probabilities for generated images.
        epsilon: Small constant added inside the logarithms to avoid ``log(0)``.

    Returns:
        Scalar tensor ``-(mean(log(D_real)) + mean(log(1 - D_fake)))``.
    """
    log_real = torch.log(D_real + epsilon)
    log_not_fake = torch.log(1 - D_fake + epsilon)
    return -(log_real.mean() + log_not_fake.mean())

# Generator loss
def generator_loss(D_fake, epsilon=1e-8):
    """Generator loss ``-mean(log(D_fake))``.

    Args:
        D_fake: Discriminator probabilities for generated images.
        epsilon: Small constant added inside the logarithm to avoid ``log(0)``.

    Returns:
        Scalar tensor that shrinks as the discriminator rates the fakes as real.
    """
    log_fake = torch.log(D_fake + epsilon)
    return -log_fake.mean()


In [None]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# import os
# import pandas as pd

# # Path to the folder containing the 40 class subfolders
# input_dir = "/content/drive/MyDrive/Data/Subset_128x128"

# # List to store filename and corresponding label
# data = []

# # Loop through each class folder (0-39)
# for label, folder_name in enumerate(os.listdir(input_dir)):
#     folder_path = os.path.join(input_dir, folder_name)

#     # Skip non-directory files
#     if not os.path.isdir(folder_path):
#         continue

#     # Loop through each image in the folder
#     for image_name in os.listdir(folder_path):
#         if image_name.endswith(".JPEG"):
#             data.append([image_name, label])

# # Convert the list into a pandas DataFrame
# df = pd.DataFrame(data, columns=["filename", "label"])

# # Save the DataFrame to a CSV file
# labels_csv_path = "/content/drive/MyDrive/Data/labels.csv"
# df.to_csv(labels_csv_path, index=False)

# print(f"Labels CSV created at {labels_csv_path}")


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd
import os

class ImageDataset(Dataset):
    """Dataset of labelled images listed in a CSV file.

    The CSV must have a ``label`` column; its first column holds the image
    file name. Rows whose label cannot be parsed as a number are dropped.
    Images are looked up at ``<img_dir>/<prefix>/<file name>``, where
    ``prefix`` is the part of the file name before the first underscore
    (e.g. ``n03180011`` for ``n03180011_1108.JPEG``).

    Args:
        csv_file: Path to the CSV file with file names and labels.
        img_dir: Root directory that contains the per-class sub-folders.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        frame = pd.read_csv(csv_file)

        # Turn anything that is not a number into NaN ...
        frame['label'] = pd.to_numeric(frame['label'], errors='coerce')

        # ... and drop those rows. The explicit copy makes the filtered frame
        # independent, so the cast below never writes into a slice of `frame`.
        frame = frame.dropna(subset=['label']).copy()

        # Ensure that label column is in integer format
        frame['label'] = frame['label'].astype(int)

        self.data_frame = frame
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Number of rows with a valid label."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        The image is always in RGB mode before ``transform`` is applied.
        """
        label = self.data_frame.iloc[idx, 1]
        img_name = self.data_frame.iloc[idx, 0]  # Full filename, e.g., 'n03180011_1108.JPEG'

        # Extract the subfolder name from the filename (e.g., 'n03180011')
        subfolder_name = img_name.split('_')[0]

        # Construct the full image path by joining the subfolder and filename
        img_path = os.path.join(self.img_dir, subfolder_name, img_name)

        # convert() loads the pixel data and returns an independent image, so
        # the file can be closed right away. Forcing RGB keeps grayscale/CMYK
        # files compatible with 3-channel transforms.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label




# Transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    # (x - 0.5) / 0.5 per channel: rescales ToTensor's output to the [-1, 1]
    # range that the generator's tanh output uses.
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Define dataset and dataloader
image_dir = "/content/drive/MyDrive/Data/Subset_128x128"
label_csv = "/content/drive/MyDrive/Data/labels.csv"
dataset = ImageDataset(label_csv, image_dir, transform=transform)
# Batches of 64 (image, label) pairs, reshuffled every epoch.
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
from torchvision.utils import save_image
import numpy as np

# Set device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize models
z_dim = 100  # Size of noise vector
# Width of the generator's condition vector and size of the discriminator's
# label embedding table; every label must be < y_dim.
# NOTE(review): if the dataset has fewer classes than this, the random sample
# labels drawn below include classes that never occur in training — confirm.
y_dim = 128
generator = Generator(z_dim=z_dim, y_dim=y_dim).to(device)
discriminator = Discriminator(y_dim=y_dim).to(device)

# Optimizers for Generator and Discriminator
lr = 0.0002
beta1, beta2 = 0.5, 0.999
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(beta1, beta2))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(beta1, beta2))

# Number of epochs
num_epochs = 50
sample_interval = 100  # Interval to save sample images

# save_image does not create missing directories
os.makedirs('./samples', exist_ok=True)

# Start training loop
for epoch in range(num_epochs):
    for i, (real_images, labels) in enumerate(dataloader):
        # Move data to device
        real_images = real_images.to(device)
        labels = labels.to(device).long()

        # The generator concatenates its condition with the noise, so it needs
        # a (batch, y_dim) float vector; the discriminator embeds raw indices.
        labels_onehot = F.one_hot(labels, num_classes=y_dim).float()

        # Create noise vector z
        z = torch.randn(real_images.size(0), z_dim, device=device)  # Random noise

        # Train Discriminator
        optimizer_D.zero_grad()

        # Real images
        D_real = discriminator(real_images, labels)

        # Fake images (detached: the discriminator step must not update the generator)
        fake_images = generator(z, labels_onehot)
        D_fake = discriminator(fake_images.detach(), labels)

        # discriminator_loss takes (D_real, D_fake). Passing constant ones/zeros
        # as its second argument, as before, turned the "fake" term into
        # -mean(log(D_fake)) and trained the discriminator to rate fakes as real.
        d_loss = discriminator_loss(D_real, D_fake)
        d_loss.backward()
        optimizer_D.step()

        # Train Generator
        optimizer_G.zero_grad()

        # Score the same fakes again, this time keeping the graph to the generator
        D_fake = discriminator(fake_images, labels)

        # Generator loss
        g_loss = generator_loss(D_fake)
        g_loss.backward()
        optimizer_G.step()

        # Print and save the losses at sample interval
        if i % sample_interval == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Step [{i}/{len(dataloader)}], D Loss: {d_loss.item()}, G Loss: {g_loss.item()}')

            # Save generated samples for visualization
            with torch.no_grad():
                sample_z = torch.randn(64, z_dim, device=device)  # Generate sample noise
                sample_labels = torch.randint(0, y_dim, (64,), device=device)  # Random labels
                sample_onehot = F.one_hot(sample_labels, num_classes=y_dim).float()
                samples = generator(sample_z, sample_onehot)
                samples = (samples + 1) / 2  # Scale back to [0, 1] for visualization
                # Already in [0, 1]: no normalize=True, which would min-max
                # stretch the batch and hide its real value range.
                save_image(samples, f'./samples/epoch_{epoch}_step_{i}.png', nrow=8)

    # Optional: Save model checkpoints every 10 epochs
    if (epoch + 1) % 10 == 0:
        torch.save(generator.state_dict(), f'generator_epoch_{epoch+1}.pth')
        torch.save(discriminator.state_dict(), f'discriminator_epoch_{epoch+1}.pth')


## Trying with CPU

In [None]:
# device = torch.device("cpu")

In [None]:
# for i, (real_images, labels) in enumerate(dataloader):
#     real_images = real_images.to(device)  # Move to CPU
#     labels = labels.to(device)