## Import

In [1]:
import numpy as np
import json
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import fastai
from fastai import *
from fastai.data.core import DataLoaders
from fastai.learner import Learner
from fastai.losses import CrossEntropyLossFlat
from fastai.metrics import accuracy
from fastai.optimizer import Adam
from fastai.callback.progress import ProgressCallback
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split

from parse_preprocessed_data import get_inputs_and_targets
from LSTM_Model import LSTMModel

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Hyper-Parameters

In [2]:
seq_length = 50

hidden_size = 128
learning_rate = 2e-3
dropout = 0.1
batch_size = 100
num_layers = 3
max_epochs = 20
validation_prop = 0.2

early_stopping = True
patience = 20

## Load Data

In [3]:
char_to_ix, ix_to_char, vocab_size, inputs, targets = get_inputs_and_targets('data_preprocessed/mario.txt', seq_length)
vocab_size, inputs.shape, targets.shape

Unique chars: ['\n', '-', '<', '>', '?', 'B', 'E', 'Q', 'S', 'X', '[', ']', 'b', 'o', 'x']
Number of unique chars: 15


  0%|          | 0/37 [00:00<?, ?it/s]

(15, (124700, 50, 15), (124700, 50))

In [4]:
first_three_cols = inputs[0][:3 * 17]
np.savetxt('data_preprocessed/seed.txt', first_three_cols)

In [5]:
with open('data_preprocessed/char_to_ix.json', 'w+') as json_f:
    json.dump(char_to_ix, json_f)

with open('data_preprocessed/ix_to_char.json', 'w+') as json_f:
    json.dump(ix_to_char, json_f)

In [6]:
inputs.shape,inputs.dtype

((124700, 50, 15), dtype('float64'))

In [7]:
targets.shape, targets.dtype

((124700, 50), dtype('int32'))

In [8]:
# Define custom dataset class
class CustomDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

train_inputs, valid_inputs, train_targets, valid_targets = train_test_split(inputs, targets, test_size=0.2)

# Define your training and validation datasets and DataLoaders
train_dataset = CustomDataset(train_inputs, train_targets)
valid_dataset = CustomDataset(valid_inputs, valid_targets)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle='true')
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size)

# Combine your training and validation DataLoaders into a DataLoaders object
dls = DataLoaders(train_dataloader, valid_dataloader)

## Model Callbacks

In [9]:
def custom_loss(y_pred, y_true):
    return F.cross_entropy(
        y_pred.view(-1, vocab_size),
        y_true.long().view(-1)
    )

In [10]:
def custom_acc(y_pred, y_true):
    pred_classes = torch.argmax(y_pred.view(-1, vocab_size), dim=1)
    true_classes = y_true.view(-1)
    class_equality = torch.eq(pred_classes, true_classes).float()
    return torch.mean(class_equality)

## Model

In [11]:
# Initialize model
model = LSTMModel(vocab_size=vocab_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout).to(device)

In [12]:
learn = Learner(dls, model, opt_func =Adam, loss_func=custom_loss, metrics=custom_acc)

## Train Model

In [13]:
learn.fit(n_epoch=max_epochs, lr = learning_rate, cbs=ProgressCallback())

epoch,train_loss,valid_loss,custom_acc,time


epoch,train_loss,valid_loss,custom_acc,time
0,2.656437,2.654948,0.241719,00:06
1,2.651501,2.650635,0.263699,00:06
2,2.648332,2.647657,0.257255,00:05
3,2.647376,2.646185,0.266928,00:05
4,2.646738,2.645473,0.304301,00:06
5,2.644467,2.643057,0.334666,00:06
6,2.642945,2.642263,0.377731,00:05
7,2.642976,2.64166,0.423212,00:05
8,2.642208,2.641537,0.446433,00:06
9,2.642305,2.640997,0.451732,00:06


In [14]:
seed = np.loadtxt('data_preprocessed/seed.txt', dtype=float)[:3*17 - 1].copy()

with open('data_preprocessed/ix_to_char.json', 'r') as json_f:
    ix_to_char = json.load(json_f)
    
with open('data_preprocessed/char_to_ix.json', 'r') as json_f:
    char_to_ix = json.load(json_f)

In [15]:
def onehot_to_string(onehot):
    ints = np.argmax(onehot, axis=-1)
    chars = [ix_to_char[str(ix)] for ix in ints]
    string = "".join(chars)
    char_array = []
    for line in string.rstrip().split('\n')[:-1]:
        if len(line) == 16:
            char_array.append(list(line))
        elif len(line) > 16:
            char_array.append(list(line[:16]))
        elif len(line) < 16:
            char_array.append(['-'] * (16 - len(line)) + list(line))
    char_array = np.array(char_array).T
    string = ""
    for row in char_array:
        string += "".join(row) + "\n"
    return string

In [16]:
seed[17+14] = 0
seed[17+14][char_to_ix['x']] = 1
seed[17*2+14] = 0
seed[17*2+14][char_to_ix['x']] = 1
print(onehot_to_string(seed))

--
--
--
--
--
--
--
--
--
--
--
--
--
--
-x
XX



In [17]:
def get_seed():
    seed = np.loadtxt('data_preprocessed/seed.txt', dtype=float)[:3*17 - 1]
    seed[17+14] = 0
    seed[17+14][char_to_ix['x']] = 1
    seed[17*2+14] = 0
    seed[17*2+14][char_to_ix['x']] = 1
    return seed

In [18]:
seed = get_seed()
seed.shape

(50, 15)

In [19]:
num_levels_to_gen = 10

num_chunks = 10
num_cols_per_chunk = 16
num_rows_per_col = 17
num_chars_to_gen = num_chunks * num_cols_per_chunk * num_rows_per_col - len(seed)
print(num_chars_to_gen)

2670


In [20]:
gen = seed.copy()
for i in tqdm(range(num_chars_to_gen), leave=False):
    # Convert seed to a PyTorch tensor and move it to the same device as the model
    seed_tensor = torch.tensor(seed, dtype=torch.float32).to(device)
    
    # Predict probabilities using the model
    probas = learn.model(seed_tensor)
    probas = probas[:, -1]  # all batches, last timestep
    
    # Generate the next character based on the normalized probabilities
    seed = np.zeros((num_levels_to_gen, 1, vocab_size))
    for b in range(num_levels_to_gen):
        p = probas[b].detach().cpu().numpy()  # Move to CPU and convert to NumPy array
        # Normalize probabilities and handle NaN or near-zero sum
        if np.sum(p) > 0:
            p /= p.sum()
        else:
            # If sum is close to zero or NaN, reset probabilities to uniform distribution
            p = np.ones_like(p) / len(p)
        
        idx = np.random.choice(np.arange(len(p)), p=p)
        seed[b][0] = 0
        seed[b][0][idx] = 1
        
    gen = np.concatenate([gen, seed], axis=1)



  0%|          | 0/2670 [00:00<?, ?it/s]

RuntimeError: For unbatched 2-D input, hx and cx should also be 2-D but got (3-D, 3-D) tensors

In [None]:
gen.shape

In [None]:
for i, g in enumerate(gen):
    with open(f'generated_levels_txt/{i+1}.txt', 'w+') as txt_f:
        txt_f.write(onehot_to_string(g))