# Initial Model Research (Baseline)

## Google Colab or Jupyter Settings

### Google Colab

In [None]:
# Repo
# Template for cloning the project inside Google Colab: uncomment and fill in locally.
# NOTE: never commit real passwords or tokens; clear these values (and the cell
# output) before sharing the notebook.

# GITHUB_PASSWORD = 'PASSWORD'
# GITHUB_TOKEN = 'TOKEN'
# GITHUB_USERNAME = 'USERNAME'
# GITHUB_EMAIL = 'EMAIL'
# GITHUB_REPO = 'nesm-gan'
# GITHUB_BRANCH = 'BRANCH'

# !git config --global user.name {GITHUB_USERNAME}
# !git config --global user.email {GITHUB_EMAIL}
# !git config --global user.password {GITHUB_PASSWORD}
# !git clone https://{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{GITHUB_REPO}

# %cd {GITHUB_REPO}
# !git checkout {GITHUB_BRANCH}
# !git pull origin {GITHUB_BRANCH}

In [None]:
# Data

# from google.colab import drive  # To Log In
# drive.mount('/content/drive')

# !for f in "/content/drive/My Drive/Colab/nesm_gan_data/"*; \
# do ln -s "$f" "/content/nesm-gan/data/$(basename "$f")"; done
# !for f in "/content/drive/My Drive/Colab/nesm_gan_models/"*; \
# do ln -s "$f" "/content/nesm-gan/models/$(basename "$f")"; done
# !pip install dvc  # requirements.txt
# !dvc pull

# ROOT_DIR = '/content/nesm-gan/'

### Jupyter

In [None]:
# Repo, data is stored locally

# import os
# import sys


# ROOT_DIR = os.path.abspath('..')  # ROOT_DIR = Path(__file__).parents[1].resolve()
# if ROOT_DIR not in sys.path:
#     sys.path.append(ROOT_DIR)
# sys.path.append(ROOT_DIR)

## GAN

In [None]:
!pip install mlflow
!pip install pyngrok


from pathlib import Path

import matplotlib.pyplot as plt
import mlflow
import numpy as np
# from pyngrok import ngrok
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from tqdm.notebook import tqdm


# Random
%matplotlib inline

RANDOM_SEED = 13
np.random.seed(RANDOM_SEED)

torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Configuration
NOISE_VECTOR_LEN = 100

# Train data
DATA_PATH = ROOT_DIR / Path('data/nesmdb24_seprsco_train_ready')
MODELS_PATH = ROOT_DIR / Path('data/models')
SAMPLE_LEN = 256 # 128 = 4-5sec
ROWS_CNT = 8

# Preview a single training sample: load the first entry the directory listing
# yields, print its shape and draw it as a grayscale image.
file = next(DATA_PATH.iterdir(), None)
if file is not None:
    song = np.load(file)
    print(song.shape)
    plt.imshow(song, cmap='gray')
    plt.show()

#### Generator

