### Imports

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import numpy as np
from tqdm import tqdm

import itertools
import random

# Pick the compute backend in order of preference: CUDA, then Apple MPS,
# and finally the CPU when no accelerator is reported as available.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f'Device = {device}')

### Data Preprocessing

In [None]:
class DataPreprocessing:
    """Character-level text loader and one-hot encoder.

    Reads a text file, builds a character <-> index vocabulary and cuts the
    text into fixed-length input/target blocks in which every target is the
    input shifted one character ahead.
    """

    def __init__(self, file_path, seq_length, batch_size):
        self.file_path = file_path
        self.seq_length = seq_length
        self.batch_size = batch_size  # stored only; not used inside this class
        self.num_seq = None      # number of full sequences, set by preprocess()
        self.data = None         # raw text, set by load_data()
        self.X_data = None       # one-hot inputs (K, num_seq * seq_length)
        self.Y_data = None       # one-hot targets (K, num_seq * seq_length)
        self.K = None            # vocabulary size, set by load_data()
        self.char_to_ind = None  # dict: character -> row index
        self.ind_to_char = None  # dict: row index -> character

    def load_data(self):
        """Read the text file and build the vocabulary mappings.

        Sets ``data``, ``K``, ``char_to_ind`` and ``ind_to_char``.
        """
        # The context manager closes the file even if read() raises.
        with open(self.file_path, "r") as fid:
            book_data = fid.read()
        self.data = book_data
        # Sorting makes the mapping reproducible: the iteration order of a
        # set of strings can change from one interpreter run to the next.
        unique_chars = sorted(set(book_data))
        self.K = len(unique_chars)
        self.char_to_ind = {char: ind for ind, char in enumerate(unique_chars)}
        self.ind_to_char = dict(enumerate(unique_chars))

    def get_one_hot_encoding(self, X_chars):
        """Encodes text as a one hot array

        Args:
            X_chars (string): characters to encode

        Returns:
            np.ndarray: one-hot encoding of shape (K, len(X_chars))

        Raises:
            KeyError: if a character is not in the vocabulary
        """
        seq_length = len(X_chars)
        one_hot = np.zeros((self.K, seq_length))
        # Explicit integer dtype keeps the fancy index valid for empty input.
        rows = np.fromiter(
            (self.char_to_ind[char] for char in X_chars),
            dtype=np.intp,
            count=seq_length,
        )
        one_hot[rows, np.arange(seq_length)] = 1
        return one_hot

    def get_decoded_one_hot(self, Y):
        """Decodes one-hot array back to text

        Args:
            Y (np.ndarray): array of shape (K, T); the row with the largest
                value in each column selects the character

        Returns:
            string: the decoded text
        """
        indices = np.argmax(Y, axis=0).tolist()
        return ''.join(self.ind_to_char[ind] for ind in indices)

    def preprocess(self):
        """Prepares the data as inputs and targets

        Sets ``num_seq``, ``X_data`` and ``Y_data``. Targets are the inputs
        shifted by one character; the tail that does not fill a whole
        sequence is discarded. Text too short for a single sequence yields
        ``num_seq == 0`` and arrays with zero columns instead of an error.
        """
        # One extra character is needed as the target of the last input.
        num_sequences = max((len(self.data) - 1) // self.seq_length, 0)
        self.num_seq = num_sequences
        total = num_sequences * self.seq_length
        encoded_data = self.get_one_hot_encoding(self.data[:total + 1])
        # Copies keep X_data and Y_data independent of each other.
        self.X_data = encoded_data[:, :total].copy()
        self.Y_data = encoded_data[:, 1:total + 1].copy()

### Network architecture

In [None]:

class RNNModel(nn.Module):
    """Recurrent layer(s) followed by a fully connected output layer.

    The recurrent part is ``nn.RNN`` with its default tanh non-linearity.

    Args:
        input_size (int): size of each input vector
        hidden_size (int): neurons per recurrent layer
        num_layers (int): number of stacked recurrent layers
        output_size (int): size of each output vector
    """
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        # num_layers is forwarded so that self.num_layers and the hidden
        # states built from it always match the actual network depth.
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.fc  = nn.Linear(hidden_size, output_size)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

    def forward(self, x, hprev):
        """Forward pass for the model
        Args:
            x (torch.Tensor): input (BATCH_SIZE, SEQ_LENGTH, input_size)
            hprev (torch.Tensor): previous hidden states (num_layers, BATCH_SIZE, hidden_size)

        Returns:
            torch.Tensor, torch.Tensor: unnormalised logits
            (BATCH_SIZE, SEQ_LENGTH, output_size) and updated hidden states
        """
        out, hnext = self.rnn(x, hprev)
        logits = self.fc(out)
        return logits, hnext

    def init_hidden(self, batch_size, hidden_size=None):
        """Initializes a new hidden state of zeroes

        Args:
            batch_size (int): batches
            hidden_size (int, optional): neurons in the hidden layer.
                Defaults to the size the model was built with.

        Returns:
            torch.Tensor: zeros of shape (num_layers, BATCH_SIZE, HIDDEN_SIZE)
            placed on the same device as the model parameters
        """
        if hidden_size is None:
            hidden_size = self.hidden_size
        # Follow the model's own device instead of a module-level global.
        param_device = next(self.parameters()).device
        return torch.zeros(self.num_layers, batch_size, hidden_size, device=param_device)


### Training and validation loop

In [None]:
def validation_loop(model, hprev, val_loader, device):
    """Measures the cross-entropy loss of a model on a separate dataset

    The model is switched to eval mode for the duration of the call and is
    put back into whichever mode it was in beforehand.

    Args:
        model (nn.Module): the trained model; called as model(inputs, hprev)
        hprev (torch.Tensor): initial hidden states, carried across batches
        val_loader (iterable): yields (inputs, one-hot targets) batches
        device (device): device the batches are moved to

    Returns:
        mean loss over all batches (NaN if there were none) and the list of
        exponentially smoothed losses, one entry per batch
    """
    was_training = model.training
    model.eval()
    criterion = torch.nn.CrossEntropyLoss()
    total_loss = 0.0
    smooth_loss = None
    val_loss = []
    num_batches = 0

    try:
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                # One-hot targets -> class indices, shape (BATCH_SIZE, SEQ_LENGTH)
                targets_indices = torch.argmax(targets, dim=2)

                outputs, hprev = model(inputs, hprev)
                # CrossEntropyLoss wants the class dimension second:
                # (BATCH_SIZE, K, SEQ_LENGTH)
                preds = outputs.permute(0, 2, 1)
                batch_loss = criterion(preds, targets_indices).item()

                total_loss += batch_loss
                if smooth_loss is None:
                    smooth_loss = batch_loss
                else:
                    smooth_loss = 0.999 * smooth_loss + 0.001 * batch_loss
                val_loss.append(smooth_loss)
                num_batches += 1
    finally:
        # Restore the caller's mode rather than forcing train mode.
        model.train(was_training)

    if num_batches == 0:
        # E.g. drop_last=True with fewer samples than one batch.
        return float("nan"), val_loss
    return total_loss / num_batches, val_loss

def train(model, train_params, train_loader, val_loader):
    """Trains the model with Adam and cross-entropy loss

    After every epoch the model is evaluated with ``validation_loop`` and the
    smoothed validation losses are appended to the returned history.

    Args:
        model (nn.Module): model to train; must provide
            ``init_hidden(batch_size, hidden_size)``
        train_params (dict): needs the keys 'num_epochs', 'batch_size',
            'hidden_size' and 'learning_rate'
        train_loader (dataloader): yields (inputs, one-hot targets) batches
        val_loader (dataloader): dataloader with validation data

    Returns:
        tuple: (smoothed training loss per update step,
                smoothed validation loss per validation batch over all epochs,
                best smoothed training loss,
                epoch of the best smoothed loss,
                update step of the best smoothed loss)
    """
    num_epochs = train_params['num_epochs']
    batch_size = train_params['batch_size']
    hidden_size = train_params['hidden_size']
    learning_rate = train_params['learning_rate']

    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    # Use the device the model lives on instead of a module-level global.
    run_device = next(model.parameters()).device

    update_step = 0
    best_update_step = 0
    best_epoch = 0
    smooth_loss = None
    best_smooth_loss = float('inf')
    train_loss = []
    val_loss = []

    for epoch in range(num_epochs):
        # Every epoch starts from a fresh all-zero hidden state.
        hprev = model.init_hidden(batch_size, hidden_size)

        for inputs, targets in tqdm(train_loader):
            inputs = inputs.to(run_device)
            targets = targets.to(run_device)

            # Targets arrive one-hot encoded, but CrossEntropyLoss is fed
            # class indices here: shape (BATCH_SIZE, SEQ_LENGTH).
            targets_indices = torch.argmax(targets, dim=2)
            # Cut the autograd graph between batches so backward() only
            # covers the current batch and not every batch before it.
            hprev = hprev.detach()

            # FORWARD PASS
            optimizer.zero_grad()
            # Call the module itself (not .forward) so registered hooks run.
            outputs, hprev = model(inputs, hprev)  # outputs: (BATCH_SIZE, SEQ_LENGTH, K)
            preds = outputs.permute(0, 2, 1)       # (BATCH_SIZE, K, SEQ_LENGTH)
            loss = criterion(preds, targets_indices)

            # BACKWARD PASS
            loss.backward()

            # Update parameters with ADAM
            optimizer.step()

            loss_value = loss.item()
            if smooth_loss is None:
                smooth_loss = loss_value
            else:
                smooth_loss = 0.999 * smooth_loss + 0.001 * loss_value
            train_loss.append(smooth_loss)

            if smooth_loss < best_smooth_loss:
                best_smooth_loss = smooth_loss
                best_epoch = epoch
                best_update_step = update_step
            update_step += 1

        # Validation loss, starting from the last training hidden state
        _, val_losses = validation_loop(model, hprev, val_loader, run_device)
        val_loss.extend(val_losses)

    print(f'---Training complete---\nBest model had smooth loss: {best_smooth_loss}, at epoch: {best_epoch}, update step: {best_update_step}')
    return train_loss, val_loss, best_smooth_loss, best_epoch, best_update_step

### Hyperparameters Search Function

In [None]:
def hyperparameter_search(model, 
                          param_grid:dict, 
                          train_func,
                          train_ds,
                          val_ds,
                          device,
                          max_trials:int = None):
    """
    Performs a random or grid search over the hyperparameters

    Every combination in ``param_grid`` is tried (grid search) unless
    ``max_trials`` is set, in which case at most that many combinations are
    drawn at random. A fresh single-layer model is built and trained per trial.

    Args:
        model (callable): model class/factory accepting input_size,
            hidden_size, num_layers and output_size
        param_grid (dict): hyperparameter name -> list of values; must
            contain 'hidden_size' and 'batch_size' plus whatever
            ``train_func`` reads from its config
        train_func (function): called as train_func(model, config,
            train_loader, val_loader); its first two return values are the
            training losses and the validation losses
        train_ds (TensorDataset): training dataset of (input, target) pairs
        val_ds (TensorDataset): validation dataset
        device (device): device the model is moved to
        max_trials (int, optional): maximum number of trials. Defaults to None.
    Returns:
        list of dicts, one per trial: the trial's hyperparameters plus
        'val_loss' (mean validation loss rounded to 4 decimals, NaN if the
        trial produced no validation losses)
    """

    results = []
    keys = list(param_grid.keys())
    all_combinations = list(itertools.product(*(param_grid[k] for k in keys)))

    if max_trials:
        all_combinations = random.sample(all_combinations, min(max_trials, len(all_combinations)))

    # Model sizes come from the data itself (last dimension of one sample)
    # rather than from a module-level preprocessing object.
    sample_x, sample_y = train_ds[0]
    input_size = sample_x.shape[-1]
    output_size = sample_y.shape[-1]

    for trial_id, values in enumerate(all_combinations):
        trial_config = dict(zip(keys, values))

        print(f"\n Trial {trial_id + 1}/{len(all_combinations)}: {trial_config}")

        # create model
        model_train = model(input_size=input_size,
                            hidden_size=trial_config["hidden_size"],
                            num_layers=1,
                            output_size=output_size).to(device)

        # train model
        batch_size = trial_config['batch_size']
        train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=False, drop_last=True)
        val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, drop_last=True)
        train_losses, val_loss, *_ = train_func(model_train, trial_config, train_loader, val_loader)

        # record; val_loss is a list, so reduce it to one number before
        # storing and printing it
        if val_loss:
            mean_val_loss = round(sum(val_loss) / len(val_loss), 4)
        else:
            mean_val_loss = float("nan")
        trial_result = trial_config.copy()
        trial_result['val_loss'] = mean_val_loss
        results.append(trial_result)
        print(f"Validation loss: {mean_val_loss:.4f}")

        # plot train loss
        plt.figure(figsize=(10,5))
        plt.plot(train_losses)
        plt.title(f"Training Loss for Trial {trial_id + 1}")
        plt.xlabel("Update steps")
        plt.ylabel("Loss")
        plt.grid(True)
        # Save before showing: show() may release the figure, which would
        # leave savefig() writing an empty image.
        plt.savefig(f"train_loss_trial_{trial_id + 1}.png")
        plt.show()
        plt.close()

        # best result; NaN losses (x != x) are ranked last
        best_result = min(results, key=lambda x: (x['val_loss'] != x['val_loss'], x['val_loss']))
        print(f"\n Best Result so far: {best_result}")

    return results

