In [None]:
import os

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.datasets import MNIST
import pandas as pd

import matplotlib.pyplot as plt

import pytorch_lightning as pl


# Seed PyTorch's global random number generator for reproducible runs.
random_seed = 42
torch.manual_seed(random_seed)

BATCH_SIZE = 128
# Use at most one GPU; this evaluates to 0 on a machine without CUDA devices.
AVAIL_GPUS = min(1, torch.cuda.device_count())
# os.cpu_count() returns None when the CPU count cannot be determined, so fall
# back to 1 before halving; floor division keeps the result an int.
NUM_WORKERS = (os.cpu_count() or 1) // 2

In [None]:
# Define CustomDataset class
class CustomDataset(Dataset):
    """Dataset that serves one CSV file from ``root_dir`` per sample.

    Args:
        root_dir: Directory whose regular files are the samples.
        transform: Optional callable applied to the ``pandas.DataFrame`` read
            from each file; its return value becomes the sample.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        # Keep regular files only (a sub-directory such as
        # ``.ipynb_checkpoints`` would make ``pd.read_csv`` fail) and sort the
        # names: ``os.listdir`` returns entries in arbitrary order, which would
        # make the index -> file mapping differ between runs.
        self.wavelet_files = sorted(
            name
            for name in os.listdir(root_dir)
            if os.path.isfile(os.path.join(root_dir, name))
        )
        self.transform = transform

    def __len__(self):
        """Return the number of sample files found in ``root_dir``."""
        return len(self.wavelet_files)

    def __getitem__(self, idx):
        """Read the ``idx``-th file with ``pd.read_csv`` and apply the transform.

        Returns:
            The ``pandas.DataFrame`` parsed from the file, or whatever
            ``transform`` returns for it when a transform was given.
        """
        wavelet_path = os.path.join(self.root_dir, self.wavelet_files[idx])
        wavelet = pd.read_csv(wavelet_path)

        # NOTE(review): without a transform the sample stays a DataFrame, which
        # the default DataLoader collation presumably cannot batch -- confirm
        # that the transform in use converts it to a tensor/array.
        if self.transform:
            wavelet = self.transform(wavelet)
        return wavelet

In [None]:
## Set the label in the name
## The imported data is a 2D matrix, so it needs to be converted to a 6*n*m array; divide the first dimension of the matrix by 6 to get n

class CustomDataModule(pl.LightningDataModule):
    """Lightning data module that wraps ``CustomDataset`` with a 70/30 split.

    Args:
        csv_root: Directory passed to ``CustomDataset`` as its ``root_dir``.
        batch_size: Batch size used by every DataLoader.
        num_workers: Number of worker processes used by every DataLoader.
        transforms: Optional callable forwarded to ``CustomDataset`` as its
            per-sample ``transform``.
    """

    def __init__(self, csv_root, batch_size=32, num_workers=0, transforms=None):
        super().__init__()
        self.csv_root = csv_root
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.transform = transforms

    def prepare_data(self):
        """No download or one-off preprocessing step is performed."""
        pass

    def setup(self, stage=None):
        """Create the dataset and split it randomly into 70% train / 30% val."""
        self.dataset = CustomDataset(self.csv_root, transform=self.transform)
        n_total = len(self.dataset)
        n_train = int(n_total * 0.7)
        # The validation part takes the remainder so both lengths sum to n_total.
        self.dataset_train, self.dataset_val = random_split(
            self.dataset, [n_train, n_total - n_train]
        )

    def train_dataloader(self):
        """Return a shuffled DataLoader over the training split."""
        return DataLoader(self.dataset_train, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        """Return an unshuffled DataLoader over the validation split."""
        # Validation must read the held-out split, not the training data, and
        # needs no shuffling.
        return DataLoader(self.dataset_val, batch_size=self.batch_size, shuffle=False, num_workers=self.num_workers)

    def test_dataloader(self):
        """No test split is defined; returns ``None``."""
        return None

In [None]:
# Location of the dataset on disk (placeholder value).
root_dir = 'path/to/your/dataset'

# Per-sample preprocessing pipeline: a Compose holding a single ToTensor step.
transformations = transforms.Compose([transforms.ToTensor()])

# Build the data module for that directory and hand it the pipeline.
# NOTE(review): this relies on CustomDataModule.__init__ accepting a
# `transforms` keyword -- confirm against the class definition.
data_module = CustomDataModule(csv_root=root_dir, transforms=transformations)


In [None]:
# Iterate through the training dataloader.
# The data module itself is not iterable: setup() builds the train/val
# datasets and train_dataloader() returns the DataLoader that yields batches.
data_module.setup()
for batch in data_module.train_dataloader():
    print(batch.shape)

In [None]:
# Detective: fake or no fake -> 1 output [0, 1]

## TODO : Change the Channel and input size

class Discriminator(nn.Module):
    """Small CNN that scores each input image with one value in [0, 1].

    ``fc1`` expects 320 (= 20 * 4 * 4) features per sample, which is what
    28x28 inputs produce after the two conv + max-pool stages.

    Args:
        in_channels: Number of channels of the input images (default 1).
    """

    def __init__(self, in_channels=1):
        super().__init__()
        # Simple CNN
        self.conv1 = nn.Conv2d(in_channels, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 1)

    def forward(self, x):
        """Return a ``[batch, 1]`` tensor of sigmoid outputs for ``x``.

        Raises:
            RuntimeError: If the flattened feature size is not the 320 that
                ``fc1`` expects (i.e. the input is not 28x28).
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        # Flatten everything except the batch dimension. Unlike view(-1, 320),
        # this never re-groups elements across samples: an unexpected input
        # size fails loudly in fc1 instead of silently changing the batch size.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return torch.sigmoid(x)

