In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import nltk
from nltk.corpus import treebank

In [2]:
# nltk.download('treebank')
# nltk.download('universal_tagset')

# Load the data
sentences = treebank.tagged_sents(tagset='universal')
train_data = sentences[:3000]
test_data = sentences[3000:4000]

vocab = {word for sent in train_data for word, tag in sent}
vocab = {word: i+2 for i, word in enumerate(sorted(vocab))}
vocab['<pad>'] = 0  # Padding
vocab['<unk>'] = 1  # Unknown words

tagset = {tag for sent in train_data for word, tag in sent}
tagset = {tag: i for i, tag in enumerate(tagset)}

In [15]:
tagset

{'CONJ': 0,
 'PRT': 1,
 'ADP': 2,
 'X': 3,
 'ADJ': 4,
 'VERB': 5,
 'ADV': 6,
 'NOUN': 7,
 '.': 8,
 'PRON': 9,
 'DET': 10,
 'NUM': 11}

In [3]:
vocab['quick']

8711

In [4]:
class POSDataset(Dataset):
    def __init__(self, data, vocab, tagset):
        self.data = data
        self.vocab = vocab
        self.tagset = tagset

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        words, tags = zip(*self.data[idx])
        word_ids = [self.vocab.get(word, self.vocab['<unk>']) for word in words]
        tag_ids = [self.tagset[tag] for tag in tags]
        return torch.tensor(word_ids, dtype=torch.long), torch.tensor(tag_ids, dtype=torch.long)

def pad_collate(batch):
    (xx, yy) = zip(*batch)
    x_lens = [len(x) for x in xx]
    xx_pad = nn.utils.rnn.pad_sequence(xx, batch_first=True, padding_value=vocab['<pad>'])
    yy_pad = nn.utils.rnn.pad_sequence(yy, batch_first=True, padding_value=-1)  # -1 for ignore_index in loss calculation
    return xx_pad, yy_pad, torch.tensor(x_lens, dtype=torch.long)

train_loader = DataLoader(POSDataset(train_data, vocab, tagset), batch_size=32, collate_fn=pad_collate)
test_loader = DataLoader(POSDataset(test_data, vocab, tagset), batch_size=32, collate_fn=pad_collate)


In [5]:
class CustomLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(CustomLSTM, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        # Gates parameters
        self.W_i = nn.Parameter(torch.Tensor(input_dim, hidden_dim))
        self.U_i = nn.Parameter(torch.Tensor(hidden_dim, hidden_dim))
        self.b_i = nn.Parameter(torch.Tensor(hidden_dim))

        self.W_f = nn.Parameter(torch.Tensor(input_dim, hidden_dim))
        self.U_f = nn.Parameter(torch.Tensor(hidden_dim, hidden_dim))
        self.b_f = nn.Parameter(torch.Tensor(hidden_dim))

        self.W_c = nn.Parameter(torch.Tensor(input_dim, hidden_dim))
        self.U_c = nn.Parameter(torch.Tensor(hidden_dim, hidden_dim))
        self.b_c = nn.Parameter(torch.Tensor(hidden_dim))

        self.W_o = nn.Parameter(torch.Tensor(input_dim, hidden_dim))
        self.U_o = nn.Parameter(torch.Tensor(hidden_dim, hidden_dim))
        self.b_o = nn.Parameter(torch.Tensor(hidden_dim))

        self.init_weights()

    def init_weights(self):
        for p in self.parameters():
            if p.data.ndimension() >= 2:
                nn.init.xavier_uniform_(p.data)
            else:
                nn.init.zeros_(p.data)

    def forward(self, x, init_states=None):
        """
        x: Shape (batch, sequence, feature)
        init_states: (h, c) initial states
        """
        batch_size, seq_size, _ = x.size()
        hidden_seq = []

        if init_states is None:
            h_t, c_t = (torch.zeros(batch_size, self.hidden_dim).to(x.device), 
                        torch.zeros(batch_size, self.hidden_dim).to(x.device))
        else:
            h_t, c_t = init_states

        for t in range(seq_size):
            x_t = x[:, t, :]

            i_t = torch.sigmoid(x_t @ self.W_i + h_t @ self.U_i + self.b_i)
            f_t = torch.sigmoid(x_t @ self.W_f + h_t @ self.U_f + self.b_f)
            g_t = torch.tanh(x_t @ self.W_c + h_t @ self.U_c + self.b_c)
            o_t = torch.sigmoid(x_t @ self.W_o + h_t @ self.U_o + self.b_o)
            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)

            hidden_seq.append(h_t.unsqueeze(0))

        hidden_seq = torch.cat(hidden_seq, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()

        return hidden_seq, (h_t, c_t)


In [6]:
class POSModel(nn.Module):
    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim):
        super(POSModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab['<pad>'])
        self.lstm = CustomLSTM(embedding_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, tagset_size)

    def forward(self, words, word_lengths):
        embeds = self.embedding(words)
        lstm_out, _ = self.lstm(embeds)
        lstm_out = lstm_out.contiguous().view(-1, lstm_out.shape[2])
        tag_space = self.fc(lstm_out)
        tag_scores = nn.functional.log_softmax(tag_space, dim=1)
        return tag_scores.view(words.size(0), words.size(1), -1)