def ablation_experiments(model,
                         param_name,
                         param_values,
                         fixed_config,
                         train_func,
                         train_ds,
                         val_ds,
                         device):
    """
    Performs ablation experiments by varying one parameter at a time

    For every value in ``param_values`` a fresh single-layer model is built
    and trained with ``fixed_config`` in which only ``param_name`` is
    overridden. ``fixed_config`` itself is not modified.

    Args:
        model (callable): model class/factory accepting input_size,
            hidden_size, output_size and num_layers
        param_name (str): name of the hyperparameter to vary
        param_values (list): values of the hyperparameter to test
        fixed_config (dict): fixed hyperparameters; must provide
            'hidden_size' and 'batch_size' (unless one of them is the varied
            parameter) plus whatever ``train_func`` reads
        train_func (function): called as train_func(model, config,
            train_loader, val_loader); its second return value is the list
            of validation losses
        train_ds (TensorDataset): training dataset of (input, target) pairs
        val_ds (TensorDataset): validation dataset
        device (device): device the model is moved to
    Returns:
        results (list): one dict per value with 'param_value' and 'val_loss'
            (mean validation loss rounded to 4 decimals, NaN if the run
            produced no validation losses)
    """
    results = []

    # Model sizes come from the data itself (last dimension of one sample)
    # rather than from a module-level preprocessing object.
    sample_x, sample_y = train_ds[0]
    input_size = sample_x.shape[-1]
    output_size = sample_y.shape[-1]

    for val in param_values:
        config = fixed_config.copy()
        config[param_name] = val

        model_train = model(
            input_size=input_size,
            hidden_size=config["hidden_size"],
            output_size=output_size,
            num_layers=1  # fixed to a single-layer RNN
        ).to(device)

        train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=False, drop_last=True)
        val_loader = DataLoader(val_ds, batch_size=config["batch_size"], shuffle=False, drop_last=True)

        _, val_loss, *_ = train_func(model_train, config, train_loader, val_loader)

        if val_loss:
            mean_val_loss = round(sum(val_loss) / len(val_loss), 4)
        else:
            mean_val_loss = float("nan")

        results.append({
            "param_value": val,
            "val_loss": mean_val_loss
        })
    print(results)

    return results


