In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
from utils import load_data

In [None]:


# Read the dataset: one name per line. The context manager closes the file
# handle as soon as the contents have been read.
with open("names.txt", "r") as names_file:
  words = names_file.read().splitlines()

# Vocabulary: every distinct character that occurs in any name, sorted.
all_letters = sorted(set("".join(words)))
print(all_letters)

# Index 0 is reserved for the "." boundary token that marks the start and end
# of a name; the letters are numbered from 1 in sorted order.
stoi = {letter: i+1 for i, letter in enumerate(all_letters)}
itos = {i+1: letter for i, letter in enumerate(all_letters)}
stoi["."] = 0
itos[0] = "."
print(stoi)
print(itos)

# N[i, j] holds how many times token j directly follows token i.
N = np.zeros((len(stoi), len(stoi)), dtype=np.int64)

print(N.shape)

# Count every adjacent character pair, including the "." boundaries.
b = {}
for word in words:
  chs = ["."] + list(word) + ["."]
  for ch1, ch2 in zip(chs, chs[1:]):
    bigram = (ch1, ch2)
    b[bigram] = b.get(bigram, 0) + 1

# Ten most frequent bigrams. A bare expression in the middle of a cell is
# discarded by the notebook, so the result is printed explicitly.
print(sorted(b.items(), key=lambda x: x[1], reverse=True)[:10])

# Transfer the dictionary counts into the dense matrix.
for bigram, count in b.items():
  N[stoi[bigram[0]], stoi[bigram[1]]] = count

In [None]:

# Heat map of the bigram count matrix. Each cell is annotated with the bigram
# it represents (above the centre) and its count (below the centre).
# The grid size is read from N rather than hard-coded, so the plot follows the
# vocabulary built from the data.
n_rows, n_cols = N.shape
plt.figure(figsize=(16,16))
plt.imshow(N, cmap="Blues")
for i in range(n_rows):
  for j in range(n_cols):
    chstr = itos[i] + itos[j]
    plt.text(j, i, chstr, ha="center", va="bottom", color="black")
    plt.text(j, i, N[i, j], ha="center", va="top", color="black")
plt.show()

In [None]:
# Normalise each row of the count matrix so that row i becomes the
# distribution over the token that follows token i.
N = N.astype(np.float64)
row_totals = N.sum(axis=1, keepdims=True)
P = torch.tensor(N / row_totals)

# Sample ten names: start from token 0 (".") and keep drawing the next token
# from the current token's row until token 0 is drawn again.
for _ in range(10):
  i = 0
  pieces = []
  while True:
    i = torch.multinomial(P[i], num_samples=1, replacement=True).item()
    pieces.append(itos[i])
    if i == 0:
      break
  word = "".join(pieces)
  print(word)

In [None]:
# Number of context tokens passed to load_data and used to size the MLP input.
block_size = 3

# load_data comes from the project's utils module; this cell only relies on
# the "train" entries of the first two return values.
# NOTE(review): presumably X/Y are dicts keyed by split name - confirm in utils.
X, Y, stoi, itos = load_data("names.txt", block_size=block_size)

# Keep only the training split as tensors; the targets are flattened to 1-D.
X, Y = torch.tensor(X["train"]), torch.tensor(Y["train"]).view(-1)
for label, split in (("X", X), ("Y", Y)):
  print(label, split.shape)


In [None]:
embedding_dim = 100
hidden_dim = 128

# Pick the compute device: CUDA first, then MPS (Mac), otherwise the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
  print("Using CUDA:", torch.cuda.get_device_name(0))
elif torch.backends.mps.is_available():
  device = torch.device("mps")
  print("Using MPS")
else:
  device = "cpu"

X = X.to(device)
Y = Y.to(device)

# Parameters of the single-hidden-layer MLP, all drawn from a standard normal:
#   C  - embedding table with one row per token (27 tokens)
#   W1 - (embedding_dim * block_size, hidden_dim), b1 - (hidden_dim,)
#   W2 - (hidden_dim, 27),                        b2 - (27,)
C = torch.randn((27, embedding_dim)).to(device)
W1 = torch.randn((embedding_dim * block_size, hidden_dim)).to(device)
b1 = torch.randn(hidden_dim).to(device)
W2 = torch.randn((hidden_dim, 27)).to(device)
b2 = torch.randn(27).to(device)

# Gradient tracking is switched on only after the tensors are on the device,
# so each one is a leaf that accumulates its own .grad.
params = [C, W1, b1, W2, b2]
for p in params:
  p.requires_grad = True

