In [38]:
import torch
import torch.nn as nn
from torch.nn import init
device = 'cuda' if torch.cuda.is_available() else 'cpu'
torch.manual_seed(0)

<torch._C.Generator at 0x7b6bfc0f0af0>

In [39]:
with open("/content/AeCa",'r',encoding='UTF-8') as f:
    text=f.read()
f.close()

In [40]:
chars=sorted(list(set(text)))
vocab_size=len(chars)
print(chars,vocab_size)

['A', 'C', 'G', 'T'] 4


In [41]:
stoi={s:i for i,s in enumerate(chars)}
itos={i:s for i,s in enumerate(stoi)}
stoi,itos

({'A': 0, 'C': 1, 'G': 2, 'T': 3}, {0: 'A', 1: 'C', 2: 'G', 3: 'T'})

In [42]:
encode=lambda word:[stoi[s] for s in word]
decode=lambda num: ''.join([itos[i] for i in num])
data=torch.tensor(encode(text),dtype=torch.long)

In [43]:
data

tensor([2, 1, 1,  ..., 2, 2, 1])

In [44]:
l=int(0.8*data.shape[0])
h=int(0.9*data.shape[0])
train_data=data[:l]
val_data=data[l:h]
test_data=data[h:]
print(train_data.shape,val_data.shape,test_data.shape)

torch.Size([1272839]) torch.Size([159105]) torch.Size([159105])


In [45]:
block_size=8
train_data[:block_size+1]

tensor([2, 1, 1, 2, 1, 1, 1, 1, 1])

In [46]:
x=train_data[:block_size]
y=train_data[1:block_size+1]
for t in range (block_size):
    context=x[:t+1]
    target=y[t]
    print(f"In the context of {context} the target is {target}")

In the context of tensor([2]) the target is 1
In the context of tensor([2, 1]) the target is 1
In the context of tensor([2, 1, 1]) the target is 2
In the context of tensor([2, 1, 1, 2]) the target is 1
In the context of tensor([2, 1, 1, 2, 1]) the target is 1
In the context of tensor([2, 1, 1, 2, 1, 1]) the target is 1
In the context of tensor([2, 1, 1, 2, 1, 1, 1]) the target is 1
In the context of tensor([2, 1, 1, 2, 1, 1, 1, 1]) the target is 1


In [47]:
class Linear(nn.Module):
    def __init__(self,fan_in,fan_out,bias=False):
        super().__init__()
        self.W=nn.Parameter(torch.randn((fan_out,fan_in)))
        self.b=nn.Parameter(torch.randn(fan_out)) if bias==True else None
        init.kaiming_uniform_(self.W, nonlinearity='relu')

        # 3. Initialize Bias (if used) to zero, as is standard practice
        if self.b is not None:
            init.zeros_(self.b)

    def forward(self, x):
        out= x @ self.W.T
        if self.b is not None:
            out=out+self.b
        return  out

In [48]:
class ReLu(nn.Module):
    def __init__(self,inplace=False):
        super().__init__()
        self.inplace=inplace

    def forward(self,x):
        if(self.inplace):
            return torch.clamp_(x,min=0.0)      # modifies x
        else:
            return torch.clamp(x,min=0.0)       # x remains unchanged