In [None]:
class Generator(nn.Module):
    """Convolutional generator: noise vector in, single-channel 32x32 image out.

    Shape flow (B = batch size), derived from the layer hyper-parameters:
        noise            [B, noise_len]
        fc               [B, 128 * 8 * 8]  -> reshaped to [B, 128, 8, 8]
        conv1 + bn1      [B, 64, 16, 16]
        conv2 + bn2      [B, 32, 17, 17]   (stride 1, so only +1 per side)
        conv3 + sigmoid  [B, 1, 32, 32]    values in [0, 1]

    Args:
        noise_len: length of the input noise vector.
    """

    def __init__(self, noise_len: int):
        super().__init__()

        # Project the noise vector onto a 128-channel 8x8 feature map
        # (flattened here, reshaped in forward()).
        self.fc = nn.Linear(noise_len, 128 * 8 * 8, bias=False)

        # Upsample 8x8 -> 16x16: (8 - 1) * 2 - 2 * 1 + 4 = 16.
        self.conv1 = nn.ConvTranspose2d(
            128, 64, kernel_size=4, stride=2, padding=1, bias=False)
        # Batch normalization keeps the shape: [B, 64, 16, 16].
        self.batchnorm1 = nn.BatchNorm2d(64)

        # Stride-1 transposed conv: 16x16 -> 17x17: (16 - 1) * 1 - 2 * 1 + 4 = 17.
        self.conv2 = nn.ConvTranspose2d(
            64, 32, kernel_size=4, stride=1, padding=1, bias=False)
        # Batch normalization keeps the shape: [B, 32, 17, 17].
        self.batchnorm2 = nn.BatchNorm2d(32)

        # Output layer 17x17 -> 32x32: (17 - 1) * 2 - 2 * 1 + 2 = 32.
        # Alternative that was tried:
        # nn.ConvTranspose2d(32, 1, 4, stride=2, padding=2, dilation=1, bias=True)
        self.conv3 = nn.ConvTranspose2d(
            32, 1, kernel_size=2, stride=2, padding=1, bias=True)

    def forward(self, x):
        """Map noise of shape [B, noise_len] to images of shape [B, 1, 32, 32]."""
        features = self.fc(x).view(-1, 128, 8, 8)

        hidden = self.batchnorm1(self.conv1(features))
        hidden = F.leaky_relu(hidden, negative_slope=0.2)

        hidden = self.batchnorm2(self.conv2(hidden))
        hidden = F.leaky_relu(hidden, negative_slope=0.2)

        # Sigmoid squashes the output into [0, 1].
        return torch.sigmoid(self.conv3(hidden))


# Smoke test: push one random noise vector through the untrained generator and
# look at the result. No gradients are needed for this check.
generator = Generator(NOISE_VECTOR_LEN)
noise = torch.randn(1, NOISE_VECTOR_LEN)
with torch.no_grad():
    generated_image = generator(noise)
print(generated_image.shape)

# Move the channel axis last ([C, H, W] -> [H, W, C]) before handing the
# array to matplotlib.
image = generated_image[0].permute(1, 2, 0).numpy()
print(image.shape)
plt.imshow(image, cmap='gray')
plt.show()

#### Discriminator

In [None]:
class Discriminator(nn.Module):
    """Convolutional discriminator: 32x32 single-channel image in, probability out.

    Shape flow (B = batch size):
        input          [B, 1, 32, 32]   (batch, channels, height, width)
        conv1 + bn1    [B, 32, 16, 16]
        conv2 + bn2    [B, 64, 8, 8]
        conv3 + bn3    [B, 128, 4, 4]   -> flattened to [B, 128 * 4 * 4]
        fc + sigmoid   [B, 1]           values in [0, 1]
    """

    def __init__(self):
        super().__init__()

        # Each stride-2 convolution halves the spatial size and is followed
        # by batch normalization over its output channels.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1)
        self.batchnorm1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1)
        self.batchnorm2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)
        self.batchnorm3 = nn.BatchNorm2d(128)

        # Collapse the flattened 128x4x4 feature map into one score per image.
        self.fc = nn.Linear(128 * 4 * 4, 1)

    def forward(self, x):
        """Return one sigmoid score of shape [B, 1] per input image."""
        stages = (
            (self.conv1, self.batchnorm1),
            (self.conv2, self.batchnorm2),
            (self.conv3, self.batchnorm3),
        )
        for conv, norm in stages:
            x = F.leaky_relu(norm(conv(x)), 0.2)

        flat = x.view(-1, 128 * 4 * 4)
        # Sigmoid turns the raw score into a probability-like value.
        return torch.sigmoid(self.fc(flat))


# Smoke test: score the image produced by the generator check above with an
# untrained discriminator. No gradients are needed for this check.
discriminator = Discriminator()
with torch.no_grad():
    decision = discriminator(generated_image)
print(decision)

## Data Loader