In [None]:
# Mini-batch SGD for the single-hidden-layer MLP defined by `params`.
# (numpy is already imported in the notebook's import cell.)
batch_size = 256

iters = 30000
# Learning-rate exponents running from 1 towards 3, i.e. the rate decays from
# 1e-1 towards 1e-3. linspace(endpoint=False) yields exactly `iters` values,
# which np.arange cannot guarantee with a floating-point step.
lres = np.linspace(1, 3, iters, endpoint=False)
lrs = 10 ** (-lres)

for itr in range(iters):
  # Random mini-batch, sampled with replacement.
  indices = torch.randint(0, len(X), (batch_size,))
  X_batch = X[indices]
  Y_batch = Y[indices]

  # Forward pass: embed the context, concatenate, one tanh layer, output layer.
  embeddings = C[X_batch]
  embeddings = embeddings.view(-1, embedding_dim * block_size)
  hidden = torch.tanh(embeddings @ W1 + b1)
  logits = hidden @ W2 + b2

  # Backward pass; gradients are cleared first so they do not accumulate.
  for p in params:
    p.grad = None
  loss = F.cross_entropy(logits, Y_batch)
  loss.backward()

  # Plain SGD step. A Python float keeps NumPy scalars out of tensor
  # arithmetic, and no_grad keeps the in-place update out of autograd.
  lr = float(lrs[itr])
  with torch.no_grad():
    for p in params:
      p -= lr * p.grad

  if itr % 1000 == 0:
    print(f"iter {itr}, loss {loss.item()}, lr {lr}")

In [None]:
# Sample ten names from the trained MLP. Sampling is pure inference, so it
# runs under no_grad to avoid building an autograd graph at every step.
with torch.no_grad():
  for _ in range(10):
    # The context starts as all zeros, i.e. block_size "." tokens.
    x = torch.zeros(block_size, dtype=torch.int64).to(device)
    word = ""
    while True:
      embeddings = C[x].view(-1, embedding_dim * block_size)
      hidden = torch.tanh(embeddings @ W1 + b1)
      logits = hidden @ W2 + b2
      softmax = F.softmax(logits, dim=1)
      y = torch.multinomial(softmax[0], num_samples=1)
      token = y.item()  # read the sampled index once
      word += itos[token]
      if token == 0:
        break
      # Slide the context window: drop the oldest token, append the new one.
      x = torch.cat([x[1:], y])
    print(word)


In [None]:
class Linear:
  """Fully connected layer computing ``x @ W + b``.

  Args:
    fan_in: number of input features.
    fan_out: number of output features.
    device: device the parameters are moved to.
    name: optional label stored on the layer.

  ``W`` is drawn from a standard normal and divided by ``sqrt(fan_in)``;
  ``b`` starts at zero. Both track gradients.
  """

  def __init__(self, fan_in, fan_out, device="cpu", name=None):
    self.device = device
    self.W = torch.randn((fan_in, fan_out)).to(device) / fan_in**0.5
    self.W.requires_grad = True
    self.b = torch.zeros(fan_out).to(device)
    self.b.requires_grad = True
    self.name = name
    # Defined up front (as BatchNorm1d does) so the flag can be read before
    # train() or eval() has ever been called.
    self.training = True

  def forward(self, x):
    """Apply the affine map; the result is also kept in ``self.out``."""
    self.out = x @ self.W + self.b
    return self.out

  def parameters(self):
    """Return the trainable tensors as ``[W, b]``."""
    params = [self.W, self.b]
    return params

  def zero_grad(self):
    """Drop the stored gradients of both parameters."""
    self.W.grad = None
    self.b.grad = None

  def train(self):
    """Set the training flag."""
    self.training = True

  def eval(self):
    """Clear the training flag."""
    self.training = False

class Tanh:
  """Element-wise tanh activation exposing the same interface as the other layers."""

  def __init__(self, name=None):
    self.name = name
    # Defined up front (as BatchNorm1d does) so the flag can be read before
    # train() or eval() has ever been called.
    self.training = True

  def forward(self, x):
    """Return ``tanh(x)``; the result is also kept in ``self.out``."""
    self.out = torch.tanh(x)
    return self.out

  def parameters(self):
    """The activation has no trainable tensors."""
    return []

  def zero_grad(self):
    """Nothing to clear: there are no parameters."""
    pass

  def train(self):
    """Set the training flag."""
    self.training = True

  def eval(self):
    """Clear the training flag."""
    self.training = False

