In [968]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import einx
import random
from torch.utils.data import Dataset, DataLoader

def genList(listSize):
        return [(-1 if random.randint(0, 1) else random.random()) for x in range(listSize)]

def genAnswerKey(inputList):
    answerKeyList = []
    for val in inputList:
        numLessThanVal = 0
        for otherVal in inputList:
            if otherVal < val and otherVal > 0:
                numLessThanVal += 1
        answerKeyList.append(numLessThanVal)
    return answerKeyList


class ListDataset(Dataset):
    def __init__(self, listSize, datasetSize):
        self.datasetSize = datasetSize
        self.listSize = listSize
        self.allLists = [genList(self.listSize) for idx in range(self.datasetSize)]
        self.allAnswerKeys = [genAnswerKey(inputList) for inputList in self.allLists]

    def __len__(self):
        return self.datasetSize

    def __getitem__(self, idx):
        return torch.tensor(self.allLists[idx]).float(), torch.tensor(self.allAnswerKeys[idx]).float()

random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

basicDataset = ListDataset(3, 100)
sampleLoader = DataLoader(basicDataset, batch_size=1, shuffle=True)
sampleInputTensor, sampleAnswerKey = next(iter(sampleLoader))
print(sampleInputTensor)
print(sampleAnswerKey)

tensor([[-1.0000,  0.7272,  0.9370]])
tensor([[0., 0., 1.]])


In [969]:

# Transform a (B, L) list to a (B, L, d_emb)
class Stem(nn.Module):
    def __init__(self, embeddingDim):
        super().__init__()
        self.norm = einn.Norm("b [c]")
        self.linearToEmbDim = nn.Linear(1, embeddingDim)
        self.gelu = nn.GELU()
        

    # X expected to be a (B, L) list
    def forward(self, x):
        x = self.norm(x)
        x = einx.rearrange("b c -> b c 1", x)
        x = self.linearToEmbDim(x)
        x = self.gelu(x)
        return x


stem = Stem(4)
sampleEmb = stem(sampleInputTensor)
print(sampleEmb.shape)

torch.Size([1, 3, 4])


In [970]:
keyDim = 2

sampleQuery = torch.tensor([
    [
        [0., 1],
        [1, 0],
        [1, 1]
    ]
])

sampleKey = torch.tensor([
    [
        [1., 0],
        [0, 1],
        [1, 1]
    ]
])

# Perform attention from all queries to all keys
dotProd = einx.dot("b q [d], b k [d] -> b q k", sampleQuery, sampleKey)
print(dotProd)

softMaxPerQuery = einx.softmax("b q [k]", dotProd / keyDim)
print(softMaxPerQuery)

sampleValDeltas = torch.tensor([
    [
        [2., 0],
        [0, 1],
        [1, 0]
    ]
])

# Each value is associated with the key that outputted that value so we do a matrix multiply across that key dimension to get the weighted sums 
# of each value embedding for each query:
appliedAttentionWeights = einx.dot("b q [k], b [k] h -> b q h", softMaxPerQuery, sampleValDeltas)

print(appliedAttentionWeights)

tensor([[[0., 1., 1.],
         [1., 0., 1.],
         [1., 1., 2.]]])
tensor([[[0.2327, 0.3837, 0.3837],
         [0.3837, 0.2327, 0.3837],
         [0.2741, 0.2741, 0.4519]]])
tensor([[[0.8490, 0.3837],
         [1.1510, 0.2327],
         [1.0000, 0.2741]]])


In [971]:
import einx.nn.torch as einn



class AttentionHead(nn.Module):
    def __init__(self, emb_dim, head_size):
        super().__init__()
        self.keyMap = nn.Linear(emb_dim, head_size, bias=False)
        self.queryMap = nn.Linear(emb_dim, head_size, bias=False)
        self.valueDownMap = nn.Linear(emb_dim, head_size, bias=False)
        self.valueUpMap = nn.Linear(head_size, emb_dim, bias=False)

    def forward(self, x):
        # (B, C, emb_dim)
        key = self.keyMap(x) # (B, C, head_size)
        query = self.queryMap(x) # (B, C, head_size)
        valueDown = self.valueDownMap(x) # (B, C, head_size)

        # Self-attention, pairwise dot-prod all queries to all keys then softmax
        attentionDotProd = einx.dot("b q [d], b k [d] -> b q k", query, key) # (B, C, C)
        softMaxPerQuery = einx.softmax("b q [k]", attentionDotProd / np.sqrt(keyDim))
        # Take the weighted sum of the valueDown heads for each channel, where each channel corresponds to 
        appliedAttentionWeights = einx.dot("b q [k], b [k] h -> b q h", softMaxPerQuery, valueDown) # (B C head_size)

        embDelta = self.valueUpMap(appliedAttentionWeights) # (B, C, emb_dim)

        return embDelta

