In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import os, glob, unicodedata, string, random

## Exercise 1: Natural Gas Price Prediction using RNN

Dataset: Daily natural gas prices (nominal dollars) starting from January 1997. Given the last 10 days of prices, the model predicts the 11th day's price.

In [None]:
# Load & preprocess data
df = pd.read_csv("./data/NaturalGasPrice/daily.csv").dropna()
y = df['Price'].values
seq_len = 10

# Build every (seq_len-day window -> next-day price) pair on the raw prices, so
# the chronological split can happen before any scaling statistic is computed.
n_windows = len(y) - seq_len
if n_windows <= 0:
    raise ValueError(f"Need more than {seq_len} prices to build a window, got {len(y)}")
# Row i of window_idx holds the positions i .. i+seq_len-1.
window_idx = np.arange(n_windows)[:, None] + np.arange(seq_len)[None, :]
X_raw, Y_raw = y[window_idx], y[seq_len:]

# shuffle=False keeps time order: the final 10% of the windows become the test set.
x_train, x_test, y_train, y_test = train_test_split(X_raw, Y_raw, test_size=0.1, random_state=42, shuffle=False)

# Fit the min-max scaler on the training portion only; using the global min/max
# would leak information about the test period into the training inputs.
minm = min(x_train.min(), y_train.min())
maxm = max(x_train.max(), y_train.max())
price_range = maxm - minm
if price_range == 0:
    raise ValueError("Training prices are constant; min-max scaling is undefined")

# Test values may fall slightly outside [0, 1] because the scaler never saw them.
y_norm = (y - minm) / price_range
X, Y = (X_raw - minm) / price_range, (Y_raw - minm) / price_range
x_train, x_test = (x_train - minm) / price_range, (x_test - minm) / price_range
y_train, y_test = (y_train - minm) / price_range, (y_test - minm) / price_range

class NGTimeSeries(Dataset):
    """Dataset pairing each input window with its target value.

    Both arrays are converted to float32 tensors once, at construction time.
    """

    def __init__(self, x, y):
        self.x = torch.tensor(x, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        window = self.x[idx]
        target = self.y[idx]
        return window, target

# Mini-batches of 256 training windows, drawn in shuffled order.
train_dataset = NGTimeSeries(x_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=256, shuffle=True)

# Define RNN model
class RNNModel(nn.Module):
    """Single-layer RNN (5 hidden units) followed by a linear read-out.

    ``forward`` takes a ``(batch, seq)`` tensor of scalar observations and
    returns a ``(batch, 1)`` prediction computed from the last time step.
    """

    def __init__(self):
        super().__init__()
        self.rnn = nn.RNN(input_size=1, hidden_size=5, num_layers=1, batch_first=True)
        self.fc = nn.Linear(in_features=5, out_features=1)

    def forward(self, x):
        # Add a trailing feature axis: (batch, seq) -> (batch, seq, 1).
        series = x[..., None]
        hidden_states, _ = self.rnn(series)
        # Only the hidden state of the final time step feeds the read-out.
        last_state = hidden_states[:, -1]
        return self.fc(last_state.relu())

# Network, loss and optimiser for the price regressor.
model = RNNModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# Training loop
# Each epoch is one full pass over the training loader.
for epoch in range(1500):
    epoch_loss = 0.0
    for xb, yb in train_loader:
        optimizer.zero_grad()
        # squeeze(-1) drops only the feature axis: (batch, 1) -> (batch,).
        # A bare squeeze() would also drop the batch axis when the final batch
        # holds a single sample, giving a 0-d prediction against a (1,) target.
        loss = criterion(model(xb).squeeze(-1), yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    if epoch % 50 == 0:
        # Report the mean over all batches rather than the last batch only,
        # which is noisy when the final batch is small.
        print(f"Epoch {epoch} loss: {epoch_loss / len(train_loader):.4f}")

# Evaluation & plots
# The loader's batch size equals the test-set size, so it yields one batch.
test_loader = DataLoader(NGTimeSeries(x_test, y_test), batch_size=len(x_test))
with torch.no_grad():
    xb, yb = next(iter(test_loader))
    y_pred = model(xb).squeeze()

# Predictions vs. targets on the normalised scale.
plt.figure(figsize=(10,5))
plt.plot(y_pred.numpy(), label='Pred')
plt.plot(yb.numpy(), label='Orig')
plt.legend()
plt.show()

# Undo the min-max scaling to get back to the original price units.
price_span = maxm - minm
y_pred_denorm = y_pred.numpy() * price_span + minm
yb_denorm = yb.numpy() * price_span + minm

plt.figure(figsize=(10,5))
plt.plot(yb_denorm, label='Orig')
plt.plot(y_pred_denorm, label='Pred')
plt.legend()
plt.show()

## Exercise 2: Name Classification with RNN

Dataset: A collection of names stored in files (each file corresponds to a language). Train an RNN on surnames from 18 languages to predict the language based on name spelling.

In [None]:
# Prepare data
# Alphabet used for one-hot encoding: ASCII letters plus a little punctuation.
all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)


def unicodeToAscii(s):
    """Strip accents from ``s`` and drop every character not in ``all_letters``.

    The string is NFD-decomposed so that combining marks (category ``Mn``)
    become separate code points and can be filtered out.
    """
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn' and c in all_letters
    )