In [7]:
# Hyperparameters
EMBEDDING_DIM = 100
HIDDEN_DIM = 128
model = POSModel(len(vocab), len(tagset), EMBEDDING_DIM, HIDDEN_DIM)
loss_function = nn.CrossEntropyLoss(ignore_index=-1)  # Ignore the padding in the loss calculation
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [8]:
def train_model(model, train_loader, optimizer, loss_function, epochs=5):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for words, tags, lengths in train_loader:
            model.zero_grad()
            tag_scores = model(words, lengths)
            loss = loss_function(tag_scores.view(-1, len(tagset)), tags.view(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss}")


In [9]:
def test_model(model, sentence, vocab, tagset):
    model.eval()
    with torch.no_grad():
        inputs = [vocab.get(word.lower(), vocab['<unk>']) for word in sentence.split()]
        inputs_tensor = torch.tensor(inputs).unsqueeze(0)
        tag_scores = model(inputs_tensor, torch.tensor([len(inputs)]))
        tag_ids = torch.argmax(tag_scores, dim=2)
        tags = [list(tagset.keys())[tag_id] for tag_id in tag_ids[0]]
        return list(zip(sentence.split(), tags))


In [10]:
train_model(model, train_loader, optimizer, loss_function, epochs=10)

Epoch 1, Loss: 65.80699542164803
Epoch 2, Loss: 19.28850381821394
Epoch 3, Loss: 7.312714505940676
Epoch 4, Loss: 3.2961814729496837
Epoch 5, Loss: 1.8418646750506014
Epoch 6, Loss: 1.1415140028111637
Epoch 7, Loss: 0.7769635362783447
Epoch 8, Loss: 0.5403456123895012
Epoch 9, Loss: 0.3901566627318971
Epoch 10, Loss: 0.3201424130820669


In [11]:
sentence = "The quick brown fox jumps over the lazy dog"
tagged_sentence = test_model(model, sentence, vocab, tagset)
print(tagged_sentence)

[('The', 'DET'), ('quick', 'ADJ'), ('brown', 'NOUN'), ('fox', 'NOUN'), ('jumps', 'NOUN'), ('over', 'ADP'), ('the', 'DET'), ('lazy', 'NOUN'), ('dog', 'NOUN')]


In [12]:
# Save the model
model_path = 'pos_model.pth'
torch.save(model.state_dict(), model_path)

In [10]:
def load_model(model_path, vocab_size, tagset_size, embedding_dim, hidden_dim):
    model = POSModel(vocab_size, tagset_size, embedding_dim, hidden_dim)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

In [11]:
# Load the model
model_loaded = load_model('pos_model.pth', len(vocab), len(tagset), EMBEDDING_DIM, HIDDEN_DIM)

# Test the model on a new sentence
sentence = "A very good man"
tagged_sentence = test_model(model_loaded, sentence, vocab, tagset)
print(tagged_sentence)


[('A', 'NOUN'), ('very', 'CONJ'), ('good', 'NUM'), ('man', 'ADJ')]
