# Pokemon Model 1 with Context

In [8]:
from typing import Dict, Tuple
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.utils import save_image, make_grid
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter
import numpy as np
from IPython.display import HTML
from diffusion_utilities import *

import pandas as pd
import os
import shutil
from PIL import Image

# Setting Things Up

In [9]:
class ContextUnet(nn.Module):
    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28):  # cfeat - context features
        super(ContextUnet, self).__init__()

        # number of input channels, number of intermediate feature maps and number of classes
        self.in_channels = in_channels
        self.n_feat = n_feat
        self.n_cfeat = n_cfeat
        self.h = height  #assume h == w. must be divisible by 4, so 28,24,20,16...

        # Initialize the initial convolutional layer
        self.init_conv = ResidualConvBlock(in_channels, n_feat, is_res=True)

        # Initialize the down-sampling path of the U-Net with two levels
        self.down1 = UnetDown(n_feat, n_feat)        # down1 #[10, 256, 8, 8]
        self.down2 = UnetDown(n_feat, 2 * n_feat)    # down2 #[10, 256, 4,  4]

         # original: self.to_vec = nn.Sequential(nn.AvgPool2d(7), nn.GELU())
        self.to_vec = nn.Sequential(nn.AvgPool2d((4)), nn.GELU())

        # Embed the timestep and context labels with a one-layer fully connected neural network
        self.timeembed1 = EmbedFC(1, 2*n_feat)
        self.timeembed2 = EmbedFC(1, 1*n_feat)
        self.contextembed1 = EmbedFC(n_cfeat, 2*n_feat)
        self.contextembed2 = EmbedFC(n_cfeat, 1*n_feat)

        # Initialize the up-sampling path of the U-Net with three levels
        self.up0 = nn.Sequential(
            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h//4, self.h//4), # up-sample
            nn.GroupNorm(8, 2 * n_feat), # normalize
            nn.ReLU(),
        )
        self.up1 = UnetUp(4 * n_feat, n_feat)
        self.up2 = UnetUp(2 * n_feat, n_feat)

        # Initialize the final convolutional layers to map to the same number of channels as the input image
        self.out = nn.Sequential(
            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1), # reduce number of feature maps   #in_channels, out_channels, kernel_size, stride=1, padding=0
            nn.GroupNorm(8, n_feat), # normalize
            nn.ReLU(),
            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1), # map to same number of channels as input
        )

    def forward(self, x, t, c=None):
        """
        x : (batch, n_feat, h, w) : input image
        t : (batch, n_cfeat)      : time step
        c : (batch, n_classes)    : context label
        """
        # x is the input image, c is the context label, t is the timestep, context_mask says which samples to block the context on

        # pass the input image through the initial convolutional layer
        x = self.init_conv(x)
        # pass the result through the down-sampling path
        down1 = self.down1(x)       #[10, 256, 8, 8]
        down2 = self.down2(down1)   #[10, 256, 4, 4]

        # convert the feature maps to a vector and apply an activation
        hiddenvec = self.to_vec(down2)

        # mask out context if context_mask == 1
        if c is None:
            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)

        # embed context and timestep
        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)     # (batch, 2*n_feat, 1,1)
        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)
        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)
        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)
        #print(f"uunet forward: cemb1 {cemb1.shape}. temb1 {temb1.shape}, cemb2 {cemb2.shape}. temb2 {temb2.shape}")


        up1 = self.up0(hiddenvec)
        up2 = self.up1(cemb1*up1 + temb1, down2)  # add and multiply embeddings
        up3 = self.up2(cemb2*up2 + temb2, down1)
        out = self.out(torch.cat((up3, x), 1))
        return out


In [10]:
# hyperparameters

# diffusion hyperparameters
timesteps = 500
beta1 = 1e-4
beta2 = 0.02

# network hyperparameters
device = torch.device("cuda:0" if torch.cuda.is_available() else torch.device('cpu'))
#device = torch.device('cpu')
n_feat = 256 # 64 hidden dimension feature
n_cfeat = 8 # context vector is of size 5
height = 16 # 16x16 image
save_dir = './weights/'