In [49]:
class Sigmoid(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        out=torch.empty_like(x)
        positive=x>=0
        negative=~positive
        out[positive]=1/(1+torch.exp(-x[positive]))
        exp_x=torch.exp(x[negative])
        out[negative]=exp_x/(1+exp_x)
        return out

In [50]:
class TanH(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self,x):
        out=torch.empty_like(x)
        positive=x>=0
        negative=~positive
        out[positive]=1-(2/(torch.exp(2*x[positive])+1))
        out[negative]=(2/(torch.exp(-2*x[negative])+1))-1
        return out

In [51]:
class Dropout(nn.Module):
    def __init__(self, p=0.5, inplace=False):
        super().__init__()
        if p < 0 or p > 1:
            raise ValueError("Dropout probability must be between 0 and 1.")
        self.p = p  # probability of dropping a unit
        self.inplace = inplace

    def forward(self, x):
        if not self.training or self.p == 0.0:
            return x

        # Create a mask: 1 for keep, 0 for drop
        mask = (torch.rand_like(x) > self.p).float()

        if self.inplace:
            x.mul_(mask).div_(1 - self.p)  # scale to keep expected value
            out = x
        else:
            out = (x * mask) / (1 - self.p)

        return out

In [52]:
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, bias=True, batch_first=False,only_output=False):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.batch_first = batch_first
        self.only_output= only_output

        # Input → Gates
        self.x2i = Linear(input_size, hidden_size, bias)  # input gate
        self.x2f = Linear(input_size, hidden_size, bias)  # forget gate
        self.x2o = Linear(input_size, hidden_size, bias)  # output gate
        self.x2g = Linear(input_size, hidden_size, bias)  # candidate cell state

        # Hidden → Gates
        self.h2i = Linear(hidden_size, hidden_size, bias)
        self.h2f = Linear(hidden_size, hidden_size, bias)
        self.h2o = Linear(hidden_size, hidden_size, bias)
        self.h2g = Linear(hidden_size, hidden_size, bias)

        # Activations
        self.sigmoid = Sigmoid()
        self.tanh = TanH()

    def forward(self, x, state=None):
        """
        x: (seq_len, batch_size, input_size) or (batch_size, seq_len, input_size)
        state: tuple (h0, c0)
               h0: (batch_size, hidden_size)
               c0: (batch_size, hidden_size)
        """
        if self.batch_first:
            x = x.permute(1, 0, 2)  # to (seq_len, batch_size, input_size)

        seq_len, batch_size, _ = x.shape

        # h_t : Hidden State (Short Term Memory)
        # c_t : Cell State (Long Term Memory)

        if state is None:
            h_t = torch.zeros(batch_size, self.hidden_size, device=x.device)
            c_t = torch.zeros(batch_size, self.hidden_size, device=x.device)
        else:
            h_t, c_t = state

        outputs = torch.zeros(seq_len, batch_size, self.hidden_size, device=x.device)

        for t in range(seq_len):
            x_t = x[t]

            # Gates
            i_t = self.sigmoid(self.x2i(x_t) + self.h2i(h_t))   # input gate
            f_t = self.sigmoid(self.x2f(x_t) + self.h2f(h_t))   # forget gate
            o_t = self.sigmoid(self.x2o(x_t) + self.h2o(h_t))   # output gate
            g_t = self.tanh(self.x2g(x_t) + self.h2g(h_t))      # candidate cell state

            # Cell state update
            c_t = f_t * c_t + i_t * g_t

            # Hidden state update
            h_t = o_t * self.tanh(c_t)

            outputs[t] = h_t

        if self.batch_first:
            outputs = outputs.permute(1, 0, 2)
        if self.only_output:
            return outputs
        return outputs, (h_t.unsqueeze(0), c_t.unsqueeze(0))


In [53]:
class LayerNorm(nn.Module):
    def __init__(self, normalized_shape, eps=1e-5):
        super().__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = tuple(normalized_shape)
        self.eps = eps

        # Learnable params
        self.gamma = nn.Parameter(torch.ones(self.normalized_shape))
        self.beta = nn.Parameter(torch.zeros(self.normalized_shape))

    def forward(self, x):
        # Normalize across last len(normalized_shape) dims
        dims = tuple(range(-len(self.normalized_shape), 0))
        mean = x.mean(dim=dims, keepdim=True)
        var = x.var(dim=dims, unbiased=False, keepdim=True)

        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * x_hat + self.beta
        return out

In [54]:
class Softmax(nn.Module):
    def __init__(self, dim=-1):
        super().__init__()
        self.dim = dim  # dimension over which to apply softmax

    def forward(self, x):
        # Numerical stability: subtract max value along dim before exponentiating
        x_max, _ = torch.max(x, dim=self.dim, keepdim=True)
        x_exp = torch.exp(x - x_max)
        out = x_exp / torch.sum(x_exp, dim=self.dim, keepdim=True)
        return out