emb_dim = 4
batchSize = 1
numChannels = 3
head_size = 2

atHead = AttentionHead(emb_dim, head_size)
atHead(sampleEmb)

tensor([[[-0.0556, -0.0076,  0.0241,  0.0462],
         [-0.0562, -0.0074,  0.0243,  0.0465],
         [-0.0565, -0.0074,  0.0244,  0.0467]]], grad_fn=<UnsafeViewBackward0>)

In [972]:
class MultiHeadedAttention(nn.Module):
    def __init__(self, emb_dim, num_heads):
        super().__init__()
        self.head_size = emb_dim // num_heads
        self.attention_heads = nn.ModuleList([AttentionHead(emb_dim, self.head_size) for _ in range(num_heads)])
        self.norm = nn.LayerNorm(emb_dim)
    
    def forward(self, x):
        valueDelta = torch.zeros_like(x)
        for head in self.attention_heads:
            valueDelta += head(x)
        return self.norm(valueDelta)

num_heads = 2
multiHeadedAtt = MultiHeadedAttention(emb_dim, num_heads)
multiHeadedAtt(sampleEmb)

tensor([[[ 1.5126, -1.2724, -0.2850,  0.0449],
         [ 1.5051, -1.2824, -0.2778,  0.0550],
         [ 1.5009, -1.2877, -0.2746,  0.0613]]],
       grad_fn=<NativeLayerNormBackward0>)

In [973]:
class TransformerBlock(nn.Module):
    def __init__(self, emb_dim, num_heads):
        super().__init__()
        self.attention = MultiHeadedAttention(emb_dim, num_heads)
        self.feedforward_nonlinear = nn.Sequential(
            nn.Linear(emb_dim, emb_dim * 4),
            nn.GELU(),
            nn.Linear(emb_dim * 4, emb_dim)
        )
    def forward(self, x):
        x = x + self.attention(x)
        x = x + self.feedforward_nonlinear(x)
        return x

block = TransformerBlock(emb_dim, num_heads)
block(sampleEmb)

tensor([[[ 1.1220, -0.1524,  1.0363, -1.5487],
         [ 0.6964,  0.1712,  1.0027, -1.2601],
         [ 0.6674,  0.2180,  0.9976, -1.1155]]], grad_fn=<AddBackward0>)

In [974]:
emb_dim = 64
num_heads = 8
num_blocks = 6

class Transformer(nn.Module):
    def __init__(self):
        super().__init__()
        self.stem = Stem(emb_dim)

        self.transformerBlocks = nn.ModuleList([TransformerBlock(emb_dim, num_heads) for _ in range(num_blocks)])

        self.output = nn.Sequential(
            nn.Linear(emb_dim, 1)
        )

    def forward(self, x):
        x = self.stem(x)
        for block in self.transformerBlocks:
            x = block(x)
        x = self.output(x)
        return x.squeeze(-1)
        
sampleInputTensor = next(iter(sampleLoader))[0]
sampleTransformer = Transformer()
sampleTransformer(sampleInputTensor).shape

torch.Size([1, 3])

In [975]:

transformer = Transformer()

trainDataset = ListDataset(10, 1000)
trainLoader = DataLoader(trainDataset, batch_size=64, shuffle=True)
testDataset = ListDataset(10, 1000)
testLoader = DataLoader(testDataset, batch_size=256, shuffle=True)

inputTensor, targetTensor = next(iter(trainLoader))
print(inputTensor.shape)
transformer(inputTensor).shape

torch.Size([64, 10])


torch.Size([64, 10])

In [977]:
from torch.optim.lr_scheduler import CosineAnnealingLR

n_epochs = 100
initialLearningRate = 0.003

model = Transformer()

optimizer = torch.optim.Adam(model.parameters(), lr = initialLearningRate)
scheduler = CosineAnnealingLR(optimizer, n_epochs)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.L1Loss()