In [None]:
class CustomDataset(Dataset):
    """Dataset that serves every array file stored directly inside a directory.

    Each file is read with ``np.load`` and returned as a float32 tensor with
    its last axis moved to the front.

    Args:
        data_dir: directory holding the saved arrays (``Path`` or ``str``).
    """

    def __init__(self, data_dir: Path):
        self.data_dir = Path(data_dir)
        # iterdir() yields entries in arbitrary, filesystem-dependent order:
        # sort so that index -> sample is stable across runs and machines
        # (needed for the seeded shuffling to be reproducible), and skip
        # subdirectories, which np.load cannot read.
        self.file_list = sorted(
            path for path in self.data_dir.iterdir() if path.is_file())

    def __len__(self):
        """Number of sample files found in the directory."""
        return len(self.file_list)

    def __getitem__(self, idx: int):
        """Load sample ``idx`` as a float32 tensor.

        The stored array must be 3-dimensional (permute raises otherwise);
        presumably it is [height, width, channels] and is returned as
        [channels, height, width] - TODO confirm against the data export.
        """
        file_path = self.file_list[idx]
        image = np.load(file_path)
        image = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1)
        return image

## Training
NESMGAN Experiment 0

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print('Using device:', device)

In [None]:
# Configuration for this experiment
EXPERIMENT_NAME = '0'  # used in the MLflow experiment name and the checkpoint folder
BATCH_SIZE = 256       # samples per optimisation step
LR = 2e-4              # learning rate for both Adam optimizers
EPOCHS = 1_000         # full passes over the training data


def train(generator: nn.Module,
          discriminator: nn.Module,
          optimizer_G: optim.Adam,
          optimizer_D: optim.Adam,
          loss: nn.Module,
          dataloader: torch.utils.data.DataLoader):
    """Adversarially train the generator and discriminator for EPOCHS epochs.

    For every batch the discriminator is updated first (real batch labelled 1,
    detached fake batch labelled 0), then the generator is updated on a fresh
    noise batch so that the discriminator scores its output as real. Both
    losses are logged to MLflow on every step, and both state dicts are saved
    under ``MODELS_PATH/<EXPERIMENT_NAME>_model_checkpoints/epoch_<n>`` every
    10 epochs and after the final epoch.

    Args:
        generator: maps noise of shape [batch, NOISE_VECTOR_LEN] to images.
        discriminator: maps images to a score of shape [batch, 1]; must live
            on the same device as ``generator``.
        optimizer_G: optimizer over the generator parameters.
        optimizer_D: optimizer over the discriminator parameters.
        loss: criterion called as ``loss(scores, labels)``.
        dataloader: yields batches of real images.
    """
    # Work on whatever device the generator was placed on, so the function
    # behaves the same on CPU and works unchanged once the models are on GPU.
    device = next(generator.parameters()).device
    # Build the checkpoint location as a Path: the previous f-string produced
    # a str, on which .mkdir() and the "/" operator both fail.
    checkpoints_root = Path(MODELS_PATH) / f'{EXPERIMENT_NAME}_model_checkpoints'
    batches_per_epoch = len(dataloader)

    # Make sure batch-norm layers use batch statistics while training, even
    # if a model was switched to eval mode before this call.
    generator.train()
    discriminator.train()

    for epoch in range(EPOCHS):
        for batch_idx, real_images in enumerate(dataloader):
            real_images = real_images.to(device)
            batch_size = real_images.size(0)

            # Generate fake images using random noise
            noise = torch.randn(batch_size, NOISE_VECTOR_LEN, device=device)
            fake_images = generator(noise)

            # Train Discriminator
            optimizer_D.zero_grad()
            real_labels = torch.ones(batch_size, 1, device=device)
            fake_labels = torch.zeros(batch_size, 1, device=device)

            # Detach fake_images so this step does not push gradients into
            # the generator.
            real_outputs = discriminator(real_images)
            fake_outputs = discriminator(fake_images.detach())

            d_loss_real = loss(real_outputs, real_labels)
            d_loss_fake = loss(fake_outputs, fake_labels)
            d_loss = d_loss_real + d_loss_fake  # / 2.0
            d_loss.backward()
            optimizer_D.step()

            # Train Generator on a fresh noise batch
            optimizer_G.zero_grad()
            noise = torch.randn(batch_size, NOISE_VECTOR_LEN, device=device)
            fake_images = generator(noise)
            fake_outputs = discriminator(fake_images)

            # Generator wants discriminator to output 1 for fakes
            g_loss = loss(fake_outputs, real_labels)
            g_loss.backward()
            optimizer_G.step()

            # Read the scalar losses once and reuse them for logging/printing.
            d_loss_value = d_loss.item()
            g_loss_value = g_loss.item()
            step = epoch * batches_per_epoch + batch_idx
            mlflow.log_metric("D Loss", d_loss_value, step=step)
            mlflow.log_metric("G Loss", g_loss_value, step=step)
            if batch_idx % 100 == 0:
                print(f'Epoch [{epoch}/{EPOCHS}], Batch [{batch_idx}/{batches_per_epoch}], '
                      f'D Loss: {d_loss_value:.4f}, G Loss: {g_loss_value:.4f}')

        # Checkpoint every 10th epoch and always after the last one.
        if (epoch + 1) % 10 == 0 or epoch + 1 == EPOCHS:
            model_save_path = checkpoints_root / f'epoch_{epoch + 1}'
            model_save_path.mkdir(parents=True, exist_ok=True)
            torch.save(generator.state_dict(),
                       model_save_path / 'generator_state_dict.pth')
            torch.save(discriminator.state_dict(),
                       model_save_path / 'discriminator_state_dict.pth')