# training hyperparameters
batch_size = 100
n_epoch = 32
lrate=1e-3

In [11]:
# construct DDPM noise schedule
b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1
a_t = 1 - b_t
ab_t = torch.cumsum(a_t.log(), dim=0).exp()
ab_t[0] = 1

In [12]:
# construct model
nn_model = ContextUnet(in_channels=1, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)

# Preprocessing Images

In [13]:
# Path to the main folder containing the Pokemon subfolders
main_folder = 'PokemonData'

# Function to rename files in subfolders
def rename_files():
    for pokemon_folder in os.listdir(main_folder):
        pokemon_path = os.path.join(main_folder, pokemon_folder)
        if os.path.isdir(pokemon_path):
            file_list = os.listdir(pokemon_path)
            # Sort the file list to ensure consistency in numbering
            file_list.sort()
            # Counter for the sequential serial number
            serial_number = 1
            for file_name in file_list:
                file_path = os.path.join(pokemon_path, file_name)
                # Get the file extension
                _, extension = os.path.splitext(file_name)
                # Construct the new file name with sequential serial number
                new_file_name = f"{pokemon_folder}_{serial_number:03}{extension}"
                new_file_path = os.path.join(pokemon_path, new_file_name)
                # Rename the file
                os.rename(file_path, new_file_path)
                serial_number += 1

# Function to move files to the main folder and delete subfolders
def move_files_and_delete_folders():
    for pokemon_folder in os.listdir(main_folder):
        pokemon_path = os.path.join(main_folder, pokemon_folder)
        if os.path.isdir(pokemon_path):
            for file_name in os.listdir(pokemon_path):
                file_path = os.path.join(pokemon_path, file_name)
                # Move files to the main folder
                new_file_path = os.path.join(main_folder, f"{pokemon_folder}_{file_name}")
                shutil.move(file_path, new_file_path)
            # Remove empty subfolders
            os.rmdir(pokemon_path)

# Function to resize all images
def resize_images_and_delete_invalid():
    for file_name in os.listdir(main_folder):
        file_path = os.path.join(main_folder, file_name)
        if os.path.isfile(file_path):
            try:
                # Open the image file
                with Image.open(file_path) as img:
                    # Resize the image while maintaining the aspect ratio
                    resized_img = img.resize((96, 96), Image.ANTIALIAS)
                    # Save the resized image, overwrite the original file
                    resized_img.save(file_path)
            except Exception as e:
                print(f"Error resizing {file_name}: {e}")
                # Remove the file if resizing fails
                os.remove(file_path)

In [14]:
# Call the function to rename files in subfolders
rename_files()

# Call the function to move files and delete subfolders
move_files_and_delete_folders()

# Call the function to resize images and delete subfolders
resize_images_and_delete_invalid()

  resized_img = img.resize((96, 96), Image.ANTIALIAS)


In [15]:
def load_images_to_dataframe():
    data = []
    for file_name in os.listdir(main_folder):
        file_path = os.path.join(main_folder, file_name)
        if os.path.isfile(file_path):
            try:
                # Open the image file
                with Image.open(file_path) as img:
                    # Convert the image to grayscale
                    img = img.convert('L')
                    # Get the pixel values as a list [of tuples (R, G, B)]
                    pixel_values = list(img.getdata())
                    # Get the pixel values as a NumPy array
                    #pixel_values = np.array(img)
                    # Append the pixel values along with the Pokemon name to the data list
                    data.append([file_name.split('_')[0]] + pixel_values)
            except Exception as e:
                print(f"Error loading {file_name}: {e}")

    # Create a DataFrame with columns 'Pokemon' and pixel values
    columns = ['Pokemon'] + [f"Pixel_{i}" for i in range(len(data[0]) - 1)]
    df = pd.DataFrame(data, columns=columns)
    return df

# Call the function to load images into a DataFrame
image_dataframe = load_images_to_dataframe()


