# GAN

In [1]:
!pip install livelossplot



In [2]:
import numpy as np
import datetime, time
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error
from torch.utils.data import TensorDataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import Dataset

from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedShuffleSplit

from livelossplot import PlotLosses


In [3]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [4]:
# load the training data and reshape
train = np.load('/content/gdrive/MyDrive/Data/Ferguson_fire_train.npy')

print(f"Before reshaping: {np.shape(train)}")

train_1D = np.reshape(
    train, (np.shape(train)[0], np.shape(train)[1] * np.shape(train)[2])
)
print(f"After reshaping: {train_1D.shape}")

train = np.array(train)
train_1D = np.array(train_1D)

Before reshaping: (12500, 256, 256)
After reshaping: (12500, 65536)


In [5]:
# load the test data and reshape
test = np.load('/content/gdrive/MyDrive/Data/Ferguson_fire_test.npy')

print(f"Before reshaping: {np.shape(test)}")

test_1D = np.reshape(
    test, (np.shape(test)[0], np.shape(test)[1] * np.shape(test)[2])
)
print(f"After reshaping: {test_1D.shape}")

test = np.array(test)
test_1D = np.array(test_1D)

Before reshaping: (5000, 256, 256)
After reshaping: (5000, 65536)


In [6]:
device = 'cpu'
if torch.cuda.device_count() > 0 and torch.cuda.is_available():
    print("Cuda installed! Running on GPU!")
    device = 'cuda'
else:
    print("No GPU available!")

No GPU available!


In [7]:
class Generator(nn.Module):
    def __init__(self, g_input_dim=100, g_output_dim=256*256):
        super().__init__()
        self.fc1 = nn.Linear(g_input_dim, 256)
        self.fc2 = nn.Linear(self.fc1.out_features, self.fc1.out_features*2)
        self.fc3 = nn.Linear(self.fc2.out_features, self.fc2.out_features*2)
        self.fc4 = nn.Linear(self.fc3.out_features, g_output_dim)

    # forward method
    def forward(self, x):
        x = F.leaky_relu(self.fc1(x), 0.2)
        x = F.leaky_relu(self.fc2(x), 0.2)
        x = F.leaky_relu(self.fc3(x), 0.2)
        return torch.tanh(self.fc4(x))