def readLines(filename):
    """Read a UTF-8 file with one name per line and return the ASCII names.

    Lines that are empty after conversion are skipped: an empty name would
    produce a zero-length sequence that the RNN cannot process.
    """
    # The context manager closes the file even if reading or decoding fails.
    with open(filename, encoding='utf-8') as f:
        raw_lines = f.read().strip().split('\n')
    names = (unicodeToAscii(line) for line in raw_lines)
    return [name for name in names if name]
categories, cat_lines = [], {}
# glob returns paths in arbitrary order; sorting keeps the category -> index
# mapping identical across runs and machines.
for fn in sorted(glob.glob('./data/names/*.txt')):
    cat = os.path.splitext(os.path.basename(fn))[0]
    names = [name for name in readLines(fn) if name]
    if not names:
        # A category with no usable names could never be sampled for training.
        continue
    categories.append(cat)
    cat_lines[cat] = names
if not categories: raise RuntimeError("No data in ./data/names/")
def nameToTensor(name):
    """One-hot encode ``name`` as a ``(len(name), 1, n_letters)`` float tensor.

    A character that is not in ``all_letters`` is left as an all-zero row.
    ``str.find`` returns -1 for such characters, and indexing with -1 would
    silently mark the last letter of the alphabet instead.
    """
    tensor = torch.zeros(len(name), 1, n_letters)
    for i, ch in enumerate(name):
        letter_idx = all_letters.find(ch)
        if letter_idx >= 0:
            tensor[i, 0, letter_idx] = 1
    return tensor

# Define RNN classifier
class RNNClassifier(nn.Module):
    """Single-layer RNN that classifies a sequence from its final hidden state.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the recurrent hidden state.
        output_size: number of classes (size of the logit vector).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, 1)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return ``(batch, output_size)`` logits for ``x`` of shape ``(seq, batch, input_size)``."""
        # Size the initial state from the layer and the input rather than from a
        # module-level ``hidden_size`` global, which may be undefined or changed
        # by another cell, and rather than assuming a batch of exactly one.
        h0 = torch.zeros(
            self.rnn.num_layers, x.size(1), self.rnn.hidden_size,
            dtype=x.dtype, device=x.device,
        )
        out, _ = self.rnn(x, h0)
        # out[-1] is the hidden state after the last time step.
        return self.fc(out[-1])