In [16]:
# Path to the main folder containing the resized images
main_folder = 'PokemonData'

def load_images_as_array():
    image_list = []
    for file_name in os.listdir(main_folder):
        file_path = os.path.join(main_folder, file_name)
        if os.path.isfile(file_path):
            try:
                # Open the image file
                with Image.open(file_path) as img:
                    # Convert the image to grayscale and resize to (96, 96)
                    img = img.convert('L').resize((32, 32), Image.ANTIALIAS)
                    # Convert the image to a numpy array
                    img_array = np.array(img)
                    # Append the image array to the list
                    image_list.append(img_array)
            except Exception as e:
                print(f"Error processing {file_name}: {e}")

    # Convert the list of image arrays to a single numpy array
    images_array = np.array(image_list)
    return images_array

# Call the function to load images as a numpy array
loaded_images = load_images_as_array()

# Check the shape of the resulting numpy array
print(f"Shape of the loaded images array: {loaded_images.shape}")

Shape of the loaded images array: (229, 32, 32)


  img = img.convert('L').resize((32, 32), Image.ANTIALIAS)


# Creating Input and Label files

In [17]:
#DataFrame of Pokémon and their types
data = {
    'Pokemon': ['Bulbasaur', 'Charmander', 'Squirtle', 'Caterpie', 'Pidgey'],
    'Type': [['Grass', 'Poison'], ['Fire'], ['Water'], ['Bug'], ['Normal', 'Flying']]
}

df = pd.DataFrame(data)
#print(df)

In [18]:
# List of all unique types
all_types = ['Grass', 'Poison', 'Fire', 'Water', 'Bug', 'Normal', 'Flying', 'Dragon']

# Create a DataFrame with one-hot encoded types for each Pokemon
one_hot_encoded = pd.DataFrame(columns=['Pokemon'] + all_types)
one_hot_encoded['Pokemon'] = df['Pokemon']

# Function to update the one-hot encoded DataFrame
def update_one_hot(row):
    types = row['Type']
    for type_val in types:
        one_hot_encoded.at[row.name, type_val] = 1

# Apply the function to update the one-hot encoded DataFrame
df.apply(update_one_hot, axis=1)

# Fill NaN values with 0
one_hot_encoded = one_hot_encoded.fillna(0)

# Merge the image_dataframe and one_hot_encoded DataFrames on the 'Pokemon' column
one_hot_encoded = pd.merge(image_dataframe, one_hot_encoded, on='Pokemon')

#print(one_hot_encoded)

In [19]:
image_dataframe = image_dataframe.drop(columns=['Pokemon'])
one_hot_encoded = one_hot_encoded.drop(columns=['Pokemon'])
one_hot_encoded = one_hot_encoded.iloc[:,9216:]

In [20]:
np.save('train_images.npy', loaded_images)
np.save('one_hot_train_labels.npy', one_hot_encoded.to_numpy())

# Training with Context

In [21]:
# construct/reset neural network
nn_model = ContextUnet(in_channels=1, n_feat=n_feat, n_cfeat=n_cfeat, height=height).to(device)

# load dataset
dataset = CustomDataset("train_images.npy", "one_hot_train_labels.npy", transform, null_context=False)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=1)

# re setup optimizer
optim = torch.optim.Adam(nn_model.parameters(), lr=lrate)

sprite shape: (229, 32, 32)
labels shape: (229, 8)


In [15]:
# helper function: perturbs an image to a specified noise level
def perturb_input(x, t, noise):
    return ab_t.sqrt()[t, None, None, None] * x + (1 - ab_t[t, None, None, None]) * noise

In [16]:
# training with context code
# set into train mode
nn_model.train()