class Discriminator(nn.Module):
    def __init__(self, d_input_dim=256*256):
        super().__init__()
        self.fc1 = nn.Linear(d_input_dim, 1024)
        self.fc2 = nn.Linear(self.fc1.out_features, self.fc1.out_features//2)
        self.fc3 = nn.Linear(self.fc2.out_features, self.fc2.out_features//2)
        self.fc4 = nn.Linear(self.fc3.out_features, 1)

    # forward method
    def forward(self, x):
        x = F.leaky_relu(self.fc1(x), 0.2)
        x = F.dropout(x, 0.3)
        x = F.leaky_relu(self.fc2(x), 0.2)
        x = F.dropout(x, 0.3)
        x = F.leaky_relu(self.fc3(x), 0.2)
        x = F.dropout(x, 0.3)
        return torch.sigmoid(self.fc4(x))

# build model
G = Generator().to(device)
D = Discriminator().to(device)

# define loss
criterion = nn.BCELoss()
z_dim = 100

# optimiser
lr = 0.0001
G_optimizer = torch.optim.Adam(G.parameters(), lr = lr)
D_optimizer = torch.optim.Adam(D.parameters(), lr = lr)

In [8]:
def D_train(x):
    D.train()
    D_optimizer.zero_grad()

    # train discriminator on real data -- assign high score (use 1 here)
    x_real, y_real = x.view(-1, 256*256), torch.ones(batch_size, 1)  # we are assigning the label 'real data' to the samples (don't care anymore about what number they are)
    x_real, y_real = x_real.to(device), y_real.to(device)

    D_output = D(x_real)
    D_real_loss = criterion(D_output, y_real)

    # train discriminator on fake data -- assign low score (use 0 here)
    # sample vector and produce generator output
    z = torch.randn(batch_size, z_dim).to(device)
    x_fake, y_fake = G(z), torch.zeros(batch_size, 1).to(device)

    D_output = D(x_fake)
    D_fake_loss = criterion(D_output, y_fake)

    # combine the losses
    D_loss = D_real_loss + D_fake_loss

    # model update
    D_loss.backward()
    D_optimizer.step()

    return  D_loss.data.item()  ### deprecated version of loss.detach(), basically gets access to the tensor without the computational graph attached


In [9]:
# DataLoader

# Convert your training data to PyTorch tensors
train_data = torch.tensor(train_1D, dtype=torch.float32)
test_data = torch.tensor(test_1D, dtype=torch.float32)

# Create a TensorDataset from your training data

# transform = transforms.Compose([transforms.ToTensor(),
#                                 transforms.Normalize(mean=(0.5), std=(0.5))])

train_dataset = TensorDataset(train_data)
test_dataset = TensorDataset(test_data)

# Specify batch size
batch_size = 100

# Create a DataLoader for your training dataset
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [10]:
def G_train(x):
    G.train()
    G_optimizer.zero_grad()

    # sample vector and produce generator output
    z = torch.randn(batch_size, z_dim).to(device)
    G_output = G(z)

    # obtain scores from D for the generated data
    D_output = D(G_output)

    # train generator to "fool" discriminator
    y = torch.ones(batch_size, 1).to(device)
    G_loss = criterion(D_output, y)

    # model update
    G_loss.backward()
    G_optimizer.step()

    return G_loss.data.item()  ### deprecated version of loss.detach(), basically gets access to the tensor without the computational graph attached


In [None]:
n_epoch = 100
groups = {'Loss': ['D_Loss', 'G_Loss']}
liveloss = PlotLosses(groups=groups)

for epoch in range(1, n_epoch + 1):
    D_losses, G_losses = [], []
    logs = {}

    for batch_idx, (real_images,) in enumerate(train_loader):
        real_images = real_images.view(-1, 256*256).to(device)

        D_loss = D_train(real_images)
        G_loss = G_train(real_images.size(0))

        D_losses.append(D_loss)
        G_losses.append(G_loss)

    logs['D_Loss'] = torch.tensor(D_losses).mean().item()
    logs['G_Loss'] = torch.tensor(G_losses).mean().item()

    liveloss.update(logs)
    liveloss.draw()

    # save after training
    if(np.mod(epoch, 100) == 0):
      torch.save(G.state_dict(), "./GAN/Generator_{:03d}.pth".format(epoch))

In [None]:
G.load_state_dict(torch.load("./GAN/Generator_{:03d}.pth".format(epoch)))

with torch.no_grad():
    test_z = torch.randn(batch_size, z_dim).to(device)
    generated = G(test_z)

    # save_image(generated.view(generated.size(0), 1, 28, 28), './sample_' + '.png')
fig, axarr = plt.subplots(10, 10, figsize=(12, 12))
for ax, img in zip(axarr.flatten(), generated.view(generated.size(0), 256, 256).cpu()):
  ax.imshow(img, cmap="gray")
plt.title('Epoch = {:03d}'.format(epoch))

# Testing DCGAN

In [26]:
class Generator(nn.Module):
    def __init__(self, g_input_dim=100, g_output_dim=256):
        super().__init__()
        self.init_size = g_output_dim // 8
        self.fc1 = nn.Linear(g_input_dim, 128 * self.init_size ** 2)
        self.conv_blocks = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            nn.Conv2d(64, 32, 3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(32, 16, 3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(16, 1, 3, stride=1, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        print("Generator input shape:", x.shape)
        x = self.fc1(x)
        print("After fc1 shape:", x.shape)
        x = x.view(x.shape[0], 128, self.init_size, self.init_size)
        print("After view shape:", x.shape)
        x = self.conv_blocks(x)
        print("After conv_blocks shape:", x.shape)
        return x

class Discriminator(nn.Module):
    def __init__(self, d_input_dim=256):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(1, 16, 3, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout2d(0.25),
            nn.Conv2d(16, 32, 3, 2, 1),
            # nn.ZeroPad2d((0, 1, 0, 1)),  # the fucking mismatch is from here
            nn.BatchNorm2d(32, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout2d(0.25),
            nn.Conv2d(32, 64, 3, 2, 1),
            nn.BatchNorm2d(64, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout2d(0.25),
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.BatchNorm2d(128, 0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout2d(0.25)
        )
        # Calculate the expected input size for the linear layer
        ds_size = d_input_dim // 8
        self.linear_input_size = 128 * ds_size ** 2
        self.adv_layer = nn.Sequential(nn.Linear(self.linear_input_size, 1), nn.Sigmoid())

    def forward(self, x):
        print("Discriminator input shape:", x.shape)
        x = self.model(x)
        print("After model shape:", x.shape)
        x = x.view(x.size(0), -1)  # Flatten the output
        print("After flatten shape:", x.shape)
        x = self.adv_layer(x)
        return x

In [27]:
# build model
G = Generator().to(device)
D = Discriminator().to(device)

# define loss
criterion = nn.BCELoss()
z_dim = 100

# optimiser
lr = 0.0001
G_optimizer = torch.optim.Adam(G.parameters(), lr = lr)
D_optimizer = torch.optim.Adam(D.parameters(), lr = lr)

In [28]:
class NumpyDataset(Dataset):

    def __init__(self, data_path, transform=None):
        self.data = np.load(data_path).astype(np.float32)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]
        image = torch.from_numpy(image)
        if self.transform:
          image = self.transform(image)
        return image

In [21]:
# DataLoader

# Convert your training data to PyTorch tensors
# train_data = torch.tensor(train, dtype=torch.float32)
# test_data = torch.tensor(test, dtype=torch.float32)

# Create a TensorDataset from your training data

transform = transforms.Compose([transforms.ToPILImage(),
                                transforms.Resize(256),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=(0.5), std=(0.5))])

train_data = NumpyDataset('/content/gdrive/MyDrive/Data/Ferguson_fire_train.npy', transform=transform)
test_data = NumpyDataset('/content/gdrive/MyDrive/Data/Ferguson_fire_test.npy', transform=transform)

# Specify batch size
#with depreceated functionality in torch, this needs to be an even divisible of
#the total input size
batch_size = 100

# Create a DataLoader for your training dataset
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [29]:
def D_train(x):
    D.train()
    D_optimizer.zero_grad()

    # train discriminator on real data -- assign high score (use 1 here)
    x_real, y_real = x.view(-1, 1, 256, 256), torch.ones(batch_size, 1)  # we are assigning the label 'real data' to the samples (don't care anymore about what number they are)
    x_real, y_real = x_real.to(device), y_real.to(device)

    D_output = D(x_real)
    D_real_loss = criterion(D_output, y_real)

    # train discriminator on fake data -- assign low score (use 0 here)
    # sample vector and produce generator output
    z = torch.randn(batch_size, z_dim).to(device)
    x_fake, y_fake = G(z), torch.zeros(batch_size, 1).to(device)

    D_output = D(x_fake)
    D_fake_loss = criterion(D_output, y_fake)

    # combine the losses
    D_loss = D_real_loss + D_fake_loss

    # model update
    D_loss.backward()
    D_optimizer.step()

    return  D_loss.data.item()  ### deprecated version of loss.detach(), basically gets access to the tensor without the computational graph attached


In [30]:
def G_train(x):
    G.train()
    G_optimizer.zero_grad()

    # sample vector and produce generator output
    z = torch.randn(batch_size, z_dim).to(device)
    G_output = G(z)

    # obtain scores from D for the generated data
    D_output = D(G_output)

    # train generator to "fool" discriminator
    y = torch.ones(batch_size, 1).to(device)
    # G_loss = criterion(D_output, y)
    G_loss = criterion(D_output, torch.ones_like(D_output))

    # model update
    G_loss.backward()
    G_optimizer.step()

    return G_loss.data.item()  ### deprecated version of loss.detach(), basically gets access to the tensor without the computational graph attached


In [31]:
n_epoch = 20
groups = {'Loss': ['D_Loss', 'G_Loss']}
liveloss = PlotLosses(groups=groups)

for epoch in range(1, n_epoch + 1):
    D_losses, G_losses = [], []
    logs = {}

    for batch_idx, real_images in enumerate(train_loader):
        real_images = real_images.view(-1, 256, 256).to(device)

        D_loss = D_train(real_images)
        # G_loss = G_train(batch_size)
        G_loss = G_train(real_images.size(0))

        D_losses.append(D_loss)
        G_losses.append(G_loss)

    logs['D_Loss'] = torch.tensor(D_losses).mean().item()
    logs['G_Loss'] = torch.tensor(G_losses).mean().item()

    liveloss.update(logs)
    liveloss.draw()

    # save every 20th epochs
    if(np.mod(epoch, 20) == 0):
      torch.save(G.state_dict(), "./exp/Generator_{:03d}.pth".format(epoch))

Discriminator input shape: torch.Size([100, 1, 256, 256])
After model shape: torch.Size([100, 128, 32, 32])
After flatten shape: torch.Size([100, 131072])
Generator input shape: torch.Size([100, 100])
After fc1 shape: torch.Size([100, 131072])
After view shape: torch.Size([100, 128, 32, 32])
After conv_blocks shape: torch.Size([100, 1, 128, 128])
Discriminator input shape: torch.Size([100, 1, 128, 128])
After model shape: torch.Size([100, 128, 16, 16])
After flatten shape: torch.Size([100, 32768])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (100x32768 and 131072x1)

In [None]:
with torch.no_grad():
    test_z = torch.randn(batch_size, z_dim).to(device)
    generated = G(test_z)

    # save_image(generated.view(generated.size(0), 1, 28, 28), './sample_' + '.png')
fig, axarr = plt.subplots(10, 10, figsize=(12, 12))
for ax, img in zip(axarr.flatten(), generated.view(generated.size(0), 28, 28).cpu()):
  ax.imshow(img, cmap="gray")
plt.title('Epoch = {:03d}'.format(epoch))