<a href="https://colab.research.google.com/github/Savith-02/notebooks/blob/main/nn_intent_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [212]:
def read_dataset(file_path):
    data = []
    with open(file_path, 'r') as file:
        for line in file:
            intent, sentence = line.strip().split(',')
            data.append((intent.strip(), sentence.strip()))
    return data

file_path = "drive/MyDrive/Code/rawData/data_small.txt"
dataset = read_dataset(file_path)[1:]
print(dataset[:10])

[('Greet', 'Hi'), ('Greet', 'Hello'), ('Greet', 'Hey there'), ('Greet', 'Good morning'), ('Greet', 'Howdy'), ('Greet', 'Hi there'), ('Greet', 'Hey'), ('Greet', 'Good afternoon'), ('Greet', 'Hello there'), ('Greet', 'Hi how can I help you?')]


In [213]:
# split = round(len(dataset) * 60 / 100)
# training_data = dataset[:split]
# cv_data = dataset[split:]
from sklearn.model_selection import train_test_split

training_data, test_data = train_test_split(dataset, test_size=0.4, random_state=42)


In [214]:
intent_counts = {}
for intent, _ in dataset:
    if intent not in intent_counts:
        intent_counts[intent] = 0
    intent_counts[intent] += 1
print(intent_counts)
print(f"Number of examples in dataset: {len(dataset)}")
print(f"Number of examples in training dataset: {len(training_data)}")
print(f"Number of examples in cv dataset: {len(test_data)}")

{'Greet': 53, 'Farewell': 66, 'Inquiry': 50, 'Feedback': 50, 'Complaint': 38, 'Request': 50, 'Navigation': 58}
Number of examples in dataset: 365
Number of examples in training dataset: 219
Number of examples in cv dataset: 146


In [215]:
all_sentences = set(sentence for intent, sentence in dataset)
print(f"All unique sentences count: {len(all_sentences)}")

All unique sentences count: 355


In [216]:
# Check for duplicates
# from collections import Counter
# word_counts = Counter(sentence for intent, sentence in dataset)
# word_counts

In [217]:
from itertools import chain

all_words = set(chain(*[sentence.split(" ") for intent, sentence in dataset]))
print(f"All unique sentences count: {len(all_words)}")

All unique sentences count: 768


In [218]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.optim as optim

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [219]:
word_to_index = {char: i for i, char in enumerate(all_words)}
print(f"No of words: {len(all_words)}")
print(word_to_index.items())

