In [None]:
import random
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# reading the data into variable called name
import csv

names = []
with open('arabic_names.csv', 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        names.append(row[0])

names.pop(0)
print(f"The length of names is {len(names)}")
print(f"The first 5 names are {names[:5]}")

In [80]:
# Randomizing the data set
random.seed(56)
random.shuffle(names)

n1 = int(0.8 * len(names))
n2 = int(0.9 * len(names))
print(f"the data set have {len(names)}, {n1=}, {n2=}")

the data set have 1405, n1=1124, n2=1264


In [81]:
# converting characters to number
chars = sorted(list(set(''.join(names))))
chars.insert(0, '.')

# create character to integer
ctoi = {value: index for index, value in enumerate(chars)}

# create integer to character
itoc = {value: index for index, value in ctoi.items()}

In [82]:
# preparing the dataset and split them into training, dev, and test samples.
# we need the first one to be ., ., . --> ا

# define a function build_dataset to accept list of names and output them in X, Y format
block_size = 8
def build_dataset(names):
    X, Y = [], []
    for name in names:
        context = [0] * block_size

        for cha in name + '.':
            X.append(context)
            ix = ctoi[cha]
            Y.append(ix)
            context = context[1:] + [ix]

    X = torch.tensor(X)
    Y = torch.tensor(Y)
    return X, Y

In [83]:
Xtr, Ytr = build_dataset(names[:n1])
Xdev, Ydev = build_dataset(names[n1:n2])
Xtest , Ytest = build_dataset(names[n2:])

for i, j in zip(Xtr[:3], Ytr[:3]):
    print( ' '.join([itoc[value.item()] for value in i]), ' ------> ', itoc[j.item()])

. . . . . . . .  ------>  غ
. . . . . . . غ  ------>  ن
. . . . . . غ ن  ------>  ي


In [90]:
# Let's train a deeper network

class Linear:
    def __init__(self, fan_in, fan_out, bias=True):
        self.weight = torch.randn((fan_in, fan_out))/ fan_in **0.5
        self.bias = torch.randn(fan_out) if bias else None
    def __call__(self, x):
        self.out = x @ self.weight
        if self.bias is not None:
            self.out += self.bias
        return self.out
    def parameters(self):
        return [self.weight,  ([] if self.bias is None else self.bias)]


class BatchNorm1D:
    def __init__(self, dim, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.training = True
        # parameters (training with backbrop)
        self.gama = torch.ones(dim)
        self.beta = torch.zeros(dim)
        # buffer (traning with running momentum)
        self.running_mean = torch.zeros(dim)
        self.running_var = torch.ones(dim)

    def __call__(self, x):
        if self.training:
            xmean = x.mean(0, keepdim=True)
            xvar = x.std(0, keepdim=True)
        else:
            xmean = self.running_mean
            xvar = self.running_var

        xhat = (x - xmean) / torch.sqrt(xvar + self.eps)
        self.out = self.gama * xhat + self.beta
        # update the buffer
        if self.training:
            with torch.no_grad():
                self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * xmean
                self.running_var = (1 - self.momentum) * self.running_var + self.momentum * xvar

        return self.out

    def parameters(self):
        return [self.beta, self.gama]

class Tanh:
    def __call__(self, x):
        self.out = torch.tanh(x)
        return self.out
    def parameters(self):
        return []

class Embedding:
    def __init__(self, num_embeddings, embedding_dim):
        self.weight = torch.randn((num_embeddings, embedding_dim))

    def __call__(self, IX):
        self.out = self.weight[IX]
        return self.out

    def parameters(self):
        return [self.weight]

# the input tensor is (32, 80) = (batch_num, block_size * emb_dim) === goal = (32, 4, 20)
class FlattenConsecutive:
    def __init__(self, n):
        # n = number of consecutive elements we want
        self.n = n
    def __call__(self, x):

        B, T, C = x.shape
        x = x.view(B, T//self.n, C * self.n)
        if x.shape[1] == 1:
            x = x.squeeze(1)

        self.out = x
        return self.out

    def parameters(self):
        return []

class Sequential:
    def __init__(self, layers):
        self.layers = layers

    def __call__(self, x):
        for layer in self.layers:
            x = layer(x)
        self.out = x
        return self.out

    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]


In [103]:
n_emb = 10
n_hidden = 200
voc_len = len(chars)    # == 38
# define a generator
g = torch.Generator().manual_seed(123456789)

model = Sequential([
    Embedding(voc_len, n_emb),
    FlattenConsecutive(2), Linear(n_emb * 2,  n_hidden),  BatchNorm1D(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2,  n_hidden),  BatchNorm1D(n_hidden), Tanh(),
    FlattenConsecutive(2), Linear(n_hidden * 2,  n_hidden),  BatchNorm1D(n_hidden), Tanh(),
    Linear(n_hidden,            voc_len),
])

#with torch.no_grad():
    # last layer: less confident
 #   layers[-1].weight *= 0.1

parameters = model.parameters()
print(sum(p.nelement() for p in parameters))
for p in parameters:
    p.requires_grad = True

173818


In [104]:
ix = torch.randint(0, Xtr.shape[0], (4,))
Xb, Yb = Xtr[ix], Ytr[ix]
logits = model(Xb)
print(Xb.shape)
Xb

torch.Size([4, 8])


tensor([[ 0,  0,  0,  0, 32, 33, 11, 22],
        [ 0,  0,  0,  0,  0,  0, 35, 31],
        [ 0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  0,  0,  0,  0,  0,  0,  0]])

In [105]:
for layer in model.layers:
    print(layer.__class__.__name__, ':', tuple(layer.out.shape))

Embedding : (4, 8, 10)
FlattenConsecutive : (4, 4, 20)
Linear : (4, 4, 200)
BatchNorm1D : (4, 4, 200)
Tanh : (4, 4, 200)
FlattenConsecutive : (4, 2, 400)
Linear : (4, 2, 200)
BatchNorm1D : (4, 2, 200)
Tanh : (4, 2, 200)
FlattenConsecutive : (4, 400)
Linear : (4, 200)
BatchNorm1D : (4, 200)
Tanh : (4, 200)
Linear : (4, 38)


In [92]:
max_steps = 200000
batch_size = 32
lossi = []

for i in range(max_steps):
    ix = torch.randint(0, Xtr.shape[0], (batch_size,), generator=g)
    Xb, Yb = Xtr[ix], Ytr[ix]

    # forward pass
    logits = model(Xb)
    loss = F.cross_entropy(logits, Yb)

    # backward pass
    for p in parameters:
        p.grad = None
    loss.backward()

    # update
    lr = 0.1 if i < 150000 else 0.01
    for p in parameters:
        p.data += - lr * p.grad

    # track status
    if i % 10000 == 0:
        print(f"{i:7d}/{max_steps:7d}: {loss.item():.4f}")

    lossi.append(loss.log10().item())
    break

      0/ 200000: 4.3225


In [77]:
len(lossi)

0

In [None]:
plt.plot(lossi)

In [None]:
plt.plot(torch.tensor(lossi).view(-1, 1000).mean(1))

In [None]:
# put layer into eval mode
for layer in model.layers:
    layer.training = False

In [None]:
@torch.no_grad()
def split_loss(split):
    x, y = {
        'train': (Xtr, Ytr),
        'val': (Xdev, Ydev),
        'test': (Xtest, Ytest)
    }[split]
    logits = model(x)
    loss = F.cross_entropy(logits, y)
    print(split, loss.item())

split_loss('train')
split_loss('val')

In [None]:
# sample from the model

for _ in range(20):
    out = []
    context = [0] * block_size # initialize with ...
    while True:
        # forward pass the neural net
        logits = model(torch.tensor([context]))
        probs = F.softmax(logits, dim=1)
        # Sample from the distribution
        ix = torch.multinomial(probs, num_samples=1).item()
        # shift the context window and track the samples
        context = context[1:] + [ix]
        out.append(ix)
        # if we sample the special '.' token, break
        if ix == 0:
            break

    print(''.join(itoc[i] for i in out))
