In [1]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Reports whether the MPS backend is usable right now (per the PyTorch MPS notes
# this requires macOS 12.3+ and an MPS-capable device). It only reports; it does
# not enforce anything.
print(torch.backends.mps.is_available())
# Reports whether the current PyTorch installation was built with MPS support.
print(torch.backends.mps.is_built())

True
True


In [3]:
# Prefer Apple's MPS backend, but fall back to the CPU so the notebook still
# runs on machines where MPS is not available.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

In [95]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        dimension: Width of the input/output features; must be divisible by
            ``nheads``.
        nheads: Number of heads the feature axis is split into.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, dimension, nheads, dropout=0.1):
        super().__init__()
        assert dimension % nheads == 0
        self.nheads = nheads
        self.dropout = nn.Dropout(dropout)

        self.query = nn.Linear(dimension, dimension)
        self.key = nn.Linear(dimension, dimension)
        self.value = nn.Linear(dimension, dimension)
        self.out = nn.Linear(dimension, dimension)

    def forward(self, query, key, value, mask):
        """Attend from every ``query`` position over the ``key``/``value`` positions.

        Args:
            query: Tensor of shape (batch, query_len, dimension).
            key: Tensor of shape (batch, key_len, dimension).
            value: Tensor of shape (batch, key_len, dimension).
            mask: ``None``, or a tensor that broadcasts against
                (batch, nheads, query_len, key_len) once a head axis has been
                inserted at dim 1. Positions where the mask equals 0 are
                excluded from attention.

        Returns:
            Tensor of shape (batch, query_len, dimension).
        """
        batch_size, query_len, dimension = query.shape
        head_dim = dimension // self.nheads

        # Project, then split the feature axis into heads. Key/value lengths are
        # inferred (-1) so cross-attention with a different source length works.
        query = self._split_heads(self.query(query), batch_size, head_dim)
        key = self._split_heads(self.key(key), batch_size, head_dim)
        value = self._split_heads(self.value(value), batch_size, head_dim)

        # attention
        attn = self.attention(query, key, mask)
        output = torch.matmul(attn, value)

        # Merge the heads back: (batch, heads, query_len, head_dim) -> (batch, query_len, dimension).
        output = output.transpose(2, 1).reshape(batch_size, query_len, dimension)
        return self.out(output)

    def _split_heads(self, x, batch_size, head_dim):
        """Reshape (batch, len, dimension) into (batch, nheads, len, head_dim)."""
        return x.view(batch_size, -1, self.nheads, head_dim).transpose(2, 1)

    def attention(self, query, key, mask):
        """Return the attention weights for already head-split ``query``/``key``.

        Args:
            query: Tensor of shape (batch, nheads, query_len, head_dim).
            key: Tensor of shape (batch, nheads, key_len, head_dim).
            mask: ``None`` or a mask as described in ``forward``.

        Returns:
            Tensor of shape (batch, nheads, query_len, key_len) whose last axis
            is a softmax distribution (before dropout is applied).
        """
        scores = torch.matmul(query, key.transpose(3, 2))
        scores = scores / math.sqrt(query.shape[-1])

        if mask is not None:
            mask = mask.unsqueeze(1)
            # Masked logits get the most negative finite value so that softmax
            # assigns them zero weight without producing NaNs for fully
            # masked rows (which -inf would do).
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # Dropout is applied to the normalised weights, not to the raw logits.
        weights = F.softmax(scores, dim=-1)
        return self.dropout(weights)

In [96]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last axis with a learnable gain and bias.

    Args:
        dimension: Size of the last axis of the inputs.
    """

    def __init__(self, dimension):
        super().__init__()
        # Learnable shift (initialised to 0) and scale (initialised to 1).
        self.mean_reverse = nn.Parameter(torch.zeros(dimension))
        self.std_reverse = nn.Parameter(torch.ones(dimension))
        # Added to the standard deviation to avoid division by zero.
        self.delta = 1e-6

    def forward(self, x: torch.Tensor):
        """Normalise ``x`` along its last axis, then apply gain and bias.

        Computes ``gain * (x - mean) / (std + delta) + bias``. ``torch.std`` is
        called with its default settings (Bessel-corrected estimate).
        """
        mean = torch.mean(x, dim=-1, keepdim=True)
        std = torch.std(x, dim=-1, keepdim=True) + self.delta
        # The bias is added after scaling so it acts as a plain additive shift
        # instead of being divided by the data-dependent std.
        return self.std_reverse * (x - mean) / std + self.mean_reverse

In [97]:
class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper computing ``x + dropout(layer(norm(x)))``.

    Args:
        dimension: Feature size handed to the internal ``LayerNorm``.
        dropout: Dropout probability applied to the sub-layer output.
    """

    def __init__(self, dimension, dropout=0.0):
        super().__init__()
        self.norm = LayerNorm(dimension=dimension)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, layer):
        """Run ``layer`` on the normalised input and add the result back onto ``x``."""
        normalised = self.norm(x)
        branch = self.dropout(layer(normalised))
        return x + branch

In [98]:
class FeedForwardNet(nn.Module):
    """Position-wise feed-forward block: Linear -> activation -> Dropout -> Linear.

    Args:
        dimension: Input and output feature size.
        hidden_dim: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
        activation: ``"relu"`` selects ReLU; any other value selects GELU.
    """

    def __init__(self, dimension, hidden_dim=1024, dropout=0.1, activation="relu"):
        super().__init__()
        expand = nn.Linear(dimension, hidden_dim)
        if activation == "relu":
            nonlinearity = nn.ReLU()
        else:
            nonlinearity = nn.GELU()
        regulariser = nn.Dropout(dropout)
        project = nn.Linear(hidden_dim, dimension)
        self.net = nn.Sequential(expand, nonlinearity, regulariser, project)

    def forward(self, x):
        """Apply the feed-forward stack to ``x``."""
        transformed = self.net(x)
        return transformed

In [108]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward net, each in a residual wrapper.

    Args:
        dimension: Model width.
        nheads: Number of attention heads.
        hidden_dimension: Hidden width of the feed-forward net.
        activation: Activation name forwarded to ``FeedForwardNet``.
        dropout: Dropout probability shared by all sub-modules.
    """

    def __init__(
        self, dimension, nheads, hidden_dimension=1024, activation="relu", dropout=0.1
    ):
        super().__init__()
        shared = dict(dimension=dimension, dropout=dropout)
        self.mha = MultiHeadAttention(nheads=nheads, **shared)
        self.ffnn = FeedForwardNet(
            hidden_dim=hidden_dimension, activation=activation, **shared
        )
        self.resconn1 = ResidualConnection(**shared)
        self.resconn2 = ResidualConnection(**shared)

    def forward(self, x, mask):
        """Apply self-attention (restricted by ``mask``) followed by the feed-forward net."""

        def self_attention(normed):
            # Query, key and value all come from the same normalised input.
            return self.mha(query=normed, key=normed, value=normed, mask=mask)

        attended = self.resconn1(x, self_attention)
        return self.resconn2(attended, self.ffnn)

In [109]:
from copy import deepcopy as copy


class Encoder(nn.Module):
    """Token + learned-position embedding followed by a stack of ``EncoderLayer`` blocks.

    Args:
        vocab_size: Number of rows in the token embedding table.
        max_seq_len: Number of rows in the position embedding table.
        nlayers: Number of encoder layers.
        dimension: Model width.
        nheads: Number of attention heads per layer.
        hidden_dimension: Hidden width of each feed-forward net.
        activation: Activation name forwarded to every layer.
        dropout: Dropout probability forwarded to every layer.
    """

    def __init__(
        self,
        vocab_size,
        max_seq_len,
        nlayers,
        dimension,
        nheads,
        hidden_dimension,
        activation,
        dropout,
    ):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, dimension)
        self.pemb = nn.Embedding(max_seq_len, dimension)

        # Each iteration constructs an independent layer, so no deepcopy is needed.
        self.encoder_layers = nn.ModuleList(
            [
                EncoderLayer(
                    dimension=dimension,
                    nheads=nheads,
                    hidden_dimension=hidden_dimension,
                    activation=activation,
                    dropout=dropout,
                )
                for _ in range(nlayers)
            ]
        )
        self.norm = LayerNorm(dimension=dimension)

    def forward(self, x, mask):
        """Encode a batch of token ids.

        Args:
            x: Integer tensor of token ids; axis 1 is used as the sequence axis.
            mask: Attention mask forwarded unchanged to every layer (may be ``None``).

        Returns:
            The normalised output of the last layer.
        """
        # Build the position indices on the same device as the input so the
        # lookup also works once the model has been moved off the CPU.
        positions = torch.arange(x.shape[1], device=x.device)
        x = self.emb(x) + self.pemb(positions)

        for layer in self.encoder_layers:
            x = layer(x, mask)
        return self.norm(x)

In [111]:
# sample run
# The same `vocab_size` drives both the embedding table and the random token
# ids, so the ids can never fall outside the table if the value is changed.
vocab_size = 1000
encoder = Encoder(
    vocab_size=vocab_size,
    max_seq_len=100,
    nlayers=2,
    dimension=256,
    nheads=8,
    hidden_dimension=512,
    activation="gelu",
    dropout=0.1,
)
# Batch of 3 sequences of length 4, no mask; only the output shape is displayed.
encoder(torch.randint(low=0, high=vocab_size, size=(3, 4)), None).shape

torch.Size([3, 4, 256])

In [120]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention and a feed-forward net.

    Each of the three sub-layers runs inside its own ``ResidualConnection``.

    Args:
        dimension: Model width.
        nheads: Number of attention heads.
        hidden_dimension: Hidden width of the feed-forward net.
        activation: Activation name forwarded to ``FeedForwardNet``.
        dropout: Dropout probability shared by all sub-modules.
    """

    def __init__(
        self, dimension, nheads, hidden_dimension=1024, activation="relu", dropout=0.1
    ):
        super().__init__()
        self.ffnn = FeedForwardNet(
            dimension=dimension,
            hidden_dim=hidden_dimension,
            dropout=dropout,
            activation=activation,
        )
        # Index 0: self-attention, 1: cross-attention, 2: feed-forward.
        residuals = []
        for _ in range(3):
            residuals.append(ResidualConnection(dimension=dimension, dropout=dropout))
        self.resconn = nn.ModuleList(residuals)

        # Index 0: self-attention, 1: cross-attention.
        attentions = []
        for _ in range(2):
            attentions.append(
                MultiHeadAttention(dimension=dimension, nheads=nheads, dropout=dropout)
            )
        self.mha = nn.ModuleList(attentions)

    def forward(self, x, encoder_output, encmask, decmask):
        """Run the three sub-layers in order and return the result.

        ``decmask`` restricts the self-attention; ``encmask`` restricts the
        cross-attention over ``encoder_output``.
        """
        self_attn, cross_attn = self.mha[0], self.mha[1]

        def attend_to_self(normed):
            return self_attn(query=normed, key=normed, value=normed, mask=decmask)

        def attend_to_encoder(normed):
            return cross_attn(
                query=normed, key=encoder_output, value=encoder_output, mask=encmask
            )

        after_self = self.resconn[0](x, attend_to_self)
        after_cross = self.resconn[1](after_self, attend_to_encoder)
        return self.resconn[2](after_cross, self.ffnn)

In [121]:
class Decoder(nn.Module):
    """Token + learned-position embedding followed by a stack of ``DecoderLayer`` blocks.

    Args:
        vocab_size: Number of rows in the token embedding table.
        max_seq_len: Number of rows in the position embedding table.
        nlayers: Number of decoder layers.
        dimension: Model width.
        nheads: Number of attention heads per layer.
        hidden_dimension: Hidden width of each feed-forward net.
        activation: Activation name forwarded to every layer.
        dropout: Dropout probability forwarded to every layer.
    """

    def __init__(
        self,
        vocab_size,
        max_seq_len,
        nlayers,
        dimension,
        nheads,
        hidden_dimension,
        activation,
        dropout,
    ):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, dimension)
        self.pemb = nn.Embedding(max_seq_len, dimension)

        # Each iteration constructs an independent layer, so no deepcopy is needed.
        self.decoder_layers = nn.ModuleList(
            [
                DecoderLayer(
                    dimension=dimension,
                    nheads=nheads,
                    hidden_dimension=hidden_dimension,
                    activation=activation,
                    dropout=dropout,
                )
                for _ in range(nlayers)
            ]
        )
        self.norm = LayerNorm(dimension=dimension)

    def forward(self, x, encoder_output, encmask, decmask):
        """Decode a batch of token ids against ``encoder_output``.

        Args:
            x: Integer tensor of token ids; axis 1 is used as the sequence axis.
            encoder_output: Encoder features attended to by every layer.
            encmask: Mask for the cross-attention (may be ``None``).
            decmask: Mask for the decoder self-attention (may be ``None``).

        Returns:
            The normalised output of the last layer.
        """
        # Build the position indices on the same device as the input so the
        # lookup also works once the model has been moved off the CPU.
        positions = torch.arange(x.shape[1], device=x.device)
        x = self.emb(x) + self.pemb(positions)

        for layer in self.decoder_layers:
            x = layer(x, encoder_output, encmask, decmask)
        return self.norm(x)

In [123]:
# sample
dimension = 256
vocab_size = 1000
batch = 3
seqlen = 4
x = torch.randint(low=0, high=vocab_size, size=(batch, seqlen))

# The embedding table is sized from the same `vocab_size` that bounds the
# random token ids above, so the two cannot drift apart.
decoder = Decoder(
    vocab_size=vocab_size,
    max_seq_len=100,
    nlayers=2,
    dimension=dimension,
    nheads=8,
    hidden_dimension=512,
    activation="gelu",
    dropout=0.1,
)
# A random tensor stands in for the encoder output; only the shape is displayed.
decoder(
    x=x,
    encoder_output=torch.rand((batch, seqlen, dimension)),
    encmask=None,
    decmask=None,
).shape

torch.Size([3, 4, 256])

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder model with a final linear projection onto the decoder vocabulary.

    Args:
        encoder_vocab_size: Vocabulary size of the encoder embedding.
        decoder_vocab_size: Vocabulary size of the decoder embedding and of
            the output projection.
        encoder_max_seq_len: Number of learned positions in the encoder.
        decoder_max_seq_len: Number of learned positions in the decoder.
        encoder_nlayers: Number of encoder layers.
        decoder_nlayers: Number of decoder layers.
        dimension: Model width shared by encoder and decoder.
        nheads: Number of attention heads.
        hidden_dimension: Hidden width of the feed-forward nets.
        activation: Activation name forwarded to both stacks.
        dropout: Dropout probability forwarded to both stacks.
    """

    def __init__(
        self,
        encoder_vocab_size,
        decoder_vocab_size,
        encoder_max_seq_len,
        decoder_max_seq_len,
        encoder_nlayers,
        decoder_nlayers,
        dimension,
        nheads,
        hidden_dimension,
        activation,
        dropout,
    ):
        super().__init__()
        self.encoder = Encoder(
            vocab_size=encoder_vocab_size,
            max_seq_len=encoder_max_seq_len,
            nlayers=encoder_nlayers,
            dimension=dimension,
            nheads=nheads,
            hidden_dimension=hidden_dimension,
            activation=activation,
            dropout=dropout,
        )

        self.decoder = Decoder(
            vocab_size=decoder_vocab_size,
            max_seq_len=decoder_max_seq_len,
            nlayers=decoder_nlayers,
            dimension=dimension,
            nheads=nheads,
            hidden_dimension=hidden_dimension,
            activation=activation,
            dropout=dropout,
        )
        self.ffnn = nn.Linear(dimension, decoder_vocab_size)

    def forward(self, encoder_input, decoder_input, encmask, decmask):
        """Encode the source, decode against it and project to vocabulary logits.

        Args:
            encoder_input: Token ids for the encoder.
            decoder_input: Token ids for the decoder.
            encmask: Mask passed to the encoder self-attention and to the
                decoder cross-attention (may be ``None``).
            decmask: Mask passed to the decoder self-attention (may be ``None``).

        Returns:
            The decoder output projected to ``decoder_vocab_size`` features.
        """
        encoder_output = self.encoder(encoder_input, encmask)
        # The decoder needs the encoder output and the source mask for its
        # cross-attention, in addition to its own self-attention mask.
        decoder_output = self.decoder(
            decoder_input, encoder_output, encmask, decmask
        )
        return self.ffnn(decoder_output)

In [2]:
# NOTE(review): this call passes four positional arguments, but the Encoder
# class defined earlier in this notebook requires eight, so it raises TypeError
# against that definition. The output recorded below uses module names
# (`enclays`, `attn`, `wq`, ...) that do not exist in the classes above, so it
# presumably came from a different Encoder implementation — confirm before re-running.
Encoder(2, 8, 256, 0.1)

8 heads 256 dimension in MultiHeadedAttention
8 heads 256 dimension in MultiHeadedAttention


Encoder(
  (enclays): ModuleList(
    (0-1): 2 x EncoderLayer(
      (attn): MultiHeadedAttention(
        (wq): Linear(in_features=256, out_features=256, bias=True)
        (wk): Linear(in_features=256, out_features=256, bias=True)
        (wv): Linear(in_features=256, out_features=256, bias=True)
        (out): Linear(in_features=256, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (ffnn): FeedForwardNet(
        (l): Linear(in_features=256, out_features=2048, bias=True)
        (out): Linear(in_features=2048, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (resconn1): ResidualConnection(
        (norm): LayerNorm()
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (resconn2): ResidualConnection(
        (norm): LayerNorm()
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm): LayerNorm()
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (norm): LayerNorm()
)

In [None]:
def anagram(s):
    """Count the characters of the second half of ``s`` unmatched by the first half.

    The string is split into two equal halves. Every character of the second
    half that cannot be paired with a still-unused occurrence of the same
    character in the first half counts as one required change.

    Returns:
        The number of unmatched characters, or -1 when ``s`` has odd length.
    """
    size = len(s)
    if size % 2:
        return -1

    middle = size // 2

    # How many occurrences of each first-half character are still unpaired.
    unpaired = {}
    for letter in s[:middle]:
        unpaired[letter] = unpaired.get(letter, 0) + 1

    changes = 0
    for letter in s[middle:]:
        left = unpaired.get(letter, 0)
        if left == 0:
            changes += 1
        else:
            unpaired[letter] = left - 1

    return changes


In [130]:
# https://www.hackerrank.com/contests/one-last-timeagain/challenges/problem-1-1-7
def twist_string(s, twist):
    """Brute-force roll: for every ``idx`` in ``twist`` advance ``s[:idx]`` by one letter.

    Letters wrap around from 'z' to 'a'. Note that a second function with the
    same name is defined later in this cell and replaces this one once the cell
    has run.
    """
    base = ord("a")
    letters = list(s)

    for prefix_len in twist:
        advanced = []
        for letter in letters[:prefix_len]:
            advanced.append(chr(base + (ord(letter) - base + 1) % 26))
        letters[:prefix_len] = advanced

    return "".join(letters)


def twist_string(s, twist):
    """Roll prefixes of ``s`` forward through the alphabet in a single pass.

    Every ``idx`` in ``twist`` advances the first ``idx`` characters of ``s``
    by one letter (wrapping from 'z' to 'a'). Instead of re-rolling each
    prefix, the number of rolls ending at every position is recorded and
    accumulated from the right.

    Args:
        s: String of lowercase ASCII letters.
        twist: Iterable of prefix lengths. Values <= 0 roll nothing and values
            larger than ``len(s)`` roll the whole string.

    Returns:
        The rolled string.
    """
    n = len(s)
    if n == 0:
        return ""

    # ends_at[i] = number of rolls whose prefix ends exactly at index i.
    ends_at = [0] * n
    for idx in twist:
        if idx <= 0:
            # An empty prefix must not wrap around to the last slot via index -1.
            continue
        ends_at[min(idx, n) - 1] += 1

    base = ord("a")
    pending = 0
    rolled = [""] * n

    # A roll ending at index i also covers every index to its left, so the
    # running total taken from the right is the shift for the current character.
    for i in range(n - 1, -1, -1):
        pending += ends_at[i]
        rolled[i] = chr(base + (ord(s[i]) - base + pending) % 26)

    return "".join(rolled)


def findRollOut(s, roll):
    """Apply every prefix roll in ``roll`` to ``s`` and return the result.

    Each value ``r`` in ``roll`` advances the first ``r`` characters of ``s``
    by one letter, wrapping from 'z' to 'a'.

    Args:
        s: String of lowercase ASCII letters.
        roll: Iterable of prefix lengths. Values <= 0 roll nothing and values
            larger than ``len(s)`` roll the whole string.

    Returns:
        The rolled string.
    """
    def solve(st, A):
        # Shift a single lowercase letter forward by A positions, with wrap-around.
        ch = ord(st) - ord('a')
        return chr(ord('a') + (ch + A) % 26)

    n = len(s)
    if n == 0:
        return ''

    # rolls[i] = number of rolls whose prefix ends exactly at index i.
    rolls = [0] * n
    for r in roll:
        if r > 0:  # r <= 0 would otherwise index from the end of the list
            rolls[min(r, n) - 1] += 1

    total = 0
    res = []

    # Walk from the right: a roll ending at i applies to every index <= i.
    for ch, count in zip(reversed(s), reversed(rolls)):
        total += count
        res.append(solve(ch, total))

    res.reverse()
    return ''.join(res)

# Cross-check: run both implementations on the same sample and display the two
# results side by side.
s = "zcza"
twist = [1,1,3,4]
findRollOut(s, twist), twist_string(s, twist)

('debb', 'debb')

In [110]:
# https://takeitoutamber.medium.com/hackerrank-coding-interview-1-queue-at-atm-b2e0e1859d3d
"""
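Two equivalent ways to compute ceil(x / k) for positive integers x and k:

from math import ceil


f = lambda x, k: (x - 1 + k) // k   # integer arithmetic only
g = lambda x, k: ceil(x / k)        # float division, then round up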
"""

def getFinalOrder_(k, amount):
    """Return 1-based positions ordered by ``ceil(amount / k)``, ties by position.

    The input constraints are enforced with assertions: 1..10**5 entries, each
    amount in 1..10**9, and ``k`` in 1..10**6.
    """
    assert 1 <= len(amount) <= 10 ** 5
    assert all(1 <= x <= 10 ** 9 for x in amount)
    assert 1 <= k <= 10 ** 6

    # (rounds needed, 1-based position); sorting the pairs orders by rounds
    # first and by position second.
    rounds_and_position = sorted(
        ((value + k - 1) // k, position)
        for position, value in enumerate(amount, start=1)
    )
    return [position for _, position in rounds_and_position]


def getFinalOrder(k, amount):
    """Return 1-based positions ordered by ``ceil(amount / k)``, ties by position.

    Args:
        k: Positive integer divisor.
        amount: Sequence of integers.

    Returns:
        List of 1-based indices into ``amount``; empty when ``amount`` is empty.
    """
    # -(-a // k) is ceil(a / k) in exact integer arithmetic. It needs no
    # `math.ceil` import and does not lose precision on large values the way
    # float division does.
    rounds = [-(-a // k) for a in amount]
    # sorted() is stable, so equal round counts keep their original order.
    order = sorted(range(len(amount)), key=rounds.__getitem__)
    return [idx + 1 for idx in order]


# Run both implementations on the same sample and display the two results
# side by side.
k = 2
array = [3, 6, 4, 5]
getFinalOrder_(k, array), getFinalOrder(k, array)

([1, 3, 2, 4], [1, 3, 2, 4])

In [None]:
# https://www.quora.com/If-it-takes-852-digits-to-number-the-pages-of-a-book-How-many-pages-are-there

"""
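1-9 = 9 digits
10-99 = 90 * 2 digits
100-199 = 100 * 3 digits
100-999 = 900 * 3 digits

852 - 9 - 180 = 663 digits remain for three-digit pages -> 663 / 3 = 221 pages,
so the book has 99 + 221 = 320 pages.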
"""

In [133]:
def my_decorator(func):
    """Wrap ``func`` so a message is printed before and after every call.

    The wrapper forwards all positional and keyword arguments, returns the
    wrapped function's result and keeps its name and docstring.
    """
    # Local import: at this point the notebook has not imported functools yet.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("Something is happening before the function is called.")
        result = func(*args, **kwargs)
        print("Something is happening after the function is called.")
        return result
    return wrapper

@my_decorator
def say_hello():
    """Print a greeting; used to demonstrate ``my_decorator``."""
    print("Hello!")

# Calling the decorated name runs the wrapper returned by my_decorator.
say_hello()

Something is happening before the function is called.
Hello!
Something is happening after the function is called.


In [134]:
import functools

def log_function_call(func):
    """Decorator that prints a line before each call and another with its result.

    The "Calling ..." line is printed before ``func`` runs, so it appears even
    when ``func`` raises and always precedes any output of ``func`` itself.
    """
    @functools.wraps(func)  # Preserves the original function's metadata
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__} with arguments {args} and keyword arguments {kwargs}.")
        result = func(*args, **kwargs)
        print(f"{func.__name__} returned {result}.")
        return result
    return wrapper

@log_function_call
def add(a, b):
    """Return ``a + b``; decorated so every call is logged."""
    return a + b

# Usage:
add(3, 5)

Calling add with arguments (3, 5) and keyword arguments {}.
add returned 8.


8

In [135]:
import functools

def memoize(func):
    """Cache the results of ``func`` keyed by its call arguments.

    Positional and keyword arguments both take part in the cache key. A call
    whose arguments are not hashable is executed without caching instead of
    raising ``TypeError``. A cache hit prints a short notice before returning
    the stored value.
    """
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # frozenset makes the key independent of keyword order.
            key = (args, frozenset(kwargs.items()))
            cached = key in cache
        except TypeError:
            # Unhashable argument: the result cannot be stored, so just compute it.
            return func(*args, **kwargs)

        if cached:
            print("returning from cache..")
            return cache[key]

        result = func(*args, **kwargs)
        cache[key] = result
        return result
    return wrapper

@memoize
def expensive_calculation(n):
    """Stand-in for a costly computation: announces the call, then returns ``n * 2``."""
    print(f"Calculating result for {n}...")
    return n * 2

# Usage:
print(expensive_calculation(5))  # Calculation is performed
print(expensive_calculation(5))  # Result is retrieved from the cache, no recalculation

Calculating result for 5...
10
returning from cache..
10


In [138]:
# NOTE(review): the recorded output shows a cache hit, although no earlier cell
# shown here calls this function with 51. Presumably the value was cached by a
# previous run of this cell; on a fresh kernel the "Calculating result..."
# line is printed instead.
expensive_calculation(51)

returning from cache..


102

In [139]:
@memoize
def func(n):
    """Announce the call and return ``n // 2``; has its own cache, separate from other memoized functions."""
    print(f"Calculating result for {n}...")
    return n // 2

In [141]:
# NOTE(review): the recorded output shows a cache hit, so this cell was
# presumably run more than once; on a fresh kernel the first call prints the
# "Calculating result..." line instead.
func(51)

returning from cache..


25

In [142]:
# code for testing decorator chaining
def decor1(func):
    """Decorator that squares the value returned by ``func``.

    Arguments are forwarded unchanged and the wrapped function's metadata is
    preserved.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        x = func(*args, **kwargs)
        return x * x
    return inner

def decor(func):
    """Decorator that doubles the value returned by ``func``.

    Arguments are forwarded unchanged and the wrapped function's metadata is
    preserved.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        x = func(*args, **kwargs)
        return 2 * x
    return inner

# Decorators are applied bottom-up: num = decor1(decor(num)), so the value is
# doubled first and then squared: (2 * 10) ** 2 = 400.
@decor1
@decor
def num():
	return 10

# Reverse order: num2 = decor(decor1(num2)), so the value is squared first and
# then doubled: 2 * (10 * 10) = 200.
@decor
@decor1
def num2():
	return 10

print(num())
print(num2())

400
200


In [None]:
"""
1. Normalisation Types (tick)
2. Attention (tick)
3. Transformer Architecture (tick)
4. Different types of activations (tick)
5. Docker vs Virtual Environment (tick) 
6. API 
7. Chatbot 
8. How to reduce/stop hallucinations 
9. Lambda Function / Decorator / Filter / Map
"""

In [146]:
def f(prefix, x):
    """Return ``x`` single-element lists labelled ``prefix0``, ``prefix1``, ..."""
    labelled = []
    for i in range(x):
        labelled.append([prefix + str(i)])
    return labelled

In [149]:
# Symbolic placeholders for three query, key and value vectors, used in the
# attention notes that follow.
query = f("q", 3)
key = f("k", 3)
value = f("v", 3)

query, key, value

[['q0'], ['q1'], ['q2']] [['k0'], ['k1'], ['k2']] [['v0'], ['v1'], ['v2']]


In [None]:
[q0k0, q0k1, 0]
[q1k0, q1k1, 0]   / math.sqrt(dim)
[q2k0, q2k1, 0]

In [None]:
f = residual + normalisation

Embedding + PE 
-> Encoder(f(mh.attention) + f(FFNN)) 
-> Decoder(f(self attention) + f(cross attention) + f(FFNN))

In [None]:
Mehek mah darlin

q1, q2, q3
k1, k2, k3, k4
v1, v2, v3, v4

(a1, a2, a3, a4) = softmax((q1k1, q1k2, q1k3, q1k4) / sqrt(dim))
out1 = a1.v1 + a2.v2 + a3.v3 + a4.v4