for epoch in range(n_epochs):
    runningLoss = 0
    for inputTensor, targetTensor in trainLoader:
        model.zero_grad()
        pred = model(inputTensor.to(device))
        loss = criterion(pred, targetTensor.to(device))
        loss.backward()
        runningLoss += loss.item() * inputTensor.shape[0]
        optimizer.step()
    scheduler.step()

    runningTestLoss = 0
    for inputTensor, targetTensor in testLoader:
        with torch.no_grad():
            pred = model(inputTensor.to(device))
            loss = criterion(pred, targetTensor.to(device))
            runningTestLoss += loss.item() * inputTensor.shape[0]
    if epoch % 10 == 0:
        print(f"Epoch {epoch:< 4}Train Loss{round(runningLoss / len(trainDataset), 3):< 10}TestLoss{round(runningTestLoss / len(testDataset), 2):< 10}")


Epoch  0  Train Loss 2.012    TestLoss 0.91     
Epoch  10 Train Loss 0.169    TestLoss 0.18     
Epoch  20 Train Loss 0.099    TestLoss 0.12     
Epoch  30 Train Loss 0.076    TestLoss 0.17     
Epoch  40 Train Loss 0.066    TestLoss 0.05     
Epoch  50 Train Loss 0.048    TestLoss 0.05     
Epoch  60 Train Loss 0.028    TestLoss 0.03     
Epoch  70 Train Loss 0.018    TestLoss 0.02     
Epoch  80 Train Loss 0.015    TestLoss 0.02     
Epoch  90 Train Loss 0.012    TestLoss 0.01     


In [855]:
inp, t = next(iter(trainLoader))
def parseTensor(tensor):
    return '\t'.join([f"{x.item():.2f}" for x in tensor])