In [None]:

## TODO : Change the Channel and input size


class Generator(nn.Module):
    """Maps latent vectors of size ``latent_dim`` to ``[n, 1, 28, 28]`` tensors.

    A linear layer produces 64 feature maps of 7x7, two stride-2 transposed
    convolutions upsample them to 16x16 and 34x34, and a final 7x7 convolution
    reduces the result to one 28x28 map. No activation follows the last layer.
    """

    def __init__(self, latent_dim):
        super().__init__()
        # Output of lin1 is reshaped in forward() to [n, 64, 7, 7].
        self.lin1 = nn.Linear(latent_dim, 7 * 7 * 64)
        # [n, 64, 7, 7] -> [n, 32, 16, 16]
        self.ct1 = nn.ConvTranspose2d(64, 32, 4, stride=2)
        # [n, 32, 16, 16] -> [n, 16, 34, 34]
        self.ct2 = nn.ConvTranspose2d(32, 16, 4, stride=2)
        # [n, 16, 34, 34] -> [n, 1, 28, 28]
        self.conv = nn.Conv2d(16, 1, kernel_size=7)

    def forward(self, x):
        """Generate images from the latent batch ``x`` of shape ``[n, latent_dim]``."""
        # Project the latent input and reshape it into 64 maps of 7x7.
        feature_maps = F.relu(self.lin1(x)).view(-1, 64, 7, 7)

        # Two transposed-conv upsampling stages, each followed by ReLU.
        for upsample in (self.ct1, self.ct2):
            feature_maps = F.relu(upsample(feature_maps))

        # Final convolution down to a single 28x28 feature map.
        return self.conv(feature_maps)

