In [None]:
import numpy as np
import torch


In [None]:
# Load the corpus into a list with one entry per line of the file (line breaks stripped).
with open('data/names.txt', 'r') as names_file:
    data = names_file.read().splitlines()


In [None]:
# Vocabulary: '.' (the start/end marker) sits at index 0, followed by every distinct
# character of the corpus in sorted order.
chars = ['.'] + sorted(set(''.join(data)))
itoc = dict(enumerate(chars))                      # index -> character
ctoi = {ch: idx for idx, ch in itoc.items()}       # character -> index

print(ctoi)
print(itoc)

# Training examples: each entry of X is [context_1, context_2, target] as indices.
block_size = 2 # Tri-gram
X = []
for word in data:
    # Every word starts with a context made only of '.' markers.
    context = [0] * block_size
    for c in word + '.':
        ix = ctoi[c]
        X.append([*context, ix])
        # Slide the window: drop the oldest character, append the newest.
        context = [*context[1:], ix]

print(len(X))

## Counting occurrences and then predicting

In [None]:
# First let's 'train' the simplest possible 'network': count how many times each character
# follows every two-character context, and use that table to sample the next character when
# generating words. No neural network is involved.

# The table is sized from the vocabulary instead of a hard-coded 27, so it always matches
# the indices produced by ctoi.
counts = torch.zeros(len(chars), len(chars), len(chars))
# counts[0, 0, 0] is the count of the '...' sequence in the training data,
# counts[0, 1, 0] is the count of '.a.', and so on.
for first, second, third in X:
    counts[first, second, third] += 1


print(counts[0, 0, :])
# Now normalize.
# '...', '..a', '..b', '..c', '..d', ... should sum to 1.0 (i.e., normalize across counts[0, 0, i])
# '.a.', '.aa', '.ab', '.ac', '.ad', ... should sum to 1.0 (i.e., across counts[0, 1, i])
# '.b.', '.ba', '.bb', '.bc', '.bd', ... should sum to 1.0 (i.e., across counts[0, 2, i])
# ...
# 'z..', 'z.a', 'z.b', 'z.c', 'z.d', ...
# ...
# 'zz.', 'zza', 'zzb', 'zzc', 'zzd', ...

# i.e., for every two-character context we sum along dimension 2 (the position of i above).

# Add-one smoothing: avoids division by zero for contexts that never occur. The larger the
# added constant, the closer every distribution gets to uniform.
counts = counts + 1
# From here on `counts` holds probabilities: each counts[a, b, :] sums to 1.
counts = counts / counts.sum(dim=2, keepdim=True)

In [None]:
# Check that the next-character distribution of a few different contexts sums to 1.0
# (three distinct contexts: '..', '.a' and '.b').
print(counts[0, 0, :].sum())
print(counts[0, 1, :].sum())
print(counts[0, 2, :].sum())

# Rough code to see how sum(dim=..., keepdim=True) shapes the result
# ten = torch.tensor([[[1,2,3],[1,2,3],[1,2,3]], [[10,20,30],[10,20,30],[10,20,30]], [[100,200,300],[100,200,300],[100,200,300]]])
# print(ten)

# print("Sum")
# ten.sum(dim=2, keepdim=True)
# dim0 = (3, 3)
# dim1 = (1, 3)
# dim2 = (3, 1)

In [None]:
# Generate words by sampling from the normalized count table (PREDICTION step)

g = torch.Generator().manual_seed(1024)
for _ in range(50):
    out = []
    prev_ix, ix = 0, 0  # every word starts from the '..' context
    while True:
        # Next-character distribution for the current two-character context.
        p = counts[prev_ix, ix, :]
        new_ix = torch.multinomial(p, num_samples=1, replacement=True, generator=g).item()
        out.append(itoc[new_ix])
        if new_ix == 0:
            # Index 0 is '.', the end-of-word marker.
            break
        # Slide the context window forward by one character.
        prev_ix, ix = ix, new_ix
    print(''.join(out))


In [None]:
# Evaluate the quality of this model:

# For that we look at the probability the 'learned' count table assigns to the dataset itself.
for x in X[:10]:
    print(''.join(itoc[i] for i in x), end='--> ')
    print(f'{counts[x[0], x[1], x[2]].item():.2f}')

