In [None]:
from collections import defaultdict

import numpy as np
import pandas as pd
import tensorflow as tf
import torch
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from torch.utils.data import DataLoader, TensorDataset



In [None]:

# Colab-only step: `google.colab` is importable only inside a Google Colab
# runtime. `files.upload()` opens the browser upload dialog; its return value
# is kept in `uploaded`, which no later visible cell reads.
from google.colab import files
uploaded = files.upload()

Saving NER dataset.csv to NER dataset.csv


In [None]:
# Load the token-level table (one row per word) and forward-fill blanks so
# every row carries the "Sentence #" value of the row that introduced it.
# `DataFrame.ffill()` replaces the deprecated `fillna(method='ffill')`.
df = pd.read_csv("NER dataset.csv", encoding='latin1')
df = df.ffill()

# A sentence is a run of consecutive rows sharing the same "Sentence #".
# Comparing each id with the previous row's id marks the start of every run;
# the cumulative sum of those marks gives one integer key per run. This keeps
# the original "split whenever the id changes" semantics without per-row
# `iloc` lookups, which are extremely slow on a table of this size.
sentence_ids = df['Sentence #']
run_key = (sentence_ids != sentence_ids.shift()).cumsum()

sentences = []  # list of sentences, each a list of words
labels = []     # parallel list of per-word tag lists

for _, sentence_rows in df.groupby(run_key, sort=False):
    sentences.append(sentence_rows['Word'].tolist())
    labels.append(sentence_rows['Tag'].tolist())

  df = df.fillna(method='ffill')


In [None]:
# Vocabularies: ids are handed out in first-seen order.
# Word ids 0/1 are reserved for padding and unknown words; tag id 0 is padding.
word_vocab = {'<PAD>': 0, '<UNK>': 1}
tag_vocab = {'<PAD>': 0}

for sentence_words in sentences:
    for word in sentence_words:
        # setdefault only inserts when the word is new; the next free id is
        # the current vocabulary size.
        word_vocab.setdefault(word, len(word_vocab))

for sentence_tags in labels:
    for tag in sentence_tags:
        tag_vocab.setdefault(tag, len(tag_vocab))

# Reverse lookup (tag id -> tag string) for decoding predictions.
idx2tag = dict(zip(tag_vocab.values(), tag_vocab.keys()))


In [None]:
MAX_LEN = 50

def encode_sentences(sentences, labels, word_vocab, tag_vocab, max_len=50):
    """Turn tokenised sentences and their tags into fixed-length id arrays.

    Args:
        sentences: Iterable of sentences, each a list of word strings.
        labels: Iterable of tag lists, parallel to ``sentences``.
        word_vocab: Mapping word -> id; must contain '<PAD>' and '<UNK>'.
        tag_vocab: Mapping tag -> id; must contain '<PAD>'.
        max_len: Length every encoded sequence is truncated or padded to.

    Returns:
        Tuple ``(X, y)`` of numpy arrays built from the encoded word-id rows
        and tag-id rows respectively.

    Raises:
        KeyError: If a tag is missing from ``tag_vocab``.
    """
    def _fit(ids, pad_id):
        # Truncate to max_len, then right-pad; a non-positive repeat count
        # yields an empty list, so long sequences get no padding.
        return ids[:max_len] + [pad_id] * (max_len - len(ids))

    encoded_words = []
    encoded_tags = []

    for tokens, token_tags in zip(sentences, labels):
        # Unknown words fall back to the <UNK> id; unknown tags are an error.
        raw_word_ids = [word_vocab.get(tok, word_vocab['<UNK>']) for tok in tokens]
        raw_tag_ids = [tag_vocab[tag] for tag in token_tags]

        encoded_words.append(_fit(raw_word_ids, word_vocab['<PAD>']))
        encoded_tags.append(_fit(raw_tag_ids, tag_vocab['<PAD>']))

    return np.array(encoded_words), np.array(encoded_tags)

# Encode every sentence to MAX_LEN word ids / tag ids, then expand the tag ids
# into one-hot rows with one class per entry of tag_vocab ('<PAD>' included).
# NOTE(review): the PyTorch training cell further down flattens its labels with
# `.view(-1)` and uses CrossEntropyLoss(ignore_index=...), which presumably
# expects integer class ids rather than one-hot rows — confirm which form the
# data loaders are meant to consume.
X, y = encode_sentences(sentences, labels, word_vocab, tag_vocab, MAX_LEN)
y = tf.keras.utils.to_categorical(y, num_classes=len(tag_vocab))


In [None]:
# Hold out 10% of the encoded sentences for validation; the fixed
# random_state makes the split identical on every run.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.1, random_state=42)