# Hyper-parameters and training objects for the surname classifier.
n_categories = len(categories)
hidden_size = 128
model = RNNClassifier(input_size=n_letters, hidden_size=hidden_size, output_size=n_categories)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=0.005)

# Training loop
def randomExample():
    """Draw one random training example.

    Picks a category uniformly, then a name uniformly within that category.

    Returns:
        ``(label, features, category, name)`` where ``label`` is a long tensor
        holding the category index and ``features`` is ``nameToTensor(name)``.
    """
    language = random.choice(categories)
    surname = random.choice(cat_lines[language])
    label = torch.tensor([categories.index(language)], dtype=torch.long)
    features = nameToTensor(surname)
    return label, features, language, surname

for i in range(100000):
    cat_tensor, line_tensor, cat, line = randomExample()

    # One SGD step on this single example.
    optimizer.zero_grad()
    output = model(line_tensor)
    loss = criterion(output, cat_tensor)
    loss.backward()
    optimizer.step()

    # Progress report every 5000 iterations.
    if i % 5000:
        continue
    guess = categories[output.argmax()]
    print(f"{i}: {loss.item():.4f} {line} => {guess} ({cat})")

def predict(name, topk=3):
    """Return the ``topk`` most likely categories for ``name``.

    Args:
        name: the name to classify; it is normalised with ``unicodeToAscii``
            in the same way as the training names.
        topk: how many guesses to return (capped at the number of categories).

    Returns:
        A list of ``(category, score)`` tuples, best first; ``score`` is the
        raw model output for that category.

    Raises:
        ValueError: if no character of ``name`` survives normalisation.
    """
    name = unicodeToAscii(name)
    if not name:
        raise ValueError("name must contain at least one character from all_letters")
    with torch.no_grad():
        output = model(nameToTensor(name))
        # The model output has a leading batch axis of size 1. Take row 0 before
        # topk so each value/index is a scalar; indexing the 2-D topk result
        # with [i] would return a whole row (and fail for i >= 1).
        scores = output[0]
        k = min(topk, scores.size(-1))
        topv, topi = scores.topk(k)
    return [(categories[idx], score) for score, idx in zip(topv.tolist(), topi.tolist())]
print("\nPrediction for 'Satoshi':", predict("Satoshi"))

## Exercise 3: Next Character Prediction using RNN

Dataset: A text file (e.g., `./data/input.txt`). Train an RNN to predict the next character in a sequence.

In [None]:
# Load & prepare text data
with open('./data/input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Vocabulary: every distinct character, in sorted order, mapped to an integer id.
chars = sorted(set(text))
vocab_size = len(chars)
char_to_idx = {ch: i for i, ch in enumerate(chars)}
idx_to_char = dict(enumerate(chars))

# Hyper-parameters. NOTE: seq_len and hidden_size are also assigned in the
# earlier exercises; these assignments overwrite those values.
seq_len = 50
hidden_size = 128
num_layers = 1
lr = 0.003
epochs = 20
bs = 64

n_samples = len(text) - seq_len
if n_samples <= 0:
    raise ValueError(f"Text must be longer than seq_len={seq_len} characters, got {len(text)}")

# Encode the text once, then gather the overlapping windows with index
# arithmetic instead of re-encoding each character seq_len times.
encoded = np.array([char_to_idx[ch] for ch in text], dtype=np.int64)
window_idx = np.arange(n_samples)[:, None] + np.arange(seq_len)[None, :]
sequences = encoded[window_idx]   # (n_samples, seq_len): ids of text[i:i+seq_len]
targets = encoded[seq_len:]       # (n_samples,): id of text[i+seq_len]

class TextDataset(Dataset):
    """Dataset of (character-id sequence, next-character id) pairs.

    The inputs are stored as given; each item is converted to a ``torch.long``
    tensor when it is fetched.
    """

    def __init__(self, seq, tgt):
        self.seq = seq
        self.tgt = tgt

    def __getitem__(self, idx):
        context = torch.tensor(self.seq[idx], dtype=torch.long)
        next_char = torch.tensor(self.tgt[idx], dtype=torch.long)
        return context, next_char

    def __len__(self):
        return len(self.seq)

# Wrap the encoded windows in a dataset and serve them in shuffled mini-batches.
ds = TextDataset(seq=sequences, tgt=targets)
dl = DataLoader(dataset=ds, batch_size=bs, shuffle=True)

# Define next-character RNN
class NextCharRNN(nn.Module):
    """Embedding -> RNN -> linear model that scores the next character.

    Args:
        vocab: number of distinct characters (embedding rows and output logits).
        hidden: embedding width and RNN hidden size.
        layers: number of stacked RNN layers.
    """

    def __init__(self, vocab, hidden, layers):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab, embedding_dim=hidden)
        self.rnn = nn.RNN(input_size=hidden, hidden_size=hidden, num_layers=layers, batch_first=True)
        self.fc = nn.Linear(in_features=hidden, out_features=vocab)

    def forward(self, x):
        """Map ``(batch, seq)`` character ids to ``(batch, vocab)`` logits."""
        embedded = self.embed(x)
        states, _ = self.rnn(embedded)
        # Only the final time step is used to score the next character.
        final_state = states[:, -1]
        return self.fc(final_state)