# Maximum likelihood says that a good model maximizes the product of the probabilities of the data.
# Since that product would be vanishingly small, we work with the sum of log probabilities instead.

print("Quality of the model: summarized by negative log likelihood")
# Look up the probability of every trigram in one indexing operation instead of looping
# over X in Python and accumulating scalar tensors one by one.
trigrams = torch.tensor(X)
prob = counts[trigrams[:, 0], trigrams[:, 1], trigrams[:, 2]]
logprob = torch.log(prob)
log_likelihood = logprob.sum()
n = len(X)

# Log likelihood = 0 when every probability is 1.0 (which is what we aim towards)
# Log likelihood < 0 when probabilities are < 1.0 (the model does not fully explain the data)

# We want a loss function (i.e., 0 when the prediction is good, high when it is bad),
# so we use the negative log likelihood (nll) as the loss,
# and we average it over the examples instead of summing.
nll = -1 * log_likelihood
loss = nll/n

print(f'{loss=}')

# Up to the +1 smoothing, this table is the maximum-likelihood trigram model for the data it was
# counted on, so this loss is the reference value that a neural network given the same
# two-character context should approach.

## Training a Neural Network for Trigram Character Level Language Model.

- We took the approach that felt most natural: count how often each character follows the two characters before it, and turn those counts into probabilities. We then sampled from that distribution to predict the next character in the prediction phase.
- As the number of context characters increases, the count table grows exponentially (with 27 symbols, a context of k characters needs 27^(k+1) entries), so this quickly becomes computationally infeasible.
- So, we want to use a neural network to do that.
- Here, we input two characters to the neural network and it predicts the probability distribution over the next characters.

In [None]:
# Peek at the first ten training examples; each one is [context index 1, context index 2, target index].
print(X[:10])

In [None]:
# Small demo batch built from the first five trigrams: the two context indices are the
# input, the index of the character that follows them is the target.
# The comprehension variable is deliberately not called `data`, so the list of names
# loaded at the top of the notebook is not overwritten.
x = torch.tensor([[trigram[0], trigram[1]] for trigram in X[:5]])
y = torch.tensor([trigram[2] for trigram in X[:5]])


In [None]:
# Raw int64 indices are not meaningful inputs for a linear layer, so each index is turned
# into a one-hot float vector.
import torch.nn.functional as F

torch.manual_seed(1024)

x_enc = F.one_hot(x, num_classes=len(chars)).float()
print(x_enc.shape)
# Reshape it to stack the one-hot vectors of the context characters side by side
# X_temp = torch.tensor([[[1, 2, 3], [4, 5, 6]], [[10, 20, 30], [40, 50, 60]]]).float()

# print(X_temp.view(len(X_temp), -1))
# print(X_temp.reshape(2, 6))

x_enc = x_enc.view(len(x_enc), -1)
print(f'{x_enc.shape=}')

# One linear layer with one output unit per character, producing a score for every possible
# next character. Its shape is derived from the data (two context characters x vocabulary
# size inputs, vocabulary size outputs) instead of hard-coding 54 x 27.

W = torch.randn(x_enc.shape[1], len(chars))

out = x_enc @ W
print(f'output shape: {out.shape}')

# Output of the 1st neuron for the 1st example, computed by hand as a dot product
neuron_output = torch.dot(x_enc[0, :], W[:, 0])
print(neuron_output)


# Softmax by hand: exponentiate the scores and normalize each row.
# The intermediate is called counts_nn so that the probability table `counts` of the
# counting model above is not overwritten by this cell.
counts_nn = out.exp()
prob_nn = counts_nn / counts_nn.sum(dim=1, keepdim=True)
print(f'{prob_nn.shape=}')
print(y)
# Probability the network assigns to the true target of each example, first picked one
# element at a time, then with a single indexing expression.
print(*(prob_nn[row, target] for row, target in enumerate(y.tolist())))
print(prob_nn[torch.arange(len(prob_nn)), y])

# Now to compute logprob
logprob = prob_nn[torch.arange(len(prob_nn)), y].log()
print(f'{logprob=}')