print("Input List:")
print(parseTensor(inp[0]))
print("True Target List:")
print(parseTensor(t[0]))
print("Predicted List:")
print(parseTensor(model(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.1, 0.2, 0.3, 0.4, 0.5, -1, -1, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("An In Distribution Input:")
print(parseTensor(inp[0]))
print("True Target:")
print(parseTensor(trueTarget))
print("Predictd List:")
print(parseTensor(model(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.97, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("An Out Of Distribution Input:")
print(parseTensor(inp[0]))
print("True Target:")
print(parseTensor(trueTarget))
print("Predictd List:")
print(parseTensor(model(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.9, 0.95, 0.975, 0.99, 0.995, 0.9975, 0.9999, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("A Pathological Input:")
print(inp[0])
print("True Target:")
print(parseTensor(trueTarget))
print("Predictdd List:")
print(parseTensor(model(inp.to(device))[0]))

Input List:
-1.00	0.38	0.77	-1.00	0.45	0.28	-1.00	-1.00	0.87	0.31
True Target List:
0.00	2.00	4.00	0.00	3.00	0.00	0.00	0.00	5.00	1.00
Predicted List:
0.00	1.99	4.00	0.00	3.02	-0.00	-0.00	0.00	5.01	1.01


An In Distribution Input:
0.10	0.20	0.30	0.40	0.50	-1.00	-1.00	-1.00	-1.00	-1.00
True Target:
0.00	1.00	2.00	3.00	4.00	0.00	0.00	0.00	0.00	0.00
Predictd List:
-0.00	1.00	2.00	3.01	4.00	0.00	-0.00	0.00	0.00	0.00


An Out Of Distribution Input:
0.50	0.60	0.70	0.80	0.90	0.95	0.97	-1.00	-1.00	-1.00
True Target:
0.00	1.00	2.00	3.00	4.00	5.00	6.00	0.00	0.00	0.00
Predictd List:
0.00	0.99	2.04	3.03	3.92	5.04	6.03	0.00	0.00	0.00


A Pathological Input:
tensor([ 0.9000,  0.9500,  0.9750,  0.9900,  0.9950,  0.9975,  0.9999, -1.0000,
        -1.0000, -1.0000])
True Target:
0.00	1.00	2.00	3.00	4.00	5.00	6.00	0.00	0.00	0.00
Predictdd List:
0.01	1.08	2.44	3.21	4.47	5.00	5.68	0.00	0.00	0.00


In [856]:
from torch.optim.lr_scheduler import CosineAnnealingLR

class DumbModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.dumbLin = nn.Sequential(
            nn.Linear(10, 64),
            nn.GELU(),
            nn.Linear(64,8),
            nn.GELU(),
            nn.Linear(8,10)
        )
    def forward(self, x):
        return self.dumbLin(x)

n_epochs = 500
initialLearningRate = 0.03

dumbModel = DumbModel()

optimizer = torch.optim.Adam(dumbModel.parameters(), lr = initialLearningRate)
scheduler = CosineAnnealingLR(optimizer, n_epochs)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dumbModel.to(device)

criterion = nn.L1Loss()

for epoch in range(n_epochs):
    runningLoss = 0
    for inputTensor, targetTensor in trainLoader:
        dumbModel.zero_grad()
        pred = dumbModel(inputTensor.to(device))
        loss = criterion(pred, targetTensor.to(device))
        loss.backward()
        runningLoss += loss.item() * inputTensor.shape[0]
        optimizer.step()
    scheduler.step()

    runningTestLoss = 0
    for inputTensor, targetTensor in testLoader:
        with torch.no_grad():
            pred = dumbModel(inputTensor.to(device))
            loss = criterion(pred, targetTensor.to(device))
            runningTestLoss += loss.item() * inputTensor.shape[0]
    if epoch % 50 == 0:
        print(f"Epoch {epoch:< 7}Train Loss{round(runningLoss / len(trainDataset), 3):< 10}TestLoss{round(runningTestLoss / len(testDataset), 2):< 10}")


Epoch  0     Train Loss 1.132    TestLoss 1.04     
Epoch  50    Train Loss 0.495    TestLoss 0.52     
Epoch  100   Train Loss 0.469    TestLoss 0.5      
Epoch  150   Train Loss 0.436    TestLoss 0.49     
Epoch  200   Train Loss 0.425    TestLoss 0.48     
Epoch  250   Train Loss 0.404    TestLoss 0.48     
Epoch  300   Train Loss 0.391    TestLoss 0.48     
Epoch  350   Train Loss 0.375    TestLoss 0.48     
Epoch  400   Train Loss 0.365    TestLoss 0.47     
Epoch  450   Train Loss 0.358    TestLoss 0.47     


In [853]:
inp, t = next(iter(trainLoader))
def parseTensor(tensor):
    return '\t'.join([f"{x.item():.2f}" for x in tensor])
print("Input List:")
print(parseTensor(inp[0]))
print("True Target List:")
print(parseTensor(t[0]))
print("Predicted List:")
print(parseTensor(dumbModel(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.1, 0.2, 0.3, 0.4, 0.5, -1, -1, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("An In Distribution Input:")
print(parseTensor(inp[0]))
print("True Target:")
print(parseTensor(trueTarget))
print("Predictd List:")
print(parseTensor(dumbModel(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.97, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("An Out Of Distribution Input:")
print(parseTensor(inp[0]))
print("True Target:")
print(parseTensor(trueTarget))
print("Predictd List:")
print(parseTensor(dumbModel(inp.to(device))[0]))


print()
print()
inp = torch.tensor([[0.9, 0.95, 0.975, 0.99, 0.995, 0.9975, 0.9999, -1, -1, -1]])
trueTarget = torch.tensor(genAnswerKey(inp[0]))
print("A Pathological Input:")
print(inp[0])
print("True Target:")
print(parseTensor(trueTarget))
print("Predictdd List:")
print(parseTensor(dumbModel(inp.to(device))[0]))

Input List:
0.53	-1.00	-1.00	-1.00	-1.00	0.72	-1.00	-1.00	-1.00	0.04
True Target List:
1.00	0.00	0.00	0.00	0.00	2.00	0.00	0.00	0.00	0.00
Predicted List:
0.96	-0.00	0.02	-0.00	-0.00	-0.01	0.00	0.00	-0.00	-0.12


An In Distribution Input:
0.10	0.20	0.30	0.40	0.50	-1.00	-1.00	-1.00	-1.00	-1.00
True Target:
0.00	1.00	2.00	3.00	4.00	0.00	0.00	0.00	0.00	0.00
Predictd List:
0.28	0.00	2.32	2.79	3.02	0.58	-0.00	-0.00	-0.00	0.00


An Out Of Distribution Input:
0.50	0.60	0.70	0.80	0.90	0.95	0.97	-1.00	-1.00	-1.00
True Target:
0.00	1.00	2.00	3.00	4.00	5.00	6.00	0.00	0.00	0.00
Predictd List:
0.63	0.00	3.62	1.76	3.34	1.18	4.19	0.00	0.00	0.00


A Pathological Input:
tensor([ 0.9000,  0.9500,  0.9750,  0.9900,  0.9950,  0.9975,  0.9999, -1.0000,
        -1.0000, -1.0000])
True Target:
0.00	1.00	2.00	3.00	4.00	5.00	6.00	0.00	0.00	0.00
Predictdd List:
2.46	0.00	4.07	3.02	2.88	1.00	3.36	0.00	0.00	0.00