### Run pipeline

In [None]:
# Run configuration shared by preprocessing, model construction and training.
train_params = dict(
    file_path='./shakes.txt',
    seq_length=25,
    batch_size=128,
    hidden_size=256,
    num_layers=1,
    num_epochs=10,
    learning_rate=0.001,
)

DP = DataPreprocessing(
    train_params['file_path'],
    train_params['seq_length'],
    train_params['batch_size'],
)
DP.load_data()
DP.preprocess()

# Transpose to (num_seq * seq_length, K), then cut the rows into sequences:
# final shape (num_seq, seq_length, K).
X_data = DP.X_data.T.reshape(DP.num_seq, train_params['seq_length'], DP.K)
Y_data = DP.Y_data.T.reshape(DP.num_seq, train_params['seq_length'], DP.K)

# 70% train, 15% val, 15% test.
# First hold out 15% for testing, then take 17.6% of the remaining 85%
# (about 15% of the total) for validation. Order is kept (no shuffling).
X_train, X_test, Y_train, Y_test = train_test_split(
    X_data, Y_data, test_size=0.15, shuffle=False)
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train, test_size=0.176, shuffle=False)

# NumPy arrays -> float32 tensors.
X_train_t, Y_train_t = (torch.from_numpy(arr).float() for arr in (X_train, Y_train))
X_val_t, Y_val_t = (torch.from_numpy(arr).float() for arr in (X_val, Y_val))
X_test_t, Y_test_t = (torch.from_numpy(arr).float() for arr in (X_test, Y_test))