# The log probability is higher (closer to 0) for the better predictions (in the recorded run, the
# 4th example with probability 0.1475) and very negative for the bad ones. These are the true
# labels, so we want their probabilities to be high.

# To compute the loss, we take the negative logprob (and average it)
loss = -1 * logprob.mean()
print(f'{loss=}')

# This loss is pretty high compared to the counting model above. Our aim is to get close to the
# counting model's loss, because that model reflects the trigram statistics of the dataset.

In [None]:
torch.manual_seed(1024)
# Now do it for all data: the first block_size indices of every example are the input,
# the last one is the target. The comprehension variable is not called `data`, so the
# list of names loaded at the top of the notebook stays intact.
x = torch.tensor([trigram[:block_size] for trigram in X])
y = torch.tensor([trigram[block_size] for trigram in X])

# Divide into train (80%), validation (10%) and test (10%) sets, in dataset order
n1 = int(0.8 * len(x))
n2 = int(0.9 * len(x))
x_train, x_val, x_test = x[:n1], x[n1:n2], x[n2:]
y_train, y_val, y_test = y[:n1], y[n1:n2], y[n2:]

print(x_train.shape, y_train.shape)
print(x_val.shape, y_val.shape)
print(x_test.shape, y_test.shape)

# Create a random embedding: one n_embed-dimensional vector per character.
# It replaces the one-hot encoding used in the small demo.
n_embed = 10
C = torch.randn((len(chars), n_embed), requires_grad=True)
# The layer consumes the concatenated embeddings of the context characters and outputs one
# score per character; sizes come from block_size and the vocabulary instead of literals.
W = torch.randn((n_embed * block_size, len(chars)), requires_grad=True)

# Now we can predict the probability distribution over the characters

In [None]:
def forward_backward_pass(x=x_train, y=y_train, lr=0.1):
    """Run one gradient-descent step on the embedding C and the weights W.

    Args:
        x: integer tensor of context indices, one row per example
           (defaults to the training inputs bound when this cell is run).
        y: integer tensor with the target index of every example.
        lr: learning rate of the plain gradient-descent update.

    Prints the average negative log likelihood measured before the update.
    """
    # 1) FORWARD PASS
    x_enc = C[x].view(len(x), -1)  # look up the embeddings and concatenate them per example
    logits = x_enc @ W
    # Log-softmax through logsumexp: mathematically the same as exponentiating, normalizing
    # and taking the log, but it cannot overflow for large logits or hit log(0).
    # (Together with the mean below this is what F.cross_entropy(logits, y) computes.)
    logprobs = logits - logits.logsumexp(dim=1, keepdim=True)
    # Loss is the average negative log probability of the true targets.
    loss = -1 * logprobs[torch.arange(len(logprobs)), y].mean()
    print(loss.item())

    # 2) BACKWARD PASS
    W.grad = None # Clear out gradients
    C.grad = None
    loss.backward()

    # 3) UPDATE: gradients point towards increasing loss, so step in the opposite direction.
    # In-place methods are used under no_grad so autograd does not track the update
    # (and so W / C are not turned into local names by an augmented assignment).
    with torch.no_grad():
        W.sub_(lr * W.grad)
        C.sub_(lr * C.grad)

forward_backward_pass()

In [None]:
# Repeat the full-batch step 1000 times; forward_backward_pass() prints the loss itself,
# so each output line reads 'Epoch i: <loss>'.
epochs = 1000
for i in range(epochs):
    print(f'Epoch {i}:', end=" ")
    forward_backward_pass()

In [None]:
# Prediction: generate words by repeatedly sampling the next character from the network
generator = torch.Generator().manual_seed(1024)
with torch.no_grad():  # sampling only, no gradients needed
    for _ in range(10):
        # Every word starts from a context made only of '.' markers (index 0).
        input_x = torch.tensor([0] * block_size)
        output = []
        while True:
            x_enc = C[input_x].view(1, -1)
            logits_pred = x_enc @ W
            probs_pred = torch.softmax(logits_pred, dim=1)
            ix = torch.multinomial(probs_pred, num_samples=1, replacement=True, generator=generator).item()
            output.append(itoc[ix])
            if ix == 0:
                # '.' marks the end of the word.
                break
            # Slide the context window so the next character is conditioned on what was just
            # sampled; without this step every character is drawn from the '..' distribution.
            input_x = torch.cat([input_x[1:], torch.tensor([ix])])
        print(''.join(output))