# Network, loss and optimiser for next-character prediction.
model = NextCharRNN(vocab=vocab_size, hidden=hidden_size, layers=num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=lr)

# Training loop
for epoch in range(epochs):
    # Running total of the per-batch losses for this epoch.
    loss_sum = 0
    for xb, yb in dl:
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item()
    # Report the mean batch loss once per epoch.
    mean_loss = loss_sum / len(dl)
    print(f"Epoch {epoch+1}/{epochs} loss: {mean_loss:.4f}")

# Text generation function
def gen_text(model, start, length=100, context_len=None):
    """Sample ``length`` characters from ``model``, continuing the prompt ``start``.

    Args:
        model: network mapping a ``(1, seq)`` tensor of character ids to
            ``(1, vocab)`` logits.
        start: non-empty prompt; every character must be in ``char_to_idx``.
        length: number of characters to generate.
        context_len: maximum number of trailing characters fed to the model.
            Defaults to the module-level ``seq_len`` used to build the
            training windows.

    Returns:
        ``start`` followed by the generated characters.

    Raises:
        ValueError: if ``start`` is empty or ``context_len`` is less than 1.
        KeyError: if ``start`` contains a character outside the vocabulary.
    """
    if not start:
        raise ValueError("start must contain at least one character")
    unknown = sorted(set(start) - set(char_to_idx))
    if unknown:
        raise KeyError(f"start contains characters missing from the vocabulary: {unknown}")
    if context_len is None:
        context_len = seq_len
    if context_len < 1:
        raise ValueError("context_len must be at least 1")

    pieces = [start]
    inp = torch.tensor([[char_to_idx[ch] for ch in start[-context_len:]]], dtype=torch.long)

    # Remember the current mode so generation does not leave the model in eval mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(length):
                out = model(inp)
                # float64 plus renormalisation keeps np.random.choice's
                # "probabilities sum to 1" check independent of float32 rounding.
                p = torch.softmax(out, dim=1)[0].cpu().numpy().astype(np.float64)
                p /= p.sum()
                char_idx = int(np.random.choice(len(p), p=p))
                pieces.append(idx_to_char[char_idx])
                next_id = torch.tensor([[char_idx]], dtype=torch.long)
                # Let the context grow up to context_len before it starts to
                # slide; dropping the oldest character on every step would pin
                # the context at len(start) for the whole generation.
                inp = torch.cat([inp, next_id], dim=1)[:, -context_len:]
    finally:
        model.train(was_training)
    return ''.join(pieces)

print("\nGenerated Text:\n", gen_text(model, "The ", 200))