# Build the data pipeline, fresh models, their optimizers and the criterion.
dataset = CustomDataset(DATA_PATH)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)
generator = Generator(NOISE_VECTOR_LEN)
discriminator = Discriminator()
optimizer_G = optim.Adam(generator.parameters(), lr=LR)
optimizer_D = optim.Adam(discriminator.parameters(), lr=LR)
loss = nn.BCELoss()


# Training: record the hyper-parameters, train, then store both models in the
# same MLflow run.
mlflow.set_experiment(f'NESMGAN Experiment {EXPERIMENT_NAME}')
with mlflow.start_run():
    mlflow.log_param("batch_size", BATCH_SIZE)
    mlflow.log_param("learning_rate", LR)
    mlflow.log_param("epochs", EPOCHS)

    train(generator, discriminator, optimizer_G, optimizer_D, loss, dataloader)

    mlflow.pytorch.log_model(generator, "generator")
    mlflow.pytorch.log_model(discriminator, "discriminator")


# Generate and visualize images after training
num_images_to_show = 5
# Inference: switch batch norm to its running statistics (so the samples do
# not depend on which other samples share the batch) and skip autograd.
generator.eval()
sample_device = next(generator.parameters()).device
noise = torch.randn(num_images_to_show, NOISE_VECTOR_LEN, device=sample_device)
with torch.no_grad():
    generated_images = generator(noise).cpu()

fig, axs = plt.subplots(1, num_images_to_show, figsize=(12, 3))

for i in range(num_images_to_show):
    # squeeze() drops the single channel axis so imshow receives a 2-D array.
    axs[i].imshow(generated_images[i].squeeze().numpy(), cmap='gray')
    axs[i].axis('off')

plt.tight_layout()
plt.show()

In [None]:
# NGROK_AUTH_TOKEN = 'NGROK_AUTH_TOKEN'
# ngrok.set_auth_token(NGROK_AUTH_TOKEN)
# get_ipython().system_raw("mlflow ui --port 5000 &")
# public_url = ngrok.connect(port=5000)
# print(f'MLFlow Tracking UI: {public_url}')
# !mlflow ui