train_ds = TensorDataset(X_train_t, Y_train_t)
val_ds = TensorDataset(X_val_t, Y_val_t)
test_ds = TensorDataset(X_test_t, Y_test_t)

# Fixed order and full batches only, for all three splits.
train_loader = DataLoader(train_ds, batch_size=train_params['batch_size'], shuffle=False, drop_last=True)
val_loader = DataLoader(val_ds, batch_size=train_params['batch_size'], shuffle=False, drop_last=True)
test_loader = DataLoader(test_ds, batch_size=train_params['batch_size'], shuffle=False, drop_last=True)

model = RNNModel(
    input_size=DP.K,
    hidden_size=train_params['hidden_size'],
    num_layers=train_params['num_layers'],
    output_size=DP.K,
)
model = model.to(device)

train_loss, val_loss, best_smooth_loss, best_epoch, best_update_step = train(
    model, train_params, train_loader, val_loader
)

### Loss graph

In [None]:
# File names for the two loss figures, tagged with the best training result.
loss_fig_name = f'loss_fig_loss_{best_smooth_loss}_epoch_{best_epoch}_update_step_{best_update_step}.png'
val_loss_fig_name = f'val_loss_fig_loss_{best_smooth_loss}_epoch_{best_epoch}_update_step_{best_update_step}.png'

