In [1]:
import utils
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
import pickle
import random
import numpy as np

In [None]:
# --- Experiment configuration ------------------------------------------------
NUM_CLASSES = 23        # width of the classifier's output layer
BATCH_SIZE = 256        # sentences per training batch
EPOCHS = 10             # full passes over the training subset
HIDDEN_SIZE = 768       # LSTM hidden units (per direction)
VECTOR_SIZE = 768       # embedding dimension
TRAINING_SIZE = 500000  # number of shuffled training examples kept
NUM_LAYERS = 1          # stacked LSTM layers
DROP_OUT = 0            # dropout passed to the LSTM
SEED = 42               # single seed shared by every RNG below

# The notebook shuffles the training data and randomly initialises model
# weights; seeding all generators up front makes a Restart & Run All
# reproduce the same subset, vocabulary and starting weights.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device is', device)

In [None]:
# Read the full training file, shuffle it in place, and keep only the first
# TRAINING_SIZE examples for this run.
train_json_path = "../data/fixed_PIZZA_train.json"
complete_data = utils.read_data(train_json_path)
random.shuffle(complete_data)  # in-place; assumes read_data returns a list
print('Read Data Done')

data = complete_data[:TRAINING_SIZE]
corpus, top, decoupled = utils.get_train_dataset(data)

# label_complete_input returns, in order: entity labels, intent labels and the
# tokenized inputs (names below mirror that order).
(
    entites_output_as_number_labels,
    intents_output_as_number_labels,
    input_as_tokenized_string,
) = utils.label_complete_input(corpus, decoupled, top)
print('Parse Train Data Done')

In [None]:
# Flatten every tokenized training sentence into one list of tokens.
flat_list = [word for sublist in input_as_tokenized_string for word in sublist]

# Sorting the unique tokens makes the word -> index assignment deterministic
# for a given set of tokens.
vocab = sorted(set(flat_list))
word_to_index = {word: index for index, word in enumerate(vocab)}
# NOTE(review): indices start at 0, so index 0 is a real word, while
# pad_sequence pads input batches with 0 by default — padding positions are
# therefore indistinguishable from that word. Confirm this is acceptable.

# Use a context manager so the file handle is flushed and closed even if
# pickling fails.
with open('word_to_index.pk1', 'wb') as vocab_file:
    pickle.dump(word_to_index, vocab_file)
print("word_to_index Done")

In [None]:
# Read the dev split and derive its gold labels and tokenized sentences.
dev_json_path = "../data/fixed_PIZZA_dev.json"
dev_data = utils.read_data(dev_json_path)
dev_corpus, dev_top = utils.get_dev_dataset(dev_data)

# label_complete_dev returns, in order: NER labels, intent ("is") labels and
# the tokenized dev sentences.
(
    ner_dev_labels,
    is_dev_labels,
    dev_as_tokenized_string,
) = utils.label_complete_dev(dev_corpus, dev_top)
print('Parse Dev Data Done')

In [11]:
def data_generator(data, labels, batch_size, word_to_index):
    """Yield padded (token-index, label) tensor batches for training.

    Sentences are consumed in order and grouped into batches of
    ``batch_size``. Unlike the previous version, the final batch is yielded
    even when it holds fewer than ``batch_size`` sentences, so no training
    examples are silently dropped.

    Args:
        data: Indexable collection of tokenized sentences.
        labels: Indexable collection of per-token integer label lists,
            aligned with ``data``.
        batch_size: Maximum number of sentences per yielded batch.
        word_to_index: Mapping from token to integer index.

    Yields:
        Tuple ``(padded_sequences, padded_labels)`` of ``torch.long`` tensors,
        batch-first. Inputs are padded with 0 (the ``pad_sequence`` default)
        and labels with -1.

    Raises:
        KeyError: If a token is not present in ``word_to_index``.
        IndexError: If ``labels`` is shorter than ``data``.
    """

    def _collate(pairs):
        # Turn a list of (tokens, labels) pairs into two padded tensors.
        sequences, labels_batch = zip(*pairs)
        index_tensors = [
            torch.tensor([word_to_index[token] for token in seq], dtype=torch.long)
            for seq in sequences
        ]
        label_tensors = [torch.tensor(label, dtype=torch.long) for label in labels_batch]
        padded_sequences = pad_sequence(index_tensors, batch_first=True)
        # -1 marks label padding so the loss can skip those positions.
        padded_labels = pad_sequence(label_tensors, batch_first=True, padding_value=-1)
        return padded_sequences, padded_labels

    batch = []
    for i in range(len(data)):
        batch.append((data[i], labels[i]))
        if len(batch) == batch_size:
            yield _collate(batch)
            batch = []

    # Flush the remainder that did not fill a complete batch.
    if batch:
        yield _collate(batch)