No of words: 768
dict_items([('way', 0), ('share', 1), ('until', 2), ('color?', 3), ('doing', 4), ('assist', 5), ('solution', 6), ('excellent', 7), ('pm.', 8), ('by', 9), ('lot', 10), ('reliable', 11), ('seamless', 12), ('soon!', 13), ('Italian', 14), ('change', 15), ('navigate', 16), ('and', 17), ('many', 18), ('explanations.', 19), ('advanced', 20), ('easy!', 21), ('incorrect.', 22), ('understand', 23), ("mornin'", 24), ('bookmark', 25), ('one', 26), ('leaky', 27), ('up?', 28), ('make', 29), ('effort', 30), ('impairments.', 31), ('year?', 32), ('hiking', 33), ('hotel', 34), ('works?', 35), ('pricing', 36), ('fit', 37), ('issue?', 38), ('items', 39), ('help', 40), ('message', 41), ('easy', 42), ('tutorial', 43), ('Paris', 44), ('tight!', 45), ('here', 46), ('shall', 47), ('next', 48), ('awesome', 49), ('explain', 50), ('mailing', 51), ('Having', 52), ('certain', 53), ('Goodnight!', 54), ("farmers'", 55), ('summarize', 56), ('responsive', 57), ('new?', 58), ('query?', 59), ('all!', 60)

In [220]:
all_intents = set(intent for intent, _ in dataset[1:])
intent_to_index = {intent: i for i, intent in enumerate(all_intents)}

print(f"No of intents {len(all_intents)}")
print(intent_to_index.items())

No of intents 7
dict_items([('Feedback', 0), ('Greet', 1), ('Complaint', 2), ('Inquiry', 3), ('Navigation', 4), ('Request', 5), ('Farewell', 6)])


In [221]:
# Convert sentences to tensors
def sentence_to_tensor(sentence):
    # print(sentence)
    tensor = torch.zeros(len(all_words)).to(device)
    for i, word in enumerate(sentence.split(" ")):
        tensor[word_to_index[word]] = 1
        # print(word)
    # print(tensor)
    return tensor

In [222]:
# def tensor_to_sentence(tensor, all_words, word_to_index):
#     sentence = []
#     for i in range(len(tensor)):
#         if tensor[i] == 1:
#             word = list(word_to_index.keys())[list(word_to_index.values()).index(i)]
#             sentence.append(word)
#     return " ".join(sentence)


In [223]:
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms

class CustomDataset(Dataset):
    def __init__(self, dataset, feature_transform=None, label_transform=None):
        self.data = dataset
        self.feature_transform = feature_transform
        self.label_transform = label_transform
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        return label_transform[sample[0]], feature_transform(sample[1])
            # sample[1] = self.transform(sample[1])


In [224]:
feature_transform = transforms.Compose([
    sentence_to_tensor,
])
label_transform = intent_to_index

In [225]:
# Create the datasets
train_dataset = CustomDataset(training_data[:192], feature_transform=feature_transform, label_transform=label_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)

# test_dataset = CustomDataset(test_data, test_labels)

In [226]:
# Access individual samples from the dataset
# for i in range(len(train_dataset[5:10])):
#     sample = train_dataset[i]
    # print(sample)

In [227]:
for batch, (intent, sentence) in enumerate(train_loader):
    if batch == 15:
      break
    print(sentence.shape)
    print(intent.shape)
    print(intent)

    # data contains the input samples for the current batch
    # target contains the corresponding labels for the current batch

    # You can print the shapes of data and target to verify they are as expected
    # print(sentence)
    # print(len(sentence))
    print("-----------")

torch.Size([32, 768])
torch.Size([32])
tensor([1, 1, 6, 4, 2, 4, 1, 0, 4, 4, 1, 3, 4, 3, 4, 3, 6, 4, 4, 0, 0, 1, 0, 3,
        0, 6, 1, 1, 6, 3, 5, 1])
-----------
torch.Size([32, 768])
torch.Size([32])
tensor([1, 4, 5, 4, 2, 6, 2, 1, 4, 5, 1, 3, 4, 3, 3, 5, 5, 3, 0, 6, 5, 2, 3, 6,
        4, 5, 6, 5, 3, 3, 4, 4])
-----------
torch.Size([32, 768])
torch.Size([32])
tensor([5, 2, 6, 0, 5, 6, 5, 1, 3, 5, 0, 0, 2, 5, 3, 2, 4, 6, 3, 5, 6, 4, 1, 5,
        2, 0, 1, 6, 4, 1, 4, 2])
-----------
torch.Size([32, 768])
torch.Size([32])
tensor([4, 0, 3, 5, 4, 5, 3, 3, 6, 5, 6, 1, 2, 5, 0, 3, 6, 1, 0, 6, 6, 0, 2, 1,
        4, 2, 5, 0, 3, 3, 1, 0])
-----------
torch.Size([32, 768])
torch.Size([32])
tensor([1, 3, 6, 2, 5, 0, 2, 0, 1, 3, 1, 4, 0, 0, 0, 3, 5, 3, 0, 6, 0, 5, 5, 4,
        2, 6, 3, 6, 3, 2, 1, 1])
-----------
torch.Size([32, 768])
torch.Size([32])
tensor([3, 0, 5, 2, 6, 2, 6, 5, 0, 1, 4, 0, 5, 6, 0, 5, 2, 5, 6, 4, 2, 1, 0, 1,
        6, 6, 1, 6, 0, 6, 0, 4])
-----------


In [228]:
class MyModel(nn.Module):
    def __init__(self, word_count, intent_class_count, hidden_units):
        super(MyModel, self).__init__()
        self.fc1 = nn.Linear(word_count, hidden_units)
        self.fc2 = nn.Linear(hidden_units, intent_class_count)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [229]:
model = MyModel(len(all_words), len(all_intents), 128)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)


In [230]:
def train_step(model, dataloader, reg_rate, optimizer, loss_fn):

    model.train()
    train_loss, train_acc = 0, 0

    for batch, (intent, sentence) in enumerate(train_loader):

        # intent_tensor = intent_to_tensor(intent)
        train_pred_logits = model(sentence)
        loss = loss_fn(train_pred_logits, intent)

        model.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        loss = loss_fn(train_pred_logits, intent)

        train_pred_class = torch.argmax(torch.softmax(train_pred_logits, dim=1), dim=1)
        train_acc += (train_pred_class == intent).sum().item() / len(train_pred_class)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [232]:
def test_step(model, dataloader, reg_rate, loss_fn):

    model.train()
    train_loss, train_acc = 0, 0

    for batch, (intent, sentence) in enumerate(train_loader):

        # intent_tensor = intent_to_tensor(intent)
        train_pred_logits = model(sentence)
        loss = loss_fn(train_pred_logits, intent)

        model.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        loss = loss_fn(train_pred_logits, intent)

        train_pred_class = torch.argmax(torch.softmax(train_pred_logits, dim=1), dim=1)
        train_acc += (train_pred_class == intent).sum().item() / len(train_pred_class)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [233]:
def train(model, train_loader, test_loader, hidden_size, learning_rate, reg_rate, epochs, optimizer, loss_function):

    # Initialize the model
    # model = model(num_chars, hidden_size, num_intents)
    model.to(device)

    loss_function = loss_function()
    optimizer = optimizer(model.parameters(), lr=learning_rate)
    # scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

    # Training loop with loss plotting
    train_losses = []
    test_losses = []
    training_dataset_len = len(training_data)
    test_dataset_len = len(test_data)

    for batch, (intent, sentence) in enumerate(train_loader):

        model.train()
        total_loss = 0

        for intent, sentence in training_data:
            model.zero_grad()
            # intent_tensor = intent_to_tensor(intent)
            output = model(sentence)
            loss = loss_function(output, intent)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # scheduler.step()
        # train_losses.append(total_loss / training_dataset_len)
        train_losses.append(total_loss)
        # if (epoch + 1) % 10 == 0:
        #     print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss}")

        model.eval()
        total_test_loss = 0

        with torch.inference_mode():
          for batch, (intent, sentence) in enumerate(test_loader):
              # intent = intent_to_tensor(intent)
              output = model(sentence)
              loss = loss_function(output, intent)
              total_test_loss += loss.item()

          test_losses.append(total_test_loss)

    return train_losses, test_losses, model