# Plot the smoothed training history, then the smoothed validation history,
# each in its own figure.
for _series, _plot_kwargs in (
    (train_loss, {'label': 'Smooth Training Loss'}),
    (val_loss, {'label': 'Smooth Validation Loss', 'color': 'orange'}),
):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(_series, **_plot_kwargs)
    plt.xlabel('update steps')
    plt.ylabel('loss')
    plt.legend()
    # The figures are only displayed; nothing is written to the file names above.
    plt.show()

### RNN Hyperparameter search

In [None]:
# Candidate values per hyperparameter; the search draws from their
# Cartesian product (3 * 3 * 3 * 1 = 27 combinations).
param_grid = dict(
    hidden_size=[128, 256, 512],
    learning_rate=[0.001, 0.01, 0.1],
    batch_size=[16, 32, 64],
    num_epochs=[10],
)

# max_trials=25 caps the search at 25 randomly sampled combinations.
results = hyperparameter_search(
    RNNModel,
    param_grid,
    train,
    train_ds,
    val_ds,
    device,
    max_trials=25,
)

### Ablation Experiments

In [None]:
# Baseline configuration; the ablation overrides one entry per run.
fixed_config = dict(
    hidden_size=256,
    learning_rate=0.001,
    batch_size=128,
    num_epochs=10,
)

# NOTE(review): despite its name, this variable receives the results of a
# learning-rate sweep (param_name is "learning_rate").
hidden_size_results = ablation_experiments(
    RNNModel,
    "learning_rate",
    [0.001, 0.1],
    fixed_config,
    train,
    train_ds,
    val_ds,
    device,
)

### Text synthesis and evaluation metrics