In [None]:
class PositionalEncoding(layers.Layer):
    """Keras layer that adds fixed sinusoidal position encodings to its input.

    NOTE(review): a later cell defines a PyTorch class that is also called
    ``PositionalEncoding``; once that cell runs, this name refers to the
    PyTorch class instead of this layer.

    Args:
        max_len: Number of positions for which encodings are precomputed.
        d_model: Width of the encoding (size of the input's last axis).
        **kwargs: Standard ``Layer`` arguments such as ``name`` or ``dtype``.
    """

    def __init__(self, max_len, d_model, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can rebuild the layer.
        self.max_len = max_len
        self.d_model = d_model

        pos = np.arange(max_len)[:, np.newaxis]
        i = np.arange(d_model)[np.newaxis, :]
        # Columns 2k and 2k+1 share the same frequency (hence i // 2).
        angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
        angle_rads = pos * angle_rates

        # Sine on even columns, cosine on odd columns.
        pos_encoding = np.zeros((max_len, d_model))
        pos_encoding[:, 0::2] = np.sin(angle_rads[:, 0::2])
        pos_encoding[:, 1::2] = np.cos(angle_rads[:, 1::2])

        # Leading axis of size 1 so the table broadcasts over the batch.
        self.pos_encoding = tf.cast(pos_encoding[np.newaxis, ...], dtype=tf.float32)

    def call(self, x):
        """Return ``x`` plus the encodings for its first ``shape(x)[1]`` positions."""
        return x + self.pos_encoding[:, :tf.shape(x)[1], :]

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({"max_len": self.max_len, "d_model": self.d_model})
        return config


In [None]:
class TransformerBlock(layers.Layer):
    """Post-norm Transformer encoder block: self-attention then feed-forward.

    Each sub-layer output goes through dropout, is added back to its input
    (residual connection) and is layer-normalised.

    Args:
        d_model: Width of the block's input and output; also passed to
            ``MultiHeadAttention`` as ``key_dim``.
        num_heads: Number of attention heads.
        ff_dim: Hidden width of the feed-forward sub-layer.
        rate: Dropout rate applied to both sub-layer outputs.
        **kwargs: Standard ``Layer`` arguments such as ``name`` or ``dtype``.
    """

    def __init__(self, d_model, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can rebuild the layer.
        self.d_model = d_model
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = models.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(d_model)
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, x, training=None, mask=None):
        """Apply the block to ``x``.

        Args:
            x: Input tensor; used as both query and value for self-attention.
            training: Forwarded to the dropout layers. Defaults to ``None`` so
                the block can be called without passing it explicitly.
            mask: Optional mask forwarded as ``attention_mask``.

        Returns:
            The layer-normalised output of the feed-forward residual branch.
        """
        attn_output = self.att(x, x, attention_mask=mask)
        out1 = self.layernorm1(x + self.dropout1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.layernorm2(out1 + self.dropout2(ffn_output, training=training))

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config


In [None]:
import torch.nn as nn
import torch.nn.functional as F
import math

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence of embeddings.

    Args:
        d_model: Embedding width (size of the input's last dimension).
        max_len: Longest sequence length for which encodings are precomputed.
        batch_first: If True, inputs are ``(batch, seq, d_model)``. If False
            (the original layout, kept as the default), inputs are
            ``(seq, batch, d_model)``.
    """

    def __init__(self, d_model, max_len=50, batch_first=False):
        super().__init__()
        self.batch_first = batch_first
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term)[:, : d_model // 2]
        # Stored as (max_len, 1, d_model) regardless of batch_first so the
        # buffer layout in state_dict stays the same as before.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` with the encoding of each position added to it.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        pe = self.pe[:seq_len]
        if self.batch_first:
            # (seq, 1, d_model) -> (1, seq, d_model) to broadcast over batch.
            pe = pe.transpose(0, 1)
        return x + pe

class TransformerModel(nn.Module):
    """Token-level tagger: embedding -> positional encoding -> Transformer
    encoder -> per-token linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        tag_size: Number of output classes per token.
        d_model: Embedding / encoder width.
        nhead: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden width of each encoder layer's feed-forward net.
        max_len: Longest supported sequence length.
        dropout: Dropout rate inside the encoder layers.
        batch_first: If True (default), ``forward`` expects ``(batch, seq)``
            token ids, matching the ``(num_sentences, MAX_LEN)`` arrays this
            notebook builds. Previously the encoder was constructed
            sequence-first, so batch-major input made attention and position
            encodings run along the batch axis instead of the token axis.
            Pass False for ``(seq, batch)`` input.
    """

    def __init__(self, vocab_size, tag_size, d_model, nhead, num_layers, dim_feedforward,
                 max_len=50, dropout=0.1, batch_first=True):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len, batch_first=batch_first)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model, nhead, dim_feedforward, dropout, batch_first=batch_first
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(d_model, tag_size)

    def forward(self, src, src_key_padding_mask=None):
        """Compute per-token tag scores.

        Args:
            src: Integer token ids, laid out according to ``batch_first``.
            src_key_padding_mask: Optional ``(batch, seq)`` boolean mask,
                True at padding positions that attention should ignore.

        Returns:
            Tensor shaped like ``src`` with a trailing ``tag_size`` dimension.
        """
        # Scale embeddings by sqrt(d_model) before adding position encodings.
        src = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_key_padding_mask=src_key_padding_mask)
        output = self.fc(output)
        return output

# Model hyperparameters (example values; tune as needed).
# Sizes come from the vocabularies built earlier in this notebook
# (`word_vocab` / `tag_vocab`); the previously referenced `word2idx` /
# `tag2idx` are not defined in any visible cell.
vocab_size = len(word_vocab)  # includes the <PAD> and <UNK> entries
tag_size = len(tag_vocab)     # includes the <PAD> tag
d_model = 128
nhead = 4
num_layers = 2
dim_feedforward = 256

# max_len follows MAX_LEN so the model always matches the encoded data.
model = TransformerModel(vocab_size, tag_size, d_model, nhead, num_layers, dim_feedforward, max_len=MAX_LEN)

print(model)

TransformerModel(
  (embedding): Embedding(35180, 128)
  (pos_encoder): PositionalEncoding()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=128, out_features=18, bias=True)
)




In [None]:
import torch.optim as optim

BATCH_SIZE = 32  # mini-batch size for both loaders; adjust as needed


def _to_loader(features, tags, shuffle):
    """Wrap encoded arrays in a DataLoader of (token ids, tag ids) batches."""
    tags = np.asarray(tags)
    if tags.ndim == 3:
        # One-hot rows -> integer class ids, which nn.CrossEntropyLoss expects.
        tags = tags.argmax(axis=-1)
    dataset = TensorDataset(
        torch.as_tensor(np.asarray(features), dtype=torch.long),
        torch.as_tensor(tags, dtype=torch.long),
    )
    return DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=shuffle)


# NOTE(review): no visible cell defined `train_loader` / `val_loader`; they are
# built here from the train/validation split so the notebook runs top to bottom.
train_loader = _to_loader(X_train, y_train, shuffle=True)
val_loader = _to_loader(X_val, y_val, shuffle=False)

# Loss and optimizer. Padding positions are excluded from the loss; the pad id
# comes from `tag_vocab` (the notebook's tag vocabulary).
criterion = nn.CrossEntropyLoss(ignore_index=tag_vocab.get('<PAD>', 0))
optimizer = optim.Adam(model.parameters(), lr=0.001)  # learning rate is tunable

# Training loop
num_epochs = 5  # tunable
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    # Batch variables are deliberately not called `labels`, so the global
    # `labels` list built while loading the data is not overwritten.
    for batch_inputs, batch_tags in train_loader:
        batch_inputs, batch_tags = batch_inputs.to(device), batch_tags.to(device)

        optimizer.zero_grad()

        logits = model(batch_inputs)
        # Flatten to (tokens, classes) / (tokens,) for CrossEntropyLoss;
        # reshape also handles non-contiguous tensors, unlike view.
        loss = criterion(logits.reshape(-1, logits.shape[-1]), batch_tags.reshape(-1))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")

    # Validation: no gradient tracking, dropout disabled via eval().
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_inputs, batch_tags in val_loader:
            batch_inputs, batch_tags = batch_inputs.to(device), batch_tags.to(device)
            logits = model(batch_inputs)
            loss = criterion(logits.reshape(-1, logits.shape[-1]), batch_tags.reshape(-1))
            val_loss += loss.item()

    print(f"Validation Loss: {val_loss/len(val_loader)}")

In [None]:
def predict_tags(model, sentence):
    """Predict one tag per word of ``sentence``.

    Args:
        model: Either a PyTorch ``nn.Module`` that maps ``(1, MAX_LEN)`` token
            ids to per-token class scores, or an object exposing a Keras-style
            ``predict(array)`` method returning such scores.
        sentence: List of word strings.

    Returns:
        List of tag strings, one per word (at most ``MAX_LEN`` entries, since
        longer sentences are truncated before prediction).
    """
    input_ids = [word_vocab.get(w, word_vocab["<UNK>"]) for w in sentence]
    input_ids = input_ids[:MAX_LEN] + [word_vocab["<PAD>"]] * (MAX_LEN - len(input_ids))

    if isinstance(model, nn.Module):
        # PyTorch modules have no `.predict`; run a forward pass directly.
        device = next(model.parameters()).device
        batch = torch.tensor([input_ids], dtype=torch.long, device=device)
        was_training = model.training
        model.eval()  # disable dropout for inference
        try:
            with torch.no_grad():
                scores = model(batch)[0]
        finally:
            model.train(was_training)  # restore the caller's mode
        pred_ids = scores.argmax(dim=-1).tolist()
    else:
        # Keras-style path, as in the original implementation.
        preds = model.predict(np.array([input_ids]))[0]
        pred_ids = np.argmax(preds, axis=-1).tolist()

    # Drop predictions made for padding positions.
    return [idx2tag[i] for i in pred_ids[:len(sentence)]]

test_sent = ["John", "lives", "in", "New", "York"]
print("Sentence:", test_sent)
print("Predicted Tags:", predict_tags(model, test_sent))