In [1]:
class GAN(pl.LightningModule):
    """GAN that trains ``Generator`` against ``Discriminator``.

    Two Adam optimizers are configured: index 0 updates the generator and
    index 1 updates the discriminator.

    Args:
        latent_dim: Size of the noise vector fed to the generator.
        lr: Learning rate shared by both optimizers.
    """

    def __init__(self, latent_dim=100, lr=0.0002):
        super().__init__()
        self.save_hyperparameters()

        self.generator = Generator(latent_dim=self.hparams.latent_dim)
        self.discriminator = Discriminator()

        # Fixed random noise reused by plot_imgs() so successive plots are
        # generated from the same latent vectors.
        self.validation_z = torch.randn(6, self.hparams.latent_dim)

    def forward(self, z):
        """Generate images from the latent batch ``z``."""
        return self.generator(z)

    def adverarial_loss(self, y_hat, y):
        """Binary cross-entropy between predictions ``y_hat`` and targets ``y``."""
        return F.binary_cross_entropy(y_hat, y)

    def training_step(self, batch, batch_dim, optimizer_idx):
        """Run one generator (``optimizer_idx == 0``) or discriminator
        (``optimizer_idx == 1``) update and return ``{"loss": ...}``.

        ``batch`` is unpacked as a pair whose first element holds the real
        images; the second element is ignored.
        """
        real_imgs, _ = batch
        # `size` is a method, so it must be called rather than subscripted.
        batch_size = real_imgs.size(0)

        # sample noise
        z = torch.randn(batch_size, self.hparams.latent_dim)
        z = z.type_as(real_imgs)

        # train generator : max log(D(G(z)))

        ## TODO Change the input Data using the label

        if optimizer_idx == 0:
            fake_imgs = self(z)
            y_hat = self.discriminator(fake_imgs)

            # The generator wants its fakes classified as real (target 1).
            # The target itself is moved to the images' dtype/device.
            y = torch.ones(batch_size, 1)
            y = y.type_as(real_imgs)

            g_loss = self.adverarial_loss(y_hat, y)

            self.log("g_loss", g_loss, prog_bar=True)
            return {"loss": g_loss}

        # train discriminator : max log(D(x)) + log(1-D(G(z)))

        if optimizer_idx == 1:

            # how well can it label as real
            y_hat_real = self.discriminator(real_imgs)

            y_real = torch.ones(batch_size, 1)
            y_real = y_real.type_as(real_imgs)

            real_loss = self.adverarial_loss(y_hat_real, y_real)

            # how well can it label as fake
            # detach(): the discriminator update must not backpropagate
            # through the generator.
            y_hat_fake = self.discriminator(self(z).detach())

            # Generated images carry the "fake" target 0, not 1.
            y_fake = torch.zeros(batch_size, 1)
            y_fake = y_fake.type_as(real_imgs)

            fake_loss = self.adverarial_loss(y_hat_fake, y_fake)

            d_loss = (real_loss + fake_loss) / 2

            self.log("d_loss", d_loss, prog_bar=True)
            return {"loss": d_loss}

    def configure_optimizers(self):
        """Return ``[generator_optimizer, discriminator_optimizer]`` and no schedulers."""
        lr = self.hparams.lr
        opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr)
        opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr)
        return [opt_g, opt_d], []

    def plot_imgs(self):
        """Plot the images generated from ``validation_z`` on a 2x3 grid."""
        # Match the noise to the generator's dtype/device before sampling.
        z = self.validation_z.type_as(self.generator.lin1.weight)
        sample_imgs = self(z).detach().cpu()

        print('epoch', self.current_epoch)
        plt.figure()

        for i in range(sample_imgs.size(0)):
            plt.subplot(2, 3, i + 1)
            plt.imshow(sample_imgs[i, 0, :, :], cmap='gray_r', interpolation='none')
            plt.title("Generated Data")
            plt.xticks([])
            plt.yticks([])
            # plt.axis('off') hides the axis decorations; plt.axes(...) would
            # try to create a new Axes instead.
            plt.axis('off')
        # Lay out the grid once, after all subplots exist.
        plt.tight_layout()
        plt.show()

    # NOTE(review): newer PyTorch Lightning releases replace this hook with
    # `on_train_epoch_end` -- confirm against the installed version.
    def on_epoch_end(self):
        """Plot generated samples at the end of an epoch."""
        self.plot_imgs()
        
        

NameError: name 'pl' is not defined

In [None]:
# Instantiate the data module and a GAN with its default hyperparameters.
# NOTE(review): MNISTDataModule is not defined in the cells shown here --
# confirm it is defined or imported before this cell runs.
dm = MNISTDataModule()
model = GAN()


In [None]:
# Preview what the generator produces for its fixed validation noise; this
# cell precedes the trainer.fit() call, so the model has not been fitted yet.
model.plot_imgs()

In [None]:
# Keyword arguments are case-sensitive: the GPU count is passed as `gpus`;
# `Gpus` is rejected as an unexpected keyword argument.
trainer = pl.Trainer(max_epochs=20, gpus=AVAIL_GPUS)

trainer.fit(model, dm)