for ep in range(n_epoch):
    print(f'epoch {ep}')

    # linearly decay learning rate
    optim.param_groups[0]['lr'] = lrate*(1-ep/n_epoch)

    pbar = tqdm(dataloader, mininterval=2 )
    for x, c in pbar:   # x: images  c: context
        optim.zero_grad()
        x = x.to(device)
        c = c.to(x)

        # randomly mask out c
        context_mask = torch.bernoulli(torch.zeros(c.shape[0]) + 0.9).to(device)
        c = c * context_mask.unsqueeze(-1)

        # perturb data
        noise = torch.randn_like(x)
        t = torch.randint(1, timesteps + 1, (x.shape[0],)).to(device)
        x_pert = perturb_input(x, t, noise)

        # use network to recover noise
        pred_noise = nn_model(x_pert, t / timesteps, c=c)

        # loss is mean squared error between the predicted and true noise
        loss = F.mse_loss(pred_noise, noise)
        loss.backward()

        optim.step()

    # save model periodically
    if ep%4==0 or ep == int(n_epoch-1):
        if not os.path.exists(save_dir):
            os.mkdir(save_dir)
        torch.save(nn_model.state_dict(), save_dir + f"context_model_{ep}.pth")
        print('saved model at ' + save_dir + f"context_model_{ep}.pth")

epoch 0


100%|██████████| 3/3 [00:03<00:00,  1.02s/it]


saved model at ./weights/context_model_0.pth
epoch 1


100%|██████████| 3/3 [00:01<00:00,  2.13it/s]


epoch 2


100%|██████████| 3/3 [00:01<00:00,  2.09it/s]


epoch 3


100%|██████████| 3/3 [00:01<00:00,  2.05it/s]


epoch 4


100%|██████████| 3/3 [00:01<00:00,  2.06it/s]


saved model at ./weights/context_model_4.pth
epoch 5


100%|██████████| 3/3 [00:01<00:00,  2.16it/s]


epoch 6


100%|██████████| 3/3 [00:01<00:00,  2.05it/s]


epoch 7


100%|██████████| 3/3 [00:01<00:00,  2.04it/s]


epoch 8


100%|██████████| 3/3 [00:01<00:00,  2.02it/s]


saved model at ./weights/context_model_8.pth
epoch 9


100%|██████████| 3/3 [00:01<00:00,  2.05it/s]


epoch 10


100%|██████████| 3/3 [00:01<00:00,  2.06it/s]


epoch 11


100%|██████████| 3/3 [00:01<00:00,  2.01it/s]


epoch 12


100%|██████████| 3/3 [00:01<00:00,  2.01it/s]


saved model at ./weights/context_model_12.pth
epoch 13


100%|██████████| 3/3 [00:01<00:00,  2.14it/s]


epoch 14


100%|██████████| 3/3 [00:01<00:00,  2.01it/s]


epoch 15


100%|██████████| 3/3 [00:01<00:00,  2.00it/s]


epoch 16


100%|██████████| 3/3 [00:01<00:00,  2.00it/s]


saved model at ./weights/context_model_16.pth
epoch 17


100%|██████████| 3/3 [00:01<00:00,  2.01it/s]


epoch 18


100%|██████████| 3/3 [00:01<00:00,  1.99it/s]


epoch 19


100%|██████████| 3/3 [00:01<00:00,  2.02it/s]


epoch 20


100%|██████████| 3/3 [00:01<00:00,  1.98it/s]


saved model at ./weights/context_model_20.pth
epoch 21


100%|██████████| 3/3 [00:01<00:00,  2.10it/s]


epoch 22


100%|██████████| 3/3 [00:01<00:00,  1.99it/s]


epoch 23


100%|██████████| 3/3 [00:01<00:00,  1.96it/s]


epoch 24


100%|██████████| 3/3 [00:01<00:00,  1.97it/s]


saved model at ./weights/context_model_24.pth
epoch 25


100%|██████████| 3/3 [00:01<00:00,  2.06it/s]


epoch 26


100%|██████████| 3/3 [00:01<00:00,  1.96it/s]


epoch 27


100%|██████████| 3/3 [00:01<00:00,  2.01it/s]


epoch 28


100%|██████████| 3/3 [00:01<00:00,  1.97it/s]


