In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import math
from torchtext.datasets import AG_NEWS
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Define the Transformer model
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, num_heads, num_encoder_layers):
        super(TransformerClassifier, self).__init__()
        self.embed_dim = embed_dim
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads),
            num_layers=num_encoder_layers
        )
        self.fc = nn.Linear(embed_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, src):
        # src: (seq_len, batch_size)
        embedded = self.embedding(src) * math.sqrt(self.embed_dim)
        encoded = self.transformer_encoder(embedded)
        # Max-pooling along the time dimension (sequence length)
        pooled, _ = torch.max(encoded, dim=0)
        out = F.relu(self.fc(pooled))
        out = self.fc2(out)
        return out

# Rest of the code remains the same...


In [13]:
# Load AG News dataset
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield get_tokenizer("basic_english")(text)
        
train_iter, test_iter = AG_NEWS()
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])

def text_pipeline(text):
    return vocab(get_tokenizer("basic_english")(text))

# Hyperparameters
vocab_size = len(vocab)
embed_dim = 128
hidden_dim = 256
num_classes = 4
num_heads = 8
num_encoder_layers = 6
batch_size = 64
epochs = 10
learning_rate = 0.001

# Create the model
model = TransformerClassifier(vocab_size, embed_dim, hidden_dim, num_classes, num_heads, num_encoder_layers)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
model.train()
for epoch in range(epochs):
    total_loss = 0
    total_correct = 0
    total_samples = 0
    for idx, (label, text) in enumerate(train_iter):
        optimizer.zero_grad()
        text_tensor = torch.tensor(text_pipeline(text), dtype=torch.long)
        label_tensor = torch.tensor([label - 1], dtype=torch.long)  # Labels in AG News start from 1, so we subtract 1 to make them 0-indexed
        output = model(text_tensor)
        loss = criterion(output, label_tensor)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        # Compute accuracy
        _, predicted = torch.max(output, 1)
        total_correct += (predicted == label_tensor).sum().item()
        total_samples += 1

        if idx % 1000 == 999:  # Print every 1000 batches
            print(f"Epoch {epoch + 1}, Batch {idx + 1}, Loss: {total_loss / 1000:.4f}, Accuracy: {100 * total_correct / total_samples:.2f}%")
            total_loss = 0
            total_correct = 0
            total_samples = 0

# Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for label, text in test_iter:
        text_tensor = torch.tensor(text_pipeline(text), dtype=torch.long)
        label_tensor = torch.tensor([label - 1], dtype=torch.long)
        output = model(text_tensor)
        _, predicted = torch.max(output.data, 1)
        total += 1
        correct += (predicted == label_tensor).sum().item()

print(f"Accuracy on test set: {100 * correct / total:.2f}%")


RuntimeError: size mismatch (got input: [4], target: [1])

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from torchtext.datasets import AG_NEWS
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from tqdm import tqdm

class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_heads, num_encoder_layers, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.embed_dim = embed_dim
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, dropout)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)
        
    def forward(self, src):
        # src: (seq_len, batch_size)
        embedded = self.embedding(src) * math.sqrt(self.embed_dim)
        encoded = self.pos_encoder(embedded)
        output = self.transformer_encoder(encoded)
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Load AG News dataset
train_iter, test_iter = AG_NEWS()
vocab = build_vocab_from_iterator(get_tokenizer("basic_english")(item[1]) for item in train_iter)

def text_pipeline(text):
    return vocab(get_tokenizer("basic_english")(text))

# Hyperparameters
vocab_size = len(vocab)
embed_dim = 128
hidden_dim = 256
num_heads = 8
num_encoder_layers = 6

# Create the Transformer model
model = TransformerEncoder(vocab_size, embed_dim, hidden_dim, num_heads, num_encoder_layers)

# Generate embeddings for each sample
for idx, (label, text) in tqdm(enumerate(train_iter)):
    print (label)
    print (text)
    text_tensor = torch.tensor(text_pipeline(text), dtype=torch.long)
    print (text_tensor)
    print (text_tensor.shape)
    embeddings = model(text_tensor)
    print (embeddings.shape)
    break
    # Now 'embeddings' contains the embeddings for each token in the input text
    # You can use these embeddings for further downstream tasks or analysis.
    #if idx == 10:  # Let's print the embeddings for the first 10 samples
    #    print(embeddings.size())  # (seq_len, batch_size, embed_dim)


0it [00:00, ?it/s]

3
Wall St. Bears Claw Back Into the Black (Reuters) Reuters - Short-sellers, Wall Street's dwindling\band of ultra-cynics, are seeing green again.
tensor([  430,   424,     0,  1604, 14837,   112,    65,     1,   847,    12,
           26,    13,    26,    14, 50724,     2,   430,   373,    15,     8,
        67506,     5, 52257,     2,    41,  4008,   782,   324,     0])
torch.Size([29])
torch.Size([29, 29, 128])





In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from torchtext.datasets import AG_NEWS
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_heads, num_encoder_layers, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.embed_dim = embed_dim
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, dropout)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers)
        
    def forward(self, src):
        # src: (seq_len, batch_size)
        embedded = self.embedding(src) * math.sqrt(self.embed_dim)
        encoded = self.pos_encoder(embedded)
        output = self.transformer_encoder(encoded)
        # Apply global average pooling to get a single embedding for the whole sentence
        pooled = torch.mean(output, dim=0)
        return pooled

class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
# Load AG News dataset
train_iter, test_iter = AG_NEWS()
vocab = build_vocab_from_iterator(get_tokenizer("basic_english")(item[1]) for item in train_iter)

def text_pipeline(text):
    return vocab(get_tokenizer("basic_english")(text))

# Hyperparameters
vocab_size = len(vocab)
embed_dim = 128
hidden_dim = 256
num_heads = 8
num_encoder_layers = 6

# Create the Transformer model
model = TransformerEncoder(vocab_size, embed_dim, hidden_dim, num_heads, num_encoder_layers)

# Generate embeddings for each sample
for idx, (label, text) in enumerate(train_iter):
    text_tensor = torch.tensor(text_pipeline(text), dtype=torch.long)
    embedding = model(text_tensor.unsqueeze(1))  # Add a batch dimension for the single sample
    # Now 'embedding' contains the embedding for the whole sentence
    # You can use this embedding for further downstream tasks or analysis.
    if idx == 10:  # Let's print the embedding for the first 10 samples
        print(embedding.size())  # (batch_size, embed_dim)


torch.Size([1, 128])


KeyboardInterrupt: 

In [3]:
embeddings.shape

torch.Size([90, 90, 128])