In [None]:
class Head(nn.Module):
    """ a single self attention head """

    def __init__(self, n_embd, head_size, block_size,bias=False,only_output=False):
        super().__init__()
        self.key=Linear(n_embd,head_size,bias)
        self.query=Linear(n_embd,head_size,bias)
        self.value=Linear(n_embd,head_size,bias)
        self.register_buffer('tril',torch.tril(torch.ones(block_size,block_size)))
        self.dropout=Dropout().to(device=device)
        self.softmax=Softmax(dim=-1)
        self.only_output=only_output

    def forward(self,x):

        B,T,C = x.shape
        k = self.key(x)       # (B,T,hs)
        q = self.query(x)     # (B,T,hs)

        # compute attention scores ("affinities")
        wei = q @ k.transpose(-2,-1)*C**(-0.5)                               #(B,T,C) @ (B,C,T) --------> (B,T,T)
        wei = wei.masked_fill(self.tril[:T,:T] == 0, float('-inf'))         #(B,T,T)
        wei = self.softmax(wei)                                        #(B,T,T)
        wei = self.dropout(wei)

        # perform the weighted aggregation of the values
        v = self.value(x)
        out = wei @ v

        if self.only_output:
            return out

        return out,wei

class MultiHeadAttention(nn.Module):
    """ multiple heads of self-attention in parallel """

    def __init__(self,num_head,n_embd, head_size, block_size,bias=False):
        super().__init__()
        self.heads=nn.ModuleList([Head(n_embd, head_size, block_size,bias=False) for _ in range(num_head)])
        self.proj=Linear(n_embd,n_embd)
        self.dropout = Dropout()

    def forward(self,x):
        j=[h(x) for h in self.heads]
        # print(len(j),len(j[0]))
        out=torch.cat([h(x)[0] for h in self.heads],dim=-1)
        out=self.dropout(self.proj(out))
        #print(out.shape)
        return out

In [60]:
block_size=32
batch_size = 64
n_embd = 64
hidden_size=4*n_embd
n_head = 8
eval_interval=20
max_iters=100
learning_rate = 0.001
eval_iters = 50

In [None]:
from posixpath import pardir
import torch.nn.functional as F

class M2(nn.Module):
    def __init__(self,block_size,vocab_size,n_embd,hidden_size,n_head):
        super().__init__()
        self.token_embedding_table=nn.Embedding(vocab_size,n_embd)
        head_size=n_embd//2
        self.net=nn.Sequential(
            Linear(n_embd,4*n_embd),    #per token level,all tokens do this independently
            ReLu(),
            Dropout(0.3),
            LSTM(4*n_embd,n_embd,batch_first=True,only_output=True), #only_output=True
            LayerNorm(n_embd),
            Head( n_embd, head_size, block_size,only_output=True),
            LayerNorm(block_size),
            Linear(block_size,vocab_size)

        )

    def forward(self,x,targets=None):
        p=self.token_embedding_table(x)
        logits= self.net(p)

        #print(logits.shape)


        if targets is None:
            loss = None
        else:
            B,T,C=logits.shape
            logits=logits.view(B*T,C)
            targets=targets.view(B*T)
            loss=F.cross_entropy(logits,targets)#as it takes input as (B,C,T)

        return logits,loss

In [63]:
def get_batch(split):
    #generate a small batch of data of inputs x and target y
    data =train_data if split=='train' else val_data
    ix= torch.randint(len(data)-block_size,(batch_size,))  # if len(data) is 10 we cannot find a chunck of size 9 after index 1
    #print(ix)
    x=torch.stack([data[i:i+block_size] for i in ix])
    y=torch.stack([data[i+1:i+block_size+1] for i in ix])
    x,y=x.to(device),y.to(device)
    return x,y

x,y=get_batch('train')
x = x.to(device)
y = y.to(device)
print(x.shape,y.shape)
model=M2(block_size,vocab_size,n_embd,hidden_size,n_head)
model=model.to(device)
z=model(x)
print(z[0].shape)

torch.Size([64, 32]) torch.Size([64, 32])
torch.Size([64, 32, 4])


In [61]:
def estimate_loss():
    out={}
    model.eval()
    for split in ['train','val']:
        losses=torch.zeros(eval_iters)
        for k in range(eval_iters):
            X,Y=get_batch(split)
            logits,loss=model(X,Y)
            losses[k]=loss.item()
        out[split]=losses.mean()
    model.train()
    return out


optimizer=torch.optim.Adam(model.parameters(),learning_rate)
for iter in range(max_iters):

    # every once in a while evaluate the loss on train and val sets
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')
    #print(xb.shape,yb.shape)

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

step 0: train loss 1.4230, val loss 1.4250
step 20: train loss 1.3674, val loss 1.3671
step 40: train loss 1.3624, val loss 1.3621
step 60: train loss 1.3638, val loss 1.3609
step 80: train loss 1.3600, val loss 1.3601
step 99: train loss 1.3599, val loss 1.3572
