In [1]:
%matplotlib inline
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torch import autograd
from torch.autograd import Variable
from torchvision.utils import make_grid
import matplotlib.pyplot as plt

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

print('torch version:',torch.__version__)
print('device:', device)

torch version: 2.0.1+cu117
device: cuda


In [117]:
from gym_multi_car_racing import bezier
from gymnasium.utils import seeding

np_random, seed = seeding.np_random(7)

def get_track(control_points=None, mindst=0.08, np_random=np_random):
    if control_points is not None:
        a = np.array(control_points)
        x, y, _ = bezier.get_bezier_curve(a=a, rad=0.2, edgy=0.2, numpoints=40)
    else:
        a = bezier.get_random_points(n=12, scale=PLAYFIELD, mindst=mindst, np_random=np_random) // 30 * 30
        x, y, _ = bezier.get_bezier_curve(a=a, rad=0.2, edgy=0.2, numpoints=40)

    x, y = np.clip(0, x.max(), x), np.clip(0, y.max(), y)

    return a, x, y

In [118]:
def plot_track(track, checkpoints=None):
    # Plot track
    plt.plot(track[:,1], track[:,0], linewidth=9, color=[0.4, 0.4, 0.4])

    # Plot checkpoints
    if checkpoints is not None:
        for i in range(len(checkpoints)):
            y, x = checkpoints[i,:]
            plt.plot(x, y, 'o', markersize=2, color="tab:orange")
            plt.text(x, y, str(i), fontsize=10, color="black")
    
    plt.xlim(-20,360)
    plt.ylim(-20,360)

    ax = plt.gca()
    ax.set_facecolor(np.array([102, 230, 102])/255.)
    ax.grid(False)
    ax.set_xticks([])
    ax.set_yticks([])

In [126]:
def get_image(coordinates, img_size=(32,32)):
    max_value = 330 if coordinates.max() < 330 else coordinates.max()

    if coordinates[:,0].min() < 0:
        coordinates[:,0] = coordinates[:,0] - coordinates[:,0].min()
    if coordinates[:,1].min() < 0:
        coordinates[:,1] = coordinates[:,1] - coordinates[:,1].min()
    coordinates = coordinates // (max_value/(img_size[0]-1))
    coordinates = coordinates.astype(np.int32)

    # Clip to 0-31
    coordinates = np.clip(coordinates, 0, img_size[0]-1)
    
    img = np.zeros(img_size)

    for i in range(coordinates.shape[0]):
        img[coordinates[i,1], coordinates[i,0]] = 1

    return img

In [131]:
# Create new color map
from matplotlib import cm
from matplotlib.colors import ListedColormap, LinearSegmentedColormap

grass_green = np.array([102/255, 230/255, 102/255, 1])
road_gray = np.array([0.4, 0.4, 0.4, 1])
newcolors = np.array([grass_green, road_gray])
new_cmp = ListedColormap(newcolors)

In [191]:
# Data
X = np.load('X_30k.npy') * 30
Y = np.load('complexities_30k.npy')
Y = Y/0.25*10
Y = np.clip(Y, 0, 9)
Y = Y.round(0)

class_num = 10

input_size = 32 # Input size
batch_size = 32  # Batch size

# Model
z_size = 100
generator_layer_size = [256, 512, 1024]
discriminator_layer_size = [1024, 512, 256]

# Training
epochs = 30  # Train epochs
learning_rate = 1e-4

In [192]:
# Get images track images

IMG_SIZE = (32,32)

X_img = []  # Track images

for i in range(X.shape[0]):
    # Generate random track
    checkpoints, x, y = get_track(control_points=X[i], np_random=np_random)
    # Get image from track
    track = np.array([x, y]).T
    X_img.append(get_image(track, img_size=IMG_SIZE))

    print(f"Progress: {i+1}/{X.shape[0]}", end="\r")

X_img = np.array(X_img).astype(np.float32)

Progress: 30000/30000

In [193]:
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF

class TracksDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, tracks, difficulties):
        self.tracks = tracks
        self.difficulties = difficulties

    def __len__(self):
        return len(self.tracks)

    def __getitem__(self, index):
        x = TF.to_tensor(self.tracks[index])
        y = torch.tensor(self.difficulties[index])
        return x, y

In [194]:
# Create training and validation sets
from sklearn.model_selection import train_test_split

X_train, X_val, Y_train, Y_val = train_test_split(X_img, Y, test_size=0.1, random_state=42)

# Create datasets
train_dataset = TracksDataset(X_train, Y_train)
val_dataset = TracksDataset(X_val, Y_val)

In [195]:
# Data loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

### Generator