class LargeWordRNN(nn.Module):
    """Per-token tagger: embedding -> bidirectional LSTM -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_size: LSTM hidden size per direction.
        num_classes: Number of output scores per token.
        num_layers: Number of stacked LSTM layers. Defaults to the notebook's
            ``NUM_LAYERS`` constant when omitted.
        dropout: Dropout passed to the LSTM. Defaults to the notebook's
            ``DROP_OUT`` constant when omitted.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size, num_classes,
                 num_layers=None, dropout=None):
        super().__init__()
        # Fall back to the notebook-level configuration only when the caller
        # did not specify a value, so the class is usable without those globals.
        if num_layers is None:
            num_layers = NUM_LAYERS
        if dropout is None:
            dropout = DROP_OUT

        # Attribute names are kept as-is: pickled instances of this class
        # restore their submodules under these names.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(
            embedding_dim,
            hidden_size,
            batch_first=True,
            bidirectional=True,
            num_layers=num_layers,
            dropout=dropout,
        )
        # Bidirectional LSTM concatenates both directions -> 2 * hidden_size.
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        """Return per-token class scores for a batch-first tensor of token indices."""
        embedded = self.embedding(x)
        lstm_out, _ = self.rnn(embedded)
        return self.fc(lstm_out)

# SECURITY: pickle.load can execute arbitrary code embedded in the file.
# Only load model/vocabulary files that come from a trusted source.
pickle_file_path1 = "../models/ner_model.pk1"
with open(pickle_file_path1, "rb") as file1:
    ner_model = pickle.load(file1)
# Inference inputs are moved to `device`, so the model must live there too;
# otherwise a model saved on a different device type raises a device
# mismatch at call time.
ner_model = ner_model.to(device)


pickle_file_path2 = "../models/ner_word_to_index.pk1"
with open(pickle_file_path2, "rb") as file2:
    ner_word_to_index = pickle.load(file2)

In [None]:
class TestLargeDataset(Dataset):
    """Thin ``Dataset`` wrapper that serves items of an indexable collection unchanged."""

    def __init__(self, data):
        # Keep a reference to the caller's collection; nothing is copied.
        self.data = data

    def __len__(self):
        """Number of items in the wrapped collection."""
        item_count = len(self.data)
        return item_count

    def __getitem__(self, idx):
        """Return the item stored at position ``idx``."""
        item = self.data[idx]
        return item

def test_collate_fn(batch):
    """Collate tokenized sentences into a padded index tensor using the NER vocabulary.

    Each token is looked up in ``ner_word_to_index``; tokens missing from that
    mapping are replaced by the index of the token ``'i'``. The resulting
    index lists are padded (batch-first, default pad value) to a common length.
    """
    fallback_token = 'i'
    index_tensors = []
    for tokens in batch:
        token_ids = [
            ner_word_to_index[tok] if tok in ner_word_to_index
            else ner_word_to_index[fallback_token]
            for tok in tokens
        ]
        index_tensors.append(torch.tensor(token_ids))
    return pad_sequence(index_tensors, batch_first=True)
# Run the pre-trained NER model over the dev set, one sentence per batch.
dev_dataset = TestLargeDataset(dev_as_tokenized_string)
dataloader = DataLoader(dev_dataset, batch_size=1, collate_fn=test_collate_fn, shuffle=False, num_workers=0)

# Label name -> class index, kept for reference (presumably the encoding the
# NER labels use — verify against utils). Built once instead of once per
# sentence, since it does not change inside the loop.
entity_to_num = {"I_NUMBER": 0, "I_SIZE": 1, "I_TOPPING": 2, "I_STYLE": 3, "I_DRINKTYPE": 4, "I_CONTAINERTYPE": 5, "I_VOLUME": 6, "I_QUANTITY": 7, "B_NUMBER": 8, "B_SIZE": 9, "B_TOPPING": 10, "B_STYLE": 11, "B_DRINKTYPE": 12, "B_CONTAINERTYPE": 13, "B_VOLUME": 14, "B_QUANTITY": 15, "I_NOT_TOPPING": 16, "B_NOT_TOPPING": 17,"I_NOT_STYLE": 18, "B_NOT_STYLE": 19, "B_NOT_QUANTITY": 20, "I_NOT_QUANTITY": 21, "NONE": 22}

ner_model_output = []

ner_model.eval()
with torch.no_grad():
    for padded_sequences in dataloader:
        padded_sequences = padded_sequences.to(device)
        outputs = ner_model(padded_sequences)
        # batch_size=1, so outputs[0] is the only sentence in the batch;
        # take the highest-scoring class per token in one vectorised call.
        labels = outputs[0].argmax(dim=-1).tolist()
        ner_model_output.append(labels)

confusion_matrix, accuracy, exact_accuracy = utils.calc_accuracy(dev_corpus, ner_model_output, ner_dev_labels)
print("word accuracy:",accuracy*100,"EM",exact_accuracy*100)

In [None]:
# Train the intent ("is") tagger on the tokenized training sentences.
is_model = LargeWordRNN(vocab_size=len(word_to_index),embedding_dim=VECTOR_SIZE, hidden_size=HIDDEN_SIZE, num_classes=NUM_CLASSES).to(device)
# -1 is the label padding value produced by data_generator; skip it in the loss.
criterion = nn.CrossEntropyLoss(ignore_index=-1)
optimizer = torch.optim.Adam(is_model.parameters(), lr=0.001)

is_model.train()  # explicit, so the mode is right regardless of earlier state
for epoch in range(EPOCHS):
    loss = None  # stays None if the generator yields no batch this epoch
    for padded_sequences, padded_labels in data_generator(input_as_tokenized_string, intents_output_as_number_labels, BATCH_SIZE, word_to_index):
        padded_sequences = padded_sequences.to(device)
        padded_labels = padded_labels.to(device)
        optimizer.zero_grad()
        outputs = is_model(padded_sequences)
        # Flatten (batch, seq, classes) -> (batch*seq, classes) for the loss.
        loss = criterion(outputs.view(-1, NUM_CLASSES), padded_labels.view(-1))
        loss.backward()
        optimizer.step()
    if loss is None:
        print(f"Epoch {epoch + 1}, no batches were produced")
    else:
        # Reports the loss of the last batch of the epoch, not an average.
        print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

# Context manager guarantees the file is flushed and closed after pickling.
with open('is_model.pk1', 'wb') as model_file:
    pickle.dump(is_model, model_file)
print('Finish Training, is_model is saved')

In [None]:
def test_collate_fn2(batch):
    """Collate tokenized sentences into a padded index tensor using ``word_to_index``.

    Tokens absent from ``word_to_index`` are mapped to the index of the token
    ``'i'``. Index lists are padded (batch-first, default pad value) so all
    rows share the length of the longest sentence in the batch.
    """
    unknown_substitute = 'i'
    encoded = []
    for sentence in batch:
        ids = []
        for word in sentence:
            lookup_key = word if word in word_to_index else unknown_substitute
            ids.append(word_to_index[lookup_key])
        encoded.append(torch.tensor(ids))
    return pad_sequence(encoded, batch_first=True)
# Run the freshly trained intent model over the dev set, one sentence per batch.
dev_dataset = TestLargeDataset(dev_as_tokenized_string)
dataloader2 = DataLoader(dev_dataset, batch_size=1, collate_fn=test_collate_fn2, shuffle=False, num_workers=0)

# Label name -> class index, kept for reference and built once instead of on
# every loop iteration.
# NOTE(review): is_model is constructed with NUM_CLASSES outputs while this
# mapping lists 7 labels — confirm the argmax can never select an index >= 7.
intent_to_num = {"I_PIZZAORDER": 0, "I_DRINKORDER": 1, "I_COMPLEX_TOPPING": 2, "B_PIZZAORDER": 3, "B_DRINKORDER": 4, "B_COMPLEX_TOPPING": 5, "NONE": 6}

is_model_output = []

is_model.eval()
with torch.no_grad():
    for padded_sequences in dataloader2:
        padded_sequences = padded_sequences.to(device)
        outputs = is_model(padded_sequences)
        # batch_size=1, so outputs[0] is the only sentence in the batch;
        # pick the highest-scoring class per token in one vectorised call.
        labels = outputs[0].argmax(dim=-1).tolist()
        is_model_output.append(labels)

# Accuracy of the raw model predictions.
# (The trailing 7 presumably is the number of intent classes — verify in utils.)
confusion_matrix, accuracy,exact_accuracy=utils.calc_accuracy(dev_corpus, is_model_output, is_dev_labels, 7)
print("word accuracy:",accuracy*100,"EM",exact_accuracy*100)

# Accuracy after each successive post-processing stage.
is_model_output = utils.intent_post_processing(dev_corpus,is_model_output)
confusion_matrix, accuracy,exact_accuracy=utils.calc_accuracy(dev_corpus, is_model_output, is_dev_labels, 7)
print("word accuracy:",accuracy*100,"EM",exact_accuracy*100)

is_model_output = utils.intent_post_processing_extra(dev_corpus,is_model_output)
confusion_matrix, accuracy,exact_accuracy=utils.calc_accuracy(dev_corpus, is_model_output, is_dev_labels, 7)
print("word accuracy:",accuracy*100,"EM",exact_accuracy*100)

# Final stage also consumes the NER predictions.
is_model_output = utils.intent_post_processing2(is_model_output, ner_model_output)
confusion_matrix, accuracy,exact_accuracy=utils.calc_accuracy(dev_corpus, is_model_output, is_dev_labels, 7)
print("word accuracy:",accuracy*100,"EM",exact_accuracy*100)

In [None]:
def total_EM(ner_out, is_out, gold_ner, gold_is):
    """
    Calculates the joint exact match accuracy of the two models.

    A sentence counts as correct only when both its predicted NER labels and
    its predicted IS labels equal the corresponding gold labels.

    Args:
        ner_out: The predicted NER labels, one entry per sentence.
        is_out: The predicted IS labels, one entry per sentence.
        gold_ner: The true NER labels.
        gold_is: The true IS labels.

    Returns:
        The fraction of sentences (0.0 to 1.0) matched exactly by both
        models; 0.0 when ``ner_out`` is empty.

    Raises:
        IndexError: If any of the other sequences is shorter than ``ner_out``.
    """
    total = len(ner_out)
    # Guard against division by zero when there is nothing to score.
    if total == 0:
        return 0.0
    correct = sum(
        1
        for i in range(total)
        if ner_out[i] == gold_ner[i] and is_out[i] == gold_is[i]
    )
    return 1.0 * correct / total

# Joint exact match over the dev set, using the NER predictions and the
# current (post-processed) intent predictions.
joint_exact_match = total_EM(ner_model_output, is_model_output, ner_dev_labels, is_dev_labels)
print("EM", joint_exact_match*100)