Horrible performance.
# Now we will make a slightly deeper model and see how that performs
For that, we have to organize how we define a model

In [None]:
# Small hand-written layer classes (embedding, linear, batch norm, tanh) and a training loop,
# so that deeper models can be assembled as a plain list of layers.
# Each layer keeps its parameters and stores its latest output in `.out`.
import matplotlib.pyplot as plt

BATCH_SIZE = 32  # number of examples drawn per training step in train()

class Embedding():
    """Lookup table holding one n_embed-dimensional vector per vocabulary entry."""

    def __init__(self, vocab_size, n_embed):
        # Embedding matrix, initialised from a standard normal distribution.
        table_shape = (vocab_size, n_embed)
        self.C = torch.randn(table_shape)

    def __call__(self, x):
        """Return the rows of the embedding matrix selected by the index tensor x."""
        selected_rows = self.C[x]
        return selected_rows

    def parameters(self):
        """Return the tensors of this layer that should be optimised."""
        return [self.C]


class Linear():
    """Fully connected layer computing X @ W, plus b when the bias is enabled.

    Args:
        fan_in: number of input features.
        fan_out: number of output features.
        bias: when False the layer has no bias term at all; nothing is added to the
              output and parameters() returns only the weight matrix.
    """

    def __init__(self, fan_in, fan_out, bias=True):
        # Kaiming-style scaling: gain 5/3 (the gain used for tanh) divided by sqrt(fan_in).
        self.W = torch.randn(fan_in, fan_out)  *  5/3 / fan_in**0.5
        self.bias = bias
        # Small random bias, created only when requested so a bias-free layer cannot
        # silently add an untrained offset.
        self.b = torch.randn(1, fan_out) * 0.01 if bias else None

    def __call__(self, X):
        """Apply the layer to X, store the result in self.out and return it."""
        self.out = X @ self.W
        if self.bias:
            self.out = self.out + self.b
        return self.out

    def parameters(self):
        """Return the tensors to optimise: [W] or [W, b]."""
        return [self.W] + ([self.b] if self.bias else [])

class BatchNorm():
    """Batch normalisation over dimension 0 with a learnable gain and bias.

    While `training` is True the statistics of the current batch are used and folded
    into running averages; otherwise the running averages themselves are used.
    """

    def __init__(self, n_hidden, eps=1e-5, momentum=0.1):
        self.momentum = momentum
        self.eps = eps
        self.training = True

        stat_shape = (1, n_hidden)
        # Learnable scale and shift.
        self.bngain = torch.ones(stat_shape)
        self.bnbias = torch.zeros(stat_shape)
        # Running statistics used when training is False.
        self.bnmean_running = torch.zeros(stat_shape)
        self.bnstd_running = torch.ones(stat_shape)

    def __call__(self, logits):
        """Normalise logits, store the result in self.out and return it."""
        if not self.training:
            mean, std = self.bnmean_running, self.bnstd_running
        else:
            mean = logits.mean(dim=0, keepdim=True)
            std = logits.std(dim=0, keepdim=True)

        # eps is added to the standard deviation to avoid dividing by zero.
        normalised = (logits - mean) / (std + self.eps)
        self.out = self.bngain * normalised + self.bnbias

        if self.training:
            # The running averages are bookkeeping only, so they are updated outside autograd.
            with torch.no_grad():
                keep = 1 - self.momentum
                self.bnmean_running = keep * self.bnmean_running + self.momentum * mean
                self.bnstd_running = keep * self.bnstd_running + self.momentum * std
        return self.out

    def parameters(self):
        """Return the tensors to optimise: gain and bias."""
        return [self.bngain, self.bnbias]


class Tanh(object):
    """Element-wise tanh non-linearity; it has nothing to optimise."""

    def __call__(self, input):
        """Apply tanh to input, store the result in self.out and return it."""
        activated = input.tanh()
        self.out = activated
        return activated

    def parameters(self):
        """Return an empty list: this layer has no parameters."""
        return list()