In [209]:
class Generator(nn.Module):
    def __init__(self, generator_layer_size, z_size, input_size, class_num):
        super().__init__()
        
        self.z_size = z_size
        self.input_size = input_size
        
        self.label_emb = nn.Embedding(class_num, class_num)
        
        self.model = nn.Sequential(
            nn.Linear(self.z_size + class_num, generator_layer_size[0]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(generator_layer_size[0], generator_layer_size[1]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(generator_layer_size[1], generator_layer_size[2]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(generator_layer_size[2], self.input_size * self.input_size),
            nn.Tanh()
        )
    
    def forward(self, z, labels):
        
        # Reshape z
        z = z.view(-1, self.z_size)
        
        # One-hot vector to embedding vector
        c = self.label_emb(labels)
        
        # Concat image & label
        x = torch.cat([z, c], 1)
        
        # Generator out
        out = self.model(x)
        
        return out.view(-1, self.input_size, self.input_size)

### Discriminator

In [210]:
class Discriminator(nn.Module):
    def __init__(self, discriminator_layer_size, input_size, class_num):
        super().__init__()
        
        self.label_emb = nn.Embedding(class_num, class_num)
        self.input_size = input_size
        
        self.model = nn.Sequential(
            nn.Linear(self.input_size * self.input_size + class_num, discriminator_layer_size[0]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(discriminator_layer_size[0], discriminator_layer_size[1]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(discriminator_layer_size[1], discriminator_layer_size[2]),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.3),
            nn.Linear(discriminator_layer_size[2], 1),
            nn.Sigmoid()
        )
    
    def forward(self, x, labels):
        
        # Reshape fake image
        x = x.view(-1, self.input_size * self.input_size)
        
        # One-hot vector to embedding vector
        c = self.label_emb(labels)
        
        # Concat image & label
        x = torch.cat([x, c], 1)
        
        # Discriminator out
        out = self.model(x)
        
        return out.squeeze()

### Define generator and discriminator

In [211]:
# Define generator
generator = Generator(generator_layer_size, z_size, input_size, class_num).to(device)
# Define discriminator
discriminator = Discriminator(discriminator_layer_size, input_size, class_num).to(device)

In [212]:
# Loss function
criterion = nn.BCELoss()

In [213]:
# Optimizer
g_optimizer = torch.optim.Adam(generator.parameters(), lr=learning_rate)
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=learning_rate)

In [214]:
def generator_train_step(batch_size, discriminator, generator, g_optimizer, criterion):
    
    # Init gradient
    g_optimizer.zero_grad()
    
    # Building z
    z = Variable(torch.randn(batch_size, z_size)).to(device)
    
    # Building fake labels
    fake_labels = Variable(torch.LongTensor(np.random.randint(0, class_num, batch_size))).to(device)
    
    # Generating fake images
    fake_images = generator(z, fake_labels)
    
    # Disciminating fake images
    validity = discriminator(fake_images, fake_labels)
    
    # Calculating discrimination loss (fake images)
    g_loss = criterion(validity, Variable(torch.ones(batch_size)).to(device))
    
    # Backword propagation
    g_loss.backward()
    
    #  Optimizing generator
    g_optimizer.step()
    
    return g_loss.data

In [215]:
def discriminator_train_step(batch_size, discriminator, generator, d_optimizer, criterion, real_images, labels):
    
    # Init gradient 
    d_optimizer.zero_grad()

    # Disciminating real images
    real_validity = discriminator(real_images, labels)
    
    # Calculating discrimination loss (real images)
    real_loss = criterion(real_validity, Variable(torch.ones(batch_size)).to(device))
    
    # Building z
    z = Variable(torch.randn(batch_size, z_size)).to(device)
    
    # Building fake labels
    fake_labels = Variable(torch.LongTensor(np.random.randint(0, class_num, batch_size))).to(device)
    
    # Generating fake images
    fake_images = generator(z, fake_labels)
    
    # Disciminating fake images
    fake_validity = discriminator(fake_images, fake_labels)
    
    # Calculating discrimination loss (fake images)
    fake_loss = criterion(fake_validity, Variable(torch.zeros(batch_size)).to(device))
    
    # Sum two losses
    d_loss = real_loss + fake_loss
    
    # Backword propagation
    d_loss.backward()
    
    # Optimizing discriminator
    d_optimizer.step()
    
    return d_loss.data

## Train

In [216]:
for epoch in range(epochs):
    
    print('Starting epoch {}...'.format(epoch+1))
    
    for i, (points, labels) in enumerate(train_loader):
        
        # Train data
        real_points = Variable(points).to(device).long()
        labels = Variable(labels).to(device).long()
        
        # Set generator train
        generator.train()
        
        # Train discriminator
        d_loss = discriminator_train_step(len(real_points), discriminator,
                                          generator, d_optimizer, criterion,
                                          real_points, labels)
        
        # Train generator
        g_loss = generator_train_step(batch_size, discriminator, generator, g_optimizer, criterion)
    
    # Set generator eval
    generator.eval()
    
    print('g_loss: {}, d_loss: {}'.format(g_loss, d_loss))
    
    # Building z 
    z = Variable(torch.randn(class_num-1, z_size)).to(device)
    
    # Labels 0 ~ 8
    labels = Variable(torch.LongTensor(np.arange(class_num-1))).to(device)
    
    # Generating points
    sample_points = generator(z, labels).unsqueeze(1).data.cpu()

    break

    grid = make_grid(sample_points, nrow=3, normalize=True).permute(1,2,0).numpy()
    plt.imshow(grid)
    plt.show()

    # plt.figure(figsize=(4,4))
    # # Show points
    # for i in range(9):
    #     plt.subplot(3,3,i+1)
    #     points = sample_points[i][0].cpu().numpy().reshape(12,2)
    #     a, x, y = get_track(points*300)
    #     track = np.array([x, y]).T
    #     track_img = get_image(track, img_size=(64,64))
    #     plt.imshow(track_img, cmap=new_cmp)
    #     plt.axis('off')
    # plt.tight_layout()
    # plt.show()

Starting epoch 1...
g_loss: 2.166813611984253, d_loss: 0.553512454032898
