## simple conv. net

In [1]:
# Created by ChatGPT (OpenAI) - 2023
# with manual edits

import torch
import torch.nn as nn
import torch.optim as optim

# Toy data set: a handful of strings plus a 0/1 label per string
random_strings = [
    "dog is playing with a ball",
    "the cat is sleeping",
    "the cats are sleeping",
    "the cat jumped over the fence",
    "birds are chirping in the trees",
    "the cat is hiding under the bed",
    "ccaccctat",
    "ccaccctccaatt",
    "ccaccctcatccatt",
]
# 1 if the string contains "cat", 0 otherwise (same order as random_strings)
labels = [0, 1, 1, 1, 0, 1, 0, 0, 1]

# Character -> index table: '<pad>' at 0, space at 1, then 'a'..'z' at 2..27
vocabulary = {'<pad>': 0, ' ': 1}
vocabulary.update(
    {letter: index for index, letter in enumerate("abcdefghijklmnopqrstuvwxyz", start=2)}
)

# Encode every string as one fixed-width row of indices.
# Positions past the end of a string keep the initial 0, and characters that are
# missing from the vocabulary are mapped to 0 as well.
INPUT_LENGTH = 64
X = torch.zeros((len(random_strings), INPUT_LENGTH), dtype=torch.long)
for row, text in enumerate(random_strings):
    ids = [vocabulary.get(symbol.lower(), 0) for symbol in text]
    X[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        
# Define the ConvNet architecture
class CatConvNet(nn.Module):
    """Character-level CNN that maps a batch of token-id rows to one probability per row.

    Layers: embedding -> one Conv1d branch per kernel size (ReLU, max over the
    sequence axis) -> concatenation -> hidden linear layer (ReLU) -> single
    output unit with sigmoid.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each embedding vector (conv input channels).
        num_filters: output channels of every conv branch.
        kernel_sizes: iterable of kernel widths, one conv branch per entry.
        hidden_dim: width of the hidden linear layer.
    """

    def __init__(self, vocab_size, embedding_dim, num_filters, kernel_sizes, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # One convolution per requested kernel width, all with the same filter count.
        branches = []
        for width in kernel_sizes:
            branches.append(nn.Conv1d(embedding_dim, num_filters, width))
        self.convs = nn.ModuleList(branches)

        # After pooling, each branch contributes num_filters features.
        pooled_features = num_filters * len(kernel_sizes)
        self.fc = nn.Linear(pooled_features, hidden_dim)
        self.output = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Return a (batch_size, 1) tensor of sigmoid outputs for token ids ``x``."""
        # (batch, sequence, embedding) -> (batch, embedding, sequence) as Conv1d expects
        channels_first = self.embedding(x).transpose(1, 2)

        pooled = []
        for conv in self.convs:
            activation = torch.relu(conv(channels_first))
            # Maximum over the whole sequence axis: one value per filter
            pooled.append(activation.max(dim=2).values)

        hidden = torch.relu(self.fc(torch.cat(pooled, dim=1)))
        return torch.sigmoid(self.output(hidden))

# Initialize the model
VOCAB_SIZE = len(vocabulary)
embedding_dim = 16
N_FILTERS = 32
kernel_sizes = [3, 4, 5]
hidden_dim = 64
model = CatConvNet(VOCAB_SIZE, embedding_dim, N_FILTERS, kernel_sizes, hidden_dim)

# Define the loss function and optimizer.
# The model's forward() already ends in a sigmoid, so BCELoss receives probabilities.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# The targets are the same in every epoch: build the (N, 1) float tensor once
# instead of re-creating it inside the loop.
targets = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)

# Training loop (full batch: every epoch sees all of X)
num_epochs = 1000
# Report five times over the run; integer arithmetic avoids a float modulus,
# and max(1, ...) keeps the interval valid when num_epochs < 5.
log_every = max(1, num_epochs // 5)
for epoch in range(num_epochs):
    # Forward
    output = model(X)
    loss = criterion(output, targets)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Print the loss every `log_every` epochs
    if (epoch + 1) % log_every == 0:
        print(f"Epoch: {epoch+1}/{num_epochs}, Loss: {loss.item()}")

# Test the model
test_strings = [
    "the dog chased the cat",
    "cats are cute",
    "the cat is on the mat",
    "a rabbit ran past",
    "ccaatt",
]

model.eval()
with torch.no_grad():
    for string in test_strings:
        # Encode exactly like the training data: fixed width, zero padded.
        test_X = torch.zeros((1, INPUT_LENGTH), dtype=torch.long)
        # Slice so that an over-long string is truncated instead of indexing past the row.
        for j, char in enumerate(string[:INPUT_LENGTH]):
            test_X[0, j] = vocabulary.get(char.lower(), 0)

        predicted = model(test_X)
        # Round the sigmoid output to the nearest class (0 or 1)
        predicted_label = int(predicted.round().item())
        print(f"String: {string}, Contains 'cat': {'Yes' if predicted_label == 1 else 'No'}")

Epoch: 200/1000, Loss: 0.00047495323815383017
Epoch: 400/1000, Loss: 0.000125280290376395
Epoch: 600/1000, Loss: 5.607167986454442e-05
Epoch: 800/1000, Loss: 3.141085835522972e-05
Epoch: 1000/1000, Loss: 1.9860137399518862e-05
String: the dog chased the cat, Contains 'cat': Yes
String: cats are cute, Contains 'cat': Yes
String: the cat is on the mat, Contains 'cat': Yes
String: a rabbit ran past, Contains 'cat': No
String: ccaatt, Contains 'cat': No


## stepping through model.forward() manually to figure out what is going on...

In [2]:
# Slice off the first two encoded training strings as a tiny batch to trace by hand.
_x = X[:2]
_x

tensor([[ 5, 16,  8,  1, 10, 20,  1, 17, 13,  2, 26, 10, 15,  8,  1, 24, 10, 21,
          9,  1,  2,  1,  3,  2, 13, 13,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [21,  9,  6,  1,  4,  2, 21,  1, 10, 20,  1, 20, 13,  6,  6, 17, 10, 15,
          8,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0,  0,  0,  0,  0,  0,  0]])

In [3]:
# Step 1 of forward(): look up the embedding vector of every token id in the batch.
_emb = model.embedding(_x)
print(_emb.shape)
_emb

torch.Size([2, 64, 16])


tensor([[[ 0.3067, -0.0899,  1.2317,  ...,  0.1927, -0.5810, -0.6559],
         [-0.1324, -2.3706, -1.6330,  ...,  1.5376,  0.0507, -0.3009],
         [-0.5223,  0.9952, -0.8134,  ..., -0.3842, -0.7712,  1.1321],
         ...,
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296],
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296],
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296]],

        [[ 1.4782, -1.5622, -1.5464,  ..., -0.6063, -0.8329,  2.3900],
         [ 0.2707, -0.5611,  0.5882,  ..., -0.8165,  1.0123,  0.9820],
         [-1.2866,  1.9623, -0.0764,  ...,  1.0592, -0.2838,  0.8894],
         ...,
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296],
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296],
         [ 0.7829, -0.0130,  0.6944,  ...,  0.0903, -0.5905, -0.0296]]],
       grad_fn=<EmbeddingBackward0>)

In [4]:
# Step 2 of forward(): swap axes 1 and 2 so the embedding dimension sits on axis 1,
# the channel axis that Conv1d convolves over.
_xtr = _emb.transpose(1, 2)
print(_xtr.shape)
_xtr

torch.Size([2, 16, 64])


tensor([[[ 0.3067, -0.1324, -0.5223,  ...,  0.7829,  0.7829,  0.7829],
         [-0.0899, -2.3706,  0.9952,  ..., -0.0130, -0.0130, -0.0130],
         [ 1.2317, -1.6330, -0.8134,  ...,  0.6944,  0.6944,  0.6944],
         ...,
         [ 0.1927,  1.5376, -0.3842,  ...,  0.0903,  0.0903,  0.0903],
         [-0.5810,  0.0507, -0.7712,  ..., -0.5905, -0.5905, -0.5905],
         [-0.6559, -0.3009,  1.1321,  ..., -0.0296, -0.0296, -0.0296]],

        [[ 1.4782,  0.2707, -1.2866,  ...,  0.7829,  0.7829,  0.7829],
         [-1.5622, -0.5611,  1.9623,  ..., -0.0130, -0.0130, -0.0130],
         [-1.5464,  0.5882, -0.0764,  ...,  0.6944,  0.6944,  0.6944],
         ...,
         [-0.6063, -0.8165,  1.0592,  ...,  0.0903,  0.0903,  0.0903],
         [-0.8329,  1.0123, -0.2838,  ..., -0.5905, -0.5905, -0.5905],
         [ 2.3900,  0.9820,  0.8894,  ..., -0.0296, -0.0296, -0.0296]]],
       grad_fn=<TransposeBackward0>)

In [5]:
# Step 3 of forward(): run every conv branch on the same input and apply ReLU.
# The first print shows each branch's output shape, the second only its last-axis length.
_xconv = [torch.relu(conv(_xtr)) for conv in model.convs]
print([c.shape for c in _xconv])
print([c.size(2) for c in _xconv])

[torch.Size([2, 32, 62]), torch.Size([2, 32, 61]), torch.Size([2, 32, 60])]
[62, 61, 60]


In [6]:
# Step 4 of forward(): pool each branch with a window as wide as its whole last axis,
# then squeeze away the resulting length-1 axis.
_xpool = [torch.max_pool1d(conv_out, conv_out.size(2)).squeeze(2) for conv_out in _xconv]  # Max pooling
print([p.shape for p in _xpool])

[torch.Size([2, 32]), torch.Size([2, 32]), torch.Size([2, 32])]


In [8]:
# Step 5 of forward(): join the pooled features of all branches along dim 1.
_xcat = torch.cat(_xpool, dim=1)
_xcat.shape

torch.Size([2, 96])

In [9]:
# Step 6 of forward(): hidden linear layer followed by ReLU.
_xfc = torch.relu(model.fc(_xcat))
_xfc.shape

torch.Size([2, 64])

In [10]:
# Step 7 of forward(): output layer followed by a sigmoid.
_xo = torch.sigmoid(model.output(_xfc))
_xo.shape

torch.Size([2, 1])

## plan procedure with one hot encoding

In [11]:
# Sequence length and one-hot width used for the experiments in this section.
# Note that the width (32) is larger than the 28 symbols of the vocabulary.
max_len = 64
VOCAB_SIZE = 32

# Replace every token id by a one-hot vector of width VOCAB_SIZE.
_xoh = nn.functional.one_hot(_x, num_classes=VOCAB_SIZE)
print(_xoh.shape)
# One-hot vector of the first token of the first string, as plain Python ints
print(_xoh[0, 0].tolist())

torch.Size([2, 64, 32])
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [12]:
# Put the one-hot axis on axis 1 (the Conv1d channel axis) and cast to float in one
# chained expression.
_xtr = _xoh.transpose(1, 2).float()
_xtr.shape

torch.Size([2, 32, 64])

In [13]:
# A single exploratory convolution over the one-hot channels.
# NOTE: "FITLERS" (sic) is the spelling used by this notebook; the name is kept unchanged.
FITLERS = 8
KS = 5
STRIDE = 1
f_conv = nn.Conv1d(
    in_channels=VOCAB_SIZE,
    out_channels=FITLERS,
    kernel_size=KS,
    stride=STRIDE,
)
_xconv = f_conv(_xtr)
_xconv.shape  # (samples, filters, signal at each conv. step)

torch.Size([2, 8, 60])

In [15]:
def _signal_len(l, stride, kernel):
    return (l // stride) - kernel + 1
# Length of the signal left after the convolution above; printed so it can be compared
# with the last axis of _xconv.
_conv_signal_len = _signal_len(max_len, STRIDE, KS)
print(_conv_signal_len)

# A pooling window as wide as the whole signal leaves exactly one value per filter.
f_pool = nn.MaxPool1d(_conv_signal_len) # pool all conv. steps in one go
_xpool = f_pool(_xconv)
_xpool.shape

60


torch.Size([2, 8, 1])

In [17]:
# Rehearse combining several pooled branches: squeeze away the length-1 axis and
# concatenate along dim 1. The same tensor is passed twice as a stand-in for two branches.
torch.cat([p.squeeze(2) for p in [_xpool, _xpool]], dim=1).shape

torch.Size([2, 16])

## make model with one hot encoding

In [20]:
import numpy as np

# One-hot width; note that it exceeds the 28 symbols defined below.
VOCAB_SIZE = 32

# Symbol -> index table: '░' at 0 (the fill value of the zero-initialised input
# tensors), space at 1, then 'a'..'z' at 2..27.
vocabulary = {'░': 0, ' ': 1}
vocabulary.update(
    {letter: index for index, letter in enumerate("abcdefghijklmnopqrstuvwxyz", start=2)}
)

# Index -> symbol table (inverse of `vocabulary`)
inv_vocab = dict(zip(vocabulary.values(), vocabulary.keys()))

def _make_train_set(min_len, max_len, n, target: str):
    sd = (max_len - min_len)/4
    av_len = (max_len+min_len)/2
    lengths = np.random.normal(av_len, sd, n).round().clip(min_len, max_len).astype(int)
    true_cases = np.random.random(n) > 0.5
    entries = []
    for l, is_t in zip(lengths, true_cases):
        entry = [inv_vocab[k] for k in np.random.randint(1, len(vocabulary), size=l)]
        if is_t:
            pos = np.random.randint(0, l-len(target))
            for i, c in enumerate(target):
                entry[pos+i] = c
        entries.append(("".join(entry), is_t))

    return entries


# Configuration of the synthetic data set
INPUT_LENGTH = 64
N_EXAMPLES = 1000
TARGET = "cat"

# Generated strings are between INPUT_LENGTH // 2 and INPUT_LENGTH characters long.
train_cases = _make_train_set(INPUT_LENGTH // 2, INPUT_LENGTH, N_EXAMPLES, TARGET)

# Split the (string, label) pairs into a label list and a zero-padded index matrix.
train_y = [label for text, label in train_cases]
train_x = torch.zeros((len(train_cases), INPUT_LENGTH), dtype=torch.long)
for row, (text, label) in enumerate(train_cases):
    ids = [vocabulary.get(symbol.lower(), 0) for symbol in text]
    train_x[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)

train_x.shape

torch.Size([1000, 64])

In [21]:
class ConvOH(nn.Module):
    """Convolutional substring detector that works directly on one-hot encoded token ids.

    Pipeline: one-hot encode -> parallel Conv1d branches with ReLU -> max over the
    whole sequence axis per filter -> concatenate -> linear layer -> sigmoid.

    Args:
        kernels: iterable of ``(n_filters, kernel_size)`` pairs, one per conv branch.
        vocab_size: width of the one-hot encoding, i.e. the number of conv input
            channels. Defaults to the notebook-level ``VOCAB_SIZE`` as it is when
            the model is constructed.
    """

    def __init__(self, kernels, vocab_size=None) -> None:
        super().__init__()
        kernels = list(kernels)
        # Capture the width once: the notebook re-assigns VOCAB_SIZE in several cells,
        # and a model that has been built must keep using the width it was built with.
        self.vocab_size = VOCAB_SIZE if vocab_size is None else vocab_size

        self.l_conv_kernels = nn.ModuleList([
            nn.Conv1d(self.vocab_size, n_filters, kernel_size)
            for n_filters, kernel_size in kernels
        ])
        # Output size 1 means "max over the entire signal", whatever its length, so the
        # model no longer depends on a fixed input length.
        self.l_pools = nn.ModuleList([nn.AdaptiveMaxPool1d(1) for _ in kernels])
        self.l_out = nn.Linear(sum(n_filters for n_filters, _ in kernels), 1)

    def _one_hot(self, x):
        """One-hot encode token indices and move the one-hot axis to the channel axis."""
        encoded = nn.functional.one_hot(x, num_classes=self.vocab_size)
        # swap axis 1 and 2
        # tokens X OH embedding -> OH embedding channel X token is this category (channel)
        # ex. scanning through channel for char "k", a 1 == this token is "k", else not "k"
        return encoded.transpose(1, 2).float()

    def _conv(self, x):
        """Apply every conv branch followed by ReLU; returns one tensor per branch."""
        return [torch.relu(conv(x)) for conv in self.l_conv_kernels]

    def _pool(self, x):
        """Reduce each branch to its per-filter maximum and concatenate the branches."""
        # squeeze (remove) the last axis: pooling the entire signal leaves length 1
        # hits is [samples X filters for k in kernels]
        hits = [pool(branch).squeeze(2) for pool, branch in zip(self.l_pools, x)]
        return torch.cat(hits, dim=1)  # concat hits for each kernel

    def _infer(self, x):
        """Combine the hits of all filters into a single sigmoid output per sample."""
        return torch.sigmoid(self.l_out(x))

    def forward(self, x):
        """Return a (samples, 1) tensor of sigmoid outputs for a batch of token indices."""
        return self._infer(self._pool(self._conv(self._one_hot(x))))

    
model_oh = ConvOH([ # (filters, size)
    (12, 5),
])
# The model's forward() already ends in a sigmoid, so BCELoss receives probabilities.
criterion = nn.BCELoss()
optimizer = optim.Adam(model_oh.parameters(), lr=0.001)

# The targets are the same in every epoch: build the (N, 1) float tensor once
# instead of re-creating it inside the loop.
train_targets = torch.tensor(train_y, dtype=torch.float32).unsqueeze(1)

# Training loop (full batch: every epoch sees all of train_x)
num_epochs = 1000
# Report five times over the run; integer arithmetic avoids a float modulus,
# and max(1, ...) keeps the interval valid when num_epochs < 5.
log_every = max(1, num_epochs // 5)
for epoch in range(num_epochs):
    # Forward
    output = model_oh(train_x)
    loss = criterion(output, train_targets)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Print the loss every `log_every` epochs
    if (epoch + 1) % log_every == 0:
        print(f"Epoch: {epoch+1:04} of {num_epochs}, Loss: {loss.item():0.5f}")


# Test the model
test_strings = [
    "the dog chased the cat",
    "cats are cute",
    "the cat is on the mat",
    "a rabbit ran past",
    "ccaatt",
]

model_oh.eval()
with torch.no_grad():
    for string in test_strings:
        # Encode exactly like the training data: fixed width, zero padded.
        test_X = torch.zeros((1, INPUT_LENGTH), dtype=torch.long)
        # Slice so that an over-long string is truncated instead of indexing past the row.
        for j, char in enumerate(string[:INPUT_LENGTH]):
            test_X[0, j] = vocabulary.get(char.lower(), 0)

        predicted = model_oh(test_X)
        # Round the sigmoid output to the nearest class (0 or 1)
        predicted_label = int(predicted.round().item())
        print(f"String: {string}, Contains 'cat': {'Yes' if predicted_label == 1 else 'No'}")

Epoch: 0200 of 1000, Loss: 0.26836
Epoch: 0400 of 1000, Loss: 0.07356
Epoch: 0600 of 1000, Loss: 0.03105
Epoch: 0800 of 1000, Loss: 0.01647
Epoch: 1000 of 1000, Loss: 0.00999
String: the dog chased the cat, Contains 'cat': Yes
String: cats are cute, Contains 'cat': Yes
String: the cat is on the mat, Contains 'cat': Yes
String: a rabbit ran past, Contains 'cat': No
String: ccaatt, Contains 'cat': Yes


In [22]:
def decode(filter, index_to_char=None):
    """Render one conv filter as its highest-weighted character per kernel position.

    Args:
        filter: 2-D weight tensor of a single filter with channels on axis 0 and
            kernel positions on axis 1; channel ``i`` is read as vocabulary index
            ``i``. (The parameter name shadows the builtin ``filter``; it is kept
            because it is part of the function's signature.)
        index_to_char: optional index -> character mapping. Defaults to the
            notebook-level ``inv_vocab``.

    Returns:
        A tuple ``(text, scores)``: ``text`` holds one character per kernel
        position and ``scores`` is a NumPy array with the weight of each chosen
        character.
    """
    if index_to_char is None:
        index_to_char = inv_vocab

    # (channels, positions) -> (positions, channels)
    weights: np.ndarray = filter.transpose(0, 1).detach().numpy()

    # Only channels that map to a character can be decoded. The one-hot width may be
    # larger than the vocabulary, and taking the argmax over such extra channels would
    # produce an index that is missing from the mapping (KeyError).
    known = np.array(
        sorted(k for k in index_to_char if 0 <= k < weights.shape[1]), dtype=int
    )
    candidates = weights[:, known]
    best = candidates.argmax(axis=1)

    text = "".join(index_to_char[int(known[b])] for b in best)
    return text, candidates.max(axis=1)

# Print every filter of every conv branch as decoded text followed by its scores.
for conv_layer in model_oh.l_conv_kernels:
    for filter_weights in conv_layer.weight:
        print(*decode(filter_weights))

catuz [0.6710437  0.6439772  0.5972075  0.35859096 0.34459642]
jcpaz [0.57144797 0.5423914  0.62578523 0.73969066 0.5217017 ]
gcatr [0.23899287 0.5146104  0.5090941  0.46041834 0.35250074]
p cat [0.2855157  0.2708299  0.49754673 0.4118676  0.5134269 ]
tacat [0.45216578 0.55285364 0.50897306 0.5009853  0.57777077]
samsf [0.7051544  0.6818988  0.7662786  0.66386276 0.6871013 ]
ifgjr [0.6068009  0.55019397 0.55477536 0.5676706  0.6157469 ]
pheos [0.65311545 0.6532108  0.7731761  0.7372977  0.60265785]
catez [0.7484057  0.77040267 0.66571456 0.28688735 0.30866337]
wnclj [0.6326445  0.5397061  0.68225783 0.67811763 0.63919127]
attij [0.73628515 0.63500434 0.21461163 0.22101949 0.19894506]
acatx [0.2685592  0.5878102  0.52868205 0.59639597 0.26977998]


In [23]:
# Weights of the final linear layer: one weight per conv filter, in the same order in
# which the filters were printed by the previous cell.
model_oh.l_out.weight

Parameter containing:
tensor([[ 0.6629, -0.8011,  0.7902,  0.8332,  0.6761, -0.8127, -0.8117, -0.7340,
          0.7401, -0.8250,  0.8709,  0.7557]], requires_grad=True)