In [None]:
def generate_text_old(
    model,
    DP,
    gen_length: int,
    temperature: float = 0.5,
    device: torch.device = None
) -> str:
    """
    Generate unprompted text from a recurrent model using
    temperature-controlled sampling.

    The first character is drawn uniformly at random from the vocabulary;
    every following character is sampled from the model's softmax output
    after feeding it the previous character. The model is put into eval
    mode and moved to ``device``.

    Args:
        model:       trained model exposing ``rnn``, ``fc``, ``num_layers``
                     and ``hidden_size``
        DP:          object with:
                     - char_to_ind: dict mapping chars to indices
                     - ind_to_char: dict mapping indices back to chars
        gen_length:  number of characters to generate
        temperature: >0. Lower = more conservative, higher = more random
        device:      torch.device (default: model's device)

    Returns:
        A generated string of length `gen_length` (empty if gen_length <= 0).

    Raises:
        ValueError: if temperature is not positive.
    """
    if temperature <= 0:
        raise ValueError("Temperature must be > 0")

    model.eval()
    if device is None:
        device = next(model.parameters()).device
    model.to(device)

    if gen_length <= 0:
        return ""

    K = len(DP.char_to_ind)
    idx = torch.randint(0, K, (1,), device=device).item()
    chars = [DP.ind_to_char[idx]]

    h = torch.zeros(model.num_layers, 1, model.hidden_size, device=device)

    with torch.no_grad():
        for _ in range(gen_length - 1):
            # One-hot encode the previous character as a (1, 1, K) input.
            # Each character is fed exactly once; there is no separate
            # priming pass, which would push the seed through the RNN twice.
            x = torch.zeros(1, 1, K, device=device)
            x[0, 0, idx] = 1.0

            out, h = model.rnn(x, h)                           # out: (1, 1, H)
            logits = model.fc(out[:, -1, :]) / temperature     # (1, K)
            probs = F.softmax(logits, dim=-1)                  # (1, K)

            idx = torch.multinomial(probs, num_samples=1).item()
            chars.append(DP.ind_to_char[idx])

    return "".join(chars)

def generate_text(model, hprev, DP, text_len, input_text, temperature=0.5):
    """Continue a prompt with characters sampled from the model

    The whole prompt is fed in one pass; the first new character is sampled
    from the model output at the prompt's LAST position. After that the
    model is fed one sampled character at a time. The model is put into
    eval mode and no gradients are recorded.

    Args:
        model (nn.Module): called as model(x, hprev) -> (logits, hnext) with
            x of shape (1, SEQ_LENGTH, K)
        hprev (torch.Tensor): initial hidden states for a batch of one; its
            device is used for all tensors created here
        DP: object providing get_one_hot_encoding(str) -> (K, len) array,
            ind_to_char and K
        text_len (int): number of characters to generate
        input_text (str): non-empty prompt
        temperature (float): >0. Lower = more conservative, higher = more random

    Returns:
        str: the prompt followed by ``text_len`` generated characters

    Raises:
        ValueError: if temperature is not positive or the prompt is empty
    """
    if temperature <= 0:
        raise ValueError("Temperature must be > 0")
    if not input_text:
        raise ValueError("input_text must contain at least one character")

    run_device = hprev.device
    prompt = DP.get_one_hot_encoding(input_text).T  # (SEQ_LENGTH, K)
    step_input = torch.tensor(prompt, dtype=torch.float, device=run_device).unsqueeze(0)  # (1, SEQ_LENGTH, K)
    pieces = [input_text]

    model.eval()
    # Without no_grad every step would extend an autograd graph via hprev.
    with torch.no_grad():
        for _ in range(text_len):
            logits, hprev = model(step_input, hprev)  # (1, SEQ_LENGTH, K)
            # The last time step is the prediction for the character that
            # follows the complete input.
            logits = logits[0, -1, :] / temperature   # (K)
            probs = torch.softmax(logits, dim=-1)
            prediction_id = torch.multinomial(probs, num_samples=1).item()
            pieces.append(DP.ind_to_char[prediction_id])

            # The sampled character becomes the next single-step input.
            step_input = torch.zeros(1, 1, DP.K, device=run_device)
            step_input[0, 0, prediction_id] = 1.0

    return "".join(pieces)

In [None]:
# Sample 1000 characters from the trained model, starting from an all-zero
# hidden state for a batch of one.
hprev = model.init_hidden(batch_size=1, hidden_size=train_params['hidden_size'])
text = generate_text(model, hprev, DP, 1000, "ROMEO: ", 0.5)
print(text)

# Evaluate on the held-out test split with a fresh hidden state sized for
# full batches. The reported number is a mean cross-entropy loss, not an
# accuracy, so the message says "loss".
hprev = model.init_hidden(batch_size=train_params['batch_size'], hidden_size=train_params['hidden_size'])
mean_test_loss, test_losses = validation_loop(model, hprev, test_loader, device)
print(f'\nFinal mean test loss = {mean_test_loss}')