def train(X, y, layers, parameters, epochs, C, lr=0.1, verbose=False):
    """Train a stack of layers with mini-batch gradient descent.

    Args:
        X: integer tensor of context indices, one row per example.
        y: integer tensor with the target index of every example.
        layers: callables applied in order to the flattened embeddings; each one must
                store its latest output in `.out`.
        parameters: tensors to update (they must already require gradients).
        epochs: number of update steps; every step draws one random mini-batch.
        C: embedding layer, called on the batch of indices.
        lr: learning rate.
        verbose: when True, print the loss of every step.

    Returns:
        List with the log10 of the loss of every step.
    """
    all_loss = []
    for epoch in range(epochs):
        # Draw a random mini-batch so that every step is cheap.
        ix = torch.randint(0, X.shape[0], (BATCH_SIZE, ))
        activations = C(X[ix]).view(BATCH_SIZE, -1)

        # Forward pass through the stack.
        for layer in layers:
            activations = layer(activations)
        # cross_entropy applies the softmax internally, so it is given the raw logits.
        loss = F.cross_entropy(activations, y[ix])

        # Keep the gradients of the intermediate outputs; they are only needed by the
        # diagnostic plots and could be removed once debugging is done.
        for layer in layers:
            layer.out.retain_grad()

        all_loss.append(loss.log10().item())

        # Backward pass: clear stale gradients, then back-propagate.
        for p in parameters:
            p.grad = None
        loss.backward()

        # Gradient-descent update.
        for p in parameters:
            p.data -= lr * p.grad

        if verbose:
            print(f'Epoch: {epoch}, loss: {loss.item()}')
    return all_loss


In [None]:
# Toy example: mean(dim=0, keepdim=True) averages over the rows and keeps a leading
# dimension of size 1, giving one mean per column.
a = torch.tensor([[1., 2., 3.],
                  [3., 4., 5.],
                  [6., 7., 8.]])
print(a.shape)
mean_a_dim_0 = a.mean(dim=0, keepdim=True)
print(mean_a_dim_0)

In [None]:
# Shape check for the Linear layer. It is fed an explicit dummy batch of one example with
# the expected input width, so the cell does not depend on whatever `x_enc` happens to be
# left over from an earlier cell.
layer = Linear(n_embed * block_size, len(chars))
out = layer(torch.zeros(1, n_embed * block_size))
print(out.shape)

### Now, we can do the same above steps with a slightly deeper model

In [None]:
n_embed = 10
vocab_size = len(chars)
C = Embedding(vocab_size=vocab_size, n_embed=n_embed)
# Three linear layers stacked directly on top of each other (no non-linearity in between).
layers = [
    Linear(n_embed * block_size, 100),
    Linear(100, 100),
    Linear(100, vocab_size)
]
# Embedding parameters first, then the parameters of each layer in order.
parameters = sum((layer.parameters() for layer in layers), C.parameters())
print("Total parameters = ", sum(p.numel() for p in parameters))

# Every parameter must track gradients before training starts.
for p in parameters:
    p.requires_grad = True

all_loss = train(x_train, y_train, layers, parameters, 100000, C, lr=0.10)
# all_loss holds log10(loss), so the average below is on the log scale as well.
print(f'Average 10 loss: {sum(all_loss[-10:])/10:.2f}')
plt.plot(all_loss)

#### This didn't improve performance: no matter how many linear layers (Wx + b) you stack, their composition is still a single linear map, so one linear layer can represent exactly the same function.

## Let's add non-linear layer

In [None]:
# Now let's add a non-linearity (tanh) between the linear layers
C = Embedding(vocab_size=vocab_size, n_embed=n_embed)
layers_w_nl = [
    Linear(n_embed * block_size, 100), Tanh(),
    Linear(100, 100), Tanh(),
    Linear(100, vocab_size)
]
parameters_nl = C.parameters() + [p for layer in layers_w_nl for p in layer.parameters()]
# Count the parameters of THIS model (parameters_nl), not of the previous linear-only one.
print("Total parameters = ", sum(p.nelement() for p in parameters_nl))

# Make sure every parameter tracks gradients.
for p in parameters_nl:
    p.requires_grad_(True)