class Embedding:
  """Lookup table that embeds integer tokens and concatenates each context.

  Args:
    num_embeddings: number of rows in the table.
    embedding_dim: length of each embedding vector.
    device: device the table is moved to.
    name: optional label stored on the layer.

  The table is drawn from a standard normal and tracks gradients.
  """

  def __init__(self, num_embeddings, embedding_dim, device="cpu", name=None):
    self.device = device
    self.embedding = torch.randn((num_embeddings, embedding_dim)).to(device)
    self.embedding.requires_grad = True
    self.name = name
    # Defined up front (as BatchNorm1d does) so the flag can be read before
    # train() or eval() has ever been called.
    self.training = True

  @property
  def num_embeddings(self):
    """Number of rows in the table."""
    return self.embedding.shape[0]

  @property
  def embedding_dim(self):
    """Length of each embedding vector."""
    return self.embedding.shape[1]

  def forward(self, x):
    """Embed ``x`` and flatten each row's embeddings into one vector.

    ``x`` must have at least two dimensions because its second dimension is
    read as the context length; the result has ``embedding_dim * x.shape[1]``
    columns.
    """
    block_size = x.shape[1]
    return self.embedding[x].view(-1, self.embedding_dim * block_size)

  def parameters(self):
    """Return the trainable tensors: just the table."""
    return [self.embedding]

  def zero_grad(self):
    """Drop the stored gradient of the table."""
    self.embedding.grad = None

  def train(self):
    """Set the training flag."""
    self.training = True

  def eval(self):
    """Clear the training flag."""
    self.training = False

class BatchNorm1d():
  """Batch normalisation over the first (batch) dimension.

  Args:
    num_features: number of features being normalised.
    eps: constant added to the variance before the square root.
    momentum: weight given to the current batch when updating the running
      statistics.
    device: device the tensors are moved to.
    name: optional label stored on the layer.

  ``W`` (scale, initialised to one) and ``b`` (shift, initialised to zero)
  are trainable. ``running_mean`` and ``running_var`` are plain buffers that
  are updated during training and used for normalisation in eval mode.
  """

  def __init__(self, num_features, eps=1e-5, momentum=0.1, device="cpu", name=None):
    self.device = device
    self.num_features = num_features
    self.name = name
    self.eps = eps
    self.momentum = momentum
    self.W = torch.ones(num_features).to(device)
    self.W.requires_grad = True
    self.b = torch.zeros(num_features).to(device)
    self.b.requires_grad = True
    self.running_mean = torch.zeros(num_features).to(device)
    self.running_var = torch.ones(num_features).to(device)
    self.training = True

  def forward(self, x):
    """Normalise ``x``, then scale and shift; the result is kept in ``self.out``.

    In training mode the batch mean and variance (``torch.var`` with its
    default settings) are used and folded into the running statistics.
    In eval mode the running statistics are used instead.
    """
    if self.training:
      mean = x.mean(dim=0)
      var = x.var(dim=0)
      # The running statistics are bookkeeping, not part of the model output.
      # Updating them under no_grad keeps them out of the autograd graph;
      # otherwise each update would hold a reference to the previous
      # iteration's graph and memory would grow with every training step.
      with torch.no_grad():
        self.running_mean = self.running_mean * (1 - self.momentum) + mean * self.momentum
        self.running_var = self.running_var * (1 - self.momentum) + var * self.momentum
    else:
      mean = self.running_mean
      var = self.running_var
    x_hat = (x - mean) / torch.sqrt(var + self.eps)
    self.out = self.W * x_hat + self.b
    return self.out

  def parameters(self):
    """Return the trainable tensors as ``[W, b]``."""
    return [self.W, self.b]

  def zero_grad(self):
    """Drop the stored gradients of both parameters."""
    self.W.grad = None
    self.b.grad = None

  def train(self):
    """Switch to batch statistics and running-statistic updates."""
    self.training = True

  def eval(self):
    """Switch to the stored running statistics."""
    self.training = False


In [None]:
class Sequential:
  """Container that chains layers, feeding each layer's output to the next."""

  def __init__(self, layers, name=None):
    self.layers = layers
    self.name = name

  def forward(self, x):
    """Pass ``x`` through every layer in order and return the final result."""
    out = x
    for module in self.layers:
      out = module.forward(out)
    return out

  def parameters(self):
    """Collect the parameters of all layers into one flat list, in layer order."""
    return [param for module in self.layers for param in module.parameters()]

  def zero_grad(self):
    """Ask every layer to clear its gradients."""
    for module in self.layers:
      module.zero_grad()

  def train(self):
    """Put every layer in training mode."""
    for module in self.layers:
      module.train()

  def eval(self):
    """Put every layer in evaluation mode."""
    for module in self.layers:
      module.eval()


In [None]:
# Deep MLP: an embedding, five Linear -> BatchNorm1d -> Tanh blocks, and a
# final Linear producing 27 logits. Layers are created in the same order as a
# hand-written list, so the random initialisation sequence is unchanged.
# Every layer gets a unique name; the output layer is "linear6".
n_hidden_blocks = 5

layers = [Embedding(27, embedding_dim, name="emb", device=device)]
fan_in = embedding_dim * block_size  # width of the concatenated context embedding
for k in range(1, n_hidden_blocks + 1):
  layers += [
    Linear(fan_in, hidden_dim, name=f"linear{k}", device=device),
    BatchNorm1d(num_features=hidden_dim, name=f"bn{k}", device=device),
    Tanh(),
  ]
  fan_in = hidden_dim  # every block after the first maps hidden_dim -> hidden_dim
layers.append(Linear(hidden_dim, 27, name=f"linear{n_hidden_blocks + 1}", device=device))

model = Sequential(layers)

In [None]:
# Mini-batch SGD for the layer-based `model`, with periodic tanh diagnostics.
batch_size = 256

iters = 10000
# Learning-rate exponents running from 1 towards 3, i.e. the rate decays from
# 1e-1 towards 1e-3. linspace(endpoint=False) yields exactly `iters` values,
# which np.arange cannot guarantee with a floating-point step.
lres = np.linspace(1, 3, iters, endpoint=False)
lrs = 10 ** (-lres)

X = X.to(device)
Y = Y.to(device)

model.train()

for itr in range(iters):
  # Random mini-batch, sampled with replacement.
  indices = torch.randint(0, len(X), (batch_size,))
  X_batch = X[indices]
  Y_batch = Y[indices]

  logits = model.forward(X_batch)
  model.zero_grad()
  loss = F.cross_entropy(logits, Y_batch)
  loss.backward()

  # Plain SGD step. A Python float keeps NumPy scalars out of tensor
  # arithmetic; under no_grad the parameters can be updated in place directly.
  lr = float(lrs[itr])
  with torch.no_grad():
    for p in model.parameters():
      p -= lr * p.grad

  if itr % 1000 == 0:
    print(f"iter {itr}, loss {loss.item()}, lr {lr}")
    # Report how saturated each tanh layer was on this batch.
    for i, layer in enumerate(model.layers):
      if isinstance(layer, Tanh):
        t = layer.out.detach()  # statistics only; no gradient needed
        print('layer %d (%10s): mean %+.2f, std %.2f, saturated: %.2f%%' % (i, layer.__class__.__name__, t.mean(), t.std(), (t.abs() > 0.97).float().mean()*100))

In [None]:
# Plot the distribution of each tanh layer's most recently stored output.
plt.figure(figsize=(20, 4))  # width and height of the plot
legends = []
for i, layer in enumerate(model.layers[:-1]):  # the output layer is excluded
  if not isinstance(layer, Tanh):
    continue
  t = layer.out.cpu().detach().flatten()
  layer_name = layer.__class__.__name__
  saturated_pct = (t.abs() > 0.97).float().mean()*100
  print('layer %d (%10s): mean %+.2f, std %.2f, saturated: %.2f%%' % (i, layer_name, t.mean(), t.std(), saturated_pct))
  # torch.histogram returns one more bin edge than counts; the final edge is
  # dropped so the x and y arrays have the same length.
  hy, hx = torch.histogram(t, density=True)
  plt.plot(hx[:-1].detach().cpu(), hy.detach().cpu())
  legends.append(f'layer {i} ({layer_name})')
plt.legend(legends)
plt.title('activation distribution')
plt.show()

In [None]:
# Sample ten names from the trained model. eval() makes BatchNorm1d use its
# running statistics, so a batch of one context can be normalised.
model.eval()
# Sampling is pure inference, so it runs under no_grad to avoid building an
# autograd graph at every step.
with torch.no_grad():
  for _ in range(10):
    # The context starts as all zeros, i.e. block_size "." tokens.
    x = torch.zeros((1, block_size), dtype=torch.int64).to(device)
    word = ""
    while True:
      logits = model.forward(x)
      softmax = F.softmax(logits, dim=1)
      y = torch.multinomial(softmax[0], num_samples=1)
      token = y.item()  # read the sampled index once
      word += itos[token]
      if token == 0:
        break
      # Slide the context window: drop the oldest token, append the new one.
      x[0] = torch.cat([x[0, 1:], y])
    print(word)