saved model at ./weights/context_model_28.pth
epoch 29


100%|██████████| 3/3 [00:01<00:00,  2.11it/s]


epoch 30


100%|██████████| 3/3 [00:01<00:00,  1.96it/s]


epoch 31


100%|██████████| 3/3 [00:01<00:00,  1.95it/s]


saved model at ./weights/context_model_31.pth


In [23]:
# load in pretrain model weights and set to eval mode
nn_model.load_state_dict(torch.load(f"{save_dir}/context_model_31.pth", map_location=device))
nn_model.eval()
print("Loaded in Context Model")

Loaded in Context Model


# Sampling with Context

In [24]:
# helper function; removes the predicted noise (but adds some noise back in to avoid collapse)
def denoise_add_noise(x, t, pred_noise, z=None):
    if z is None:
        z = torch.randn_like(x)
    noise = b_t.sqrt()[t] * z
    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - ab_t[t]).sqrt())) / a_t[t].sqrt()
    return mean + noise

In [25]:
# sample with context using standard algorithm
@torch.no_grad()
def sample_ddpm_context(n_sample, context, save_rate=20):
    # x_T ~ N(0, 1), sample initial noise
    samples = torch.randn(n_sample, 1, height, height).to(device)

    # array to keep track of generated steps for plotting
    intermediate = []
    for i in range(timesteps, 0, -1):
        print(f'sampling timestep {i:3d}', end='\r')

        # reshape time tensor
        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)

        # sample some random noise to inject back in. For i = 1, don't add back in noise
        z = torch.randn_like(samples) if i > 1 else 0

        eps = nn_model(samples, t, c=context)    # predict noise e_(x_t,t, ctx)
        samples = denoise_add_noise(samples, i, eps, z)
        if i % save_rate==0 or i==timesteps or i<8:
            intermediate.append(samples.detach().cpu().numpy())

    intermediate = np.stack(intermediate)
    return samples, intermediate

In [26]:
# visualize samples with randomly selected context
plt.clf()
ctx = F.one_hot(torch.randint(0, 5, (32,)), 5).to(device=device).float()
samples, intermediate = sample_ddpm_context(20, ctx)
animation_ddpm_context = plot_sample(intermediate,20,4,save_dir, "ani_run", None, save=False)
HTML(animation_ddpm_context.to_jshtml())

gif animating frame 31 of 32

<Figure size 640x480 with 0 Axes>

In [28]:
# Save the animation as a video file
#video_path = 'Pokemon_samples_animation.mp4'
#animation_ddpm_context.save(video_path, fps=30, extra_args=['-vcodec', 'libx264'])

# User Defined Context

In [21]:
def show_images(imgs, nrow=2):
    _, axs = plt.subplots(nrow, imgs.shape[0] // nrow, figsize=(4,2 ))
    axs = axs.flatten()
    for img, ax in zip(imgs, axs):
        img = (img.permute(1, 2, 0).clip(-1, 1).detach().cpu().numpy() + 1) / 2
        ax.set_xticks([])
        ax.set_yticks([])
        ax.imshow(img)
    plt.show()

In [25]:
# user defined context
ctx = torch.tensor([
    # Arcanine, Alolan-Sandslash, Abra, Aerodactyl, Arbok
    [0,0,1,0,0,0,0,0],
]).float().to(device)
samples, _ = sample_ddpm_context(ctx.shape[0], ctx)
show_images(samples)



ValueError: ignored

<Figure size 400x200 with 0 Axes>

In [None]:
# mix of defined context
ctx = torch.tensor([
    # Arcanine, Alolan-Sandslash, Abra, Aerodactyl, Arbok
    [1,0,0,0,0],      #Arcanine
    [1,0,0.6,0,0],
    [0,0,0.6,0.4,0],
    [1,0,0,0,1],
    [1,1,0,0,0],
    [1,0,0,1,0]
]).float().to(device)
samples, _ = sample_ddpm_context(ctx.shape[0], ctx)
show_images(samples)