# Now train like before
all_loss = train(x_train, y_train, layers_w_nl, parameters_nl, 100000, C, lr=0.1)
# all_loss holds log10(loss), so the average below is on the log scale as well.
print(f'Average last 10 loss: {sum(all_loss[-10:])/10:.2f}')
plt.plot(all_loss)

## Not a good performance, the loss is very high in the beginning (remember loss plot is in log scale).
#### Lets scale the loss down at the initialization (scale down W and b), and increase the number of layers

In [None]:
C = Embedding(vocab_size=vocab_size, n_embed=n_embed)
# Five Linear(bias=False) + Tanh pairs followed by the output layer, built in the same
# order as if they were written out one by one.
layers_w_nl = [Linear(n_embed * block_size, 100, bias=False), Tanh()]
for _ in range(4):
    layers_w_nl.extend([Linear(100, 100, bias=False), Tanh()])
layers_w_nl.append(Linear(100, 27))

# Embedding parameters first, then the parameters of each layer in order.
parameters_nl = sum((layer.parameters() for layer in layers_w_nl), C.parameters())
print("Total parameters = ", sum(p.numel() for p in parameters_nl))

# Every parameter must track gradients before training starts.
for p in parameters_nl:
    p.requires_grad_(True)

# Now train like before
all_loss = train(x_train, y_train, layers_w_nl, parameters_nl, 100000, C, lr=0.10, verbose=False)
print(f'Average last 10 loss: {sum(all_loss[-10:])/10:.2f}')
plt.plot(all_loss)

## Log loss summary:
Just linear layer: Average 10 loss: 2.60 

Added non-linear layer: Average last 10 loss: 1.23 

Scaled initialization weights, for good initialization: Average last 10 loss: 0.52

### Nope. Didn't work.
1) Let's see visually how our network is performing.
2) Let's split train and validation set.
3) train on the training set, and eval loss on the validation set.

In [None]:
# Visualize the histogram of the activations
def plot_tanh_activation(layers):
    """Plot the distribution of the stored output of every Tanh layer in `layers`.

    For each Tanh layer one summary line is printed (mean, standard deviation and the
    percentage of outputs with absolute value above 0.97) and one density curve is drawn.
    """
    plt.figure()
    legends = []
    for i, layer in enumerate(layers):
        if not isinstance(layer, Tanh):
            continue
        t = layer.out
        layer_name = layer.__class__.__name__
        # Share of outputs that sit in the flat tails of tanh, in percent.
        saturated_pct = (t.abs() > 0.97).float().mean() * 100
        print("layer: %d -> %5s: mean %+.2f, std %+.2f, saturated: %.2f" % (i, layer_name, t.mean().item(), t.std().item(), saturated_pct))
        hy, hx = torch.histogram(t, density=True)
        # hx holds the bin edges (one more than the densities), so the last edge is dropped.
        plt.plot(hx[:-1].detach(), hy.detach())
        legends.append(f'layer {i} ({layer_name})')
    plt.legend(legends)
    plt.title('activation distribution')
# plot_tanh_activation(layers_w_nl)

# looks very saturated (most of the outputs sit in the flat tails of tanh)

In [None]:
# Boolean map of the stored output of layers_w_nl[7] (a Tanh layer in the list built above):
# an entry is True where |output| > 0.97, i.e. where the tanh is saturated.
plt.figure()
plt.imshow(layers_w_nl[7].out.abs() > 0.97, cmap='gray')
plt.title("Activation of layer 7")
# There is a lot of white: those entries are saturated, so hardly any gradient passes through them.

In [None]:
# Visualize the histogram of the gradients
def plot_layer_gradient(layers):
    """Plot the distribution of the gradient stored on the output of every Tanh layer.

    The gradients are read from `layer.out.grad`, so they exist only after a backward
    pass in which `retain_grad()` was called on the layer outputs. Layers without a
    stored gradient are reported and skipped instead of raising an error.
    """
    plt.figure()
    legends = []
    for i, layer in enumerate(layers):
        if isinstance(layer, Tanh):
            t = layer.out.grad
            if t is None:
                print("layer: %d -> %5s: no gradient stored, skipped" % (i, layer.__class__.__name__))
                continue
            print("layer: %d -> %5s: mean %+e, std %+e" % (i, layer.__class__.__name__, t.mean().item(), t.std().item()))
            hy, hx = torch.histogram(t, density=True)
            # hx holds the bin edges (one more than the densities), so the last edge is dropped.
            plt.plot(hx[:-1].detach(), hy.detach())
            legends.append(f'layer {i} ({layer.__class__.__name__})')
    plt.legend(legends)
    plt.title('gradient distribution')

# plot_layer_gradient(layers_w_nl)
# Gradient looks flattened as the layer increases (not a good sign)

# Precise weight tuning is hectic, normalization should be done

We want the hidden-layer pre-activations to be centered around 0 with a standard deviation of about 1, because that keeps the gradients stable. The output of tanh should not sit at 1 or -1: there the derivative is (almost) 0, so the gradient won't flow back through that unit.

So we want to standardize the pre-activation outputs of each layer. 
We do Batch Normalization for this.

In [None]:
vocab_size = len(chars)
C = Embedding(vocab_size=vocab_size, n_embed=n_embed)
# Five Linear(bias=False) -> BatchNorm -> Tanh groups followed by a batch-normalised
# output layer, built in the same order as if they were written out one by one.
layers_w_nl_w_bn = [Linear(n_embed * block_size, 100, bias=False), BatchNorm(100), Tanh()]
for _ in range(4):
    layers_w_nl_w_bn += [Linear(100, 100, bias=False), BatchNorm(100), Tanh()]
layers_w_nl_w_bn += [Linear(100, vocab_size), BatchNorm(vocab_size)]

# Embedding parameters first, then the parameters of each layer in order.
parameters_nl = sum((layer.parameters() for layer in layers_w_nl_w_bn), C.parameters())
print("Total parameters = ", sum(p.numel() for p in parameters_nl))

# Every parameter must track gradients before training starts.
for p in parameters_nl:
    p.requires_grad_(True)

# Now train like before
all_loss = train(x_train, y_train, layers_w_nl_w_bn, parameters_nl, 100000, C, lr=0.10, verbose=False)
print(f'Average last 10 loss: {sum(all_loss[-10:])/10:.2f}')
plt.plot(all_loss)

In [None]:
# Distribution of the Tanh outputs of the batch-normalised network (from its last forward pass).
plot_tanh_activation(layers_w_nl_w_bn)

In [None]:
# Distribution of the gradients stored on the Tanh outputs of the batch-normalised network.
plot_layer_gradient(layers_w_nl_w_bn)

In [None]:
generator = torch.Generator().manual_seed(1024)
@torch.no_grad()
def predict(layers, examples=10):
    """Sample `examples` words from the network and print them.

    Args:
        layers: the layer stack, applied in order to the flattened embeddings produced
                by the notebook-level embedding `C`.
        examples: number of words to generate.

    Layers that have a `training` flag (BatchNorm) are switched to evaluation mode while
    sampling, so a batch of one example is normalised with the running statistics. Their
    previous mode is restored afterwards, so the same layers can be trained further.
    Sampling draws from the notebook-level `generator`.
    """
    stateful = [layer for layer in layers if hasattr(layer, 'training')]
    previous_modes = [layer.training for layer in stateful]
    for layer in stateful:
        layer.training = False
    try:
        for _ in range(examples):
            # Every word starts from a context made only of '.' markers (index 0).
            context = [0] * block_size
            output = []
            while True:
                emb = C(torch.tensor([context]))
                x_input = emb.view(1, -1)
                for layer in layers:
                    x_input = layer(x_input)
                probs = torch.softmax(x_input, dim=1)
                ix = torch.multinomial(probs, num_samples=1, replacement=True, generator=generator).item()
                # Slide the context window: drop the oldest character, append the new one.
                context = context[1:] + [ix]
                output.append(ix)
                if ix == 0:
                    # '.' marks the end of the word.
                    break
            print(''.join(itoc[i] for i in output))
    finally:
        # Restore the mode even if sampling fails part-way through.
        for layer, mode in zip(stateful, previous_modes):
            layer.training = mode


predict(layers_w_nl_w_bn, 20)