Training

In [None]:
!pip install transformers

In [None]:
from transformers import DistilBertTokenizerFast, DistilBertModel, DistilBertForQuestionAnswering

# Tokenizer and QA model are loaded from the same pretrained checkpoint.
CHECKPOINT = 'distilbert-base-cased-distilled-squad'

tokenizer = DistilBertTokenizerFast.from_pretrained(CHECKPOINT)
model = DistilBertForQuestionAnswering.from_pretrained(CHECKPOINT)

In [None]:
# Mount Google Drive at /content/drive (Colab-only module); the dataset path
# used by the loading cell lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
# Load the QA spreadsheet from the mounted Drive folder.
# NOTE(review): the path is hard-coded to one user's Drive layout; the later
# cells read the columns context, question, answer_text, answer_start and
# answer_end from this frame.
dataset = pd.read_excel("/content/drive/MyDrive/CIVIC-2023/Copy of 534 Updated_Dataset.xlsx")
# Preview the first rows as the cell output.
dataset.head()

In [None]:
# Pull each dataset column needed for QA training out into a plain Python list.
contexts, questions, answers, answers_start, answers_end = (
    dataset[column].tolist()
    for column in ("context", "question", "answer_text", "answer_start", "answer_end")
)

In [None]:
# One SQuAD-style answer record per row: the answer text plus its character
# start/end offsets as read from the dataset columns.
qa_answers = [
    {"text": text, "answer_start": start, "answer_end": end}
    for text, start, end in zip(answers, answers_start, answers_end)
]

In [None]:
# Maps a question code to the list of natural-language phrasings registered
# for it; every code starts with its own empty list.
question_map = {
    code: []
    for code in ("Q1", "Q2", "Q3", "Q6", "Q7", "Q8")
}

In [None]:
def add_questions(question_num, question_str):
  """Register ``question_str`` as a phrasing of ``question_num``.

  Appends to ``question_map[question_num]`` unless the exact string is already
  present, so repeated calls do not create duplicates. Raises ``KeyError`` if
  ``question_num`` is not a key of ``question_map``.
  """
  phrasings = question_map[question_num]
  if question_str in phrasings:
    return
  phrasings.append(question_str)

# Phrasings to register for each question code, listed in insertion order.
PARAPHRASES = {
    "Q1": [
        "What is your name?",
        "Can I have your name please?",
        "What is your name please?",
        "Your name please?",
        "Tell me your name.",
        "Name?",
    ],
    "Q2": [
        "What is the address of your emergency?",
        "What is the address of the emergency?",
        "Can I have the emergency address please?",
        "Tell me the address.",
        "Where did the emergency happen?",
    ],
    "Q3": [
        "What is your phone number?",
        "What is the best phone number for you?",
        "What is your contact number?",
    ],
    "Q6": [
        "What is the suspect description?",
        "What does the suspect look like?",
        "What is the suspect wearing?",
        "Can you give more details about the suspect?",
        "How would you describe the suspect?",
    ],
    "Q7": [
        "What is the vehicle description?",
        "What does the car look like?",
        "Can you offer more details about the vehicle?",
        "How would you describe the vehicle?",
    ],
    "Q8": [
        "What is the property description?",
        "How would you describe the property?",
        "What is the property?",
    ],
}

for code, phrasings in PARAPHRASES.items():
  for phrasing in phrasings:
    add_questions(code, phrasing)

# Display the populated map as the cell output.
question_map

In [None]:
import random

# Contexts are used as-is (same list object, not a copy).
qa_contexts = contexts

# Rows whose question is a code present in question_map get one of its
# registered phrasings picked at random; any other value is kept unchanged.
qa_questions = [
  random.choice(question_map[qt]) if qt in question_map else qt
  for qt in questions
]

In [None]:
# Tokenize (context, question) pairs as one padded, truncated batch; the
# context is passed as the FIRST sequence of each pair.
# NOTE(review): add_token_positions calls char_to_token without a sequence
# index, which presumably resolves character offsets against the first
# sequence — keep contexts first unless that call is changed too.
train_encodings = tokenizer(qa_contexts, qa_questions, truncation=True, padding=True)

In [None]:
def add_token_positions(encodings, answers):
  """Attach token-level answer spans to ``encodings`` in place.

  For every answer record, the character offsets ``answer_start`` and
  ``answer_end - 1`` are converted to token indices with
  ``encodings.char_to_token``. When the lookup yields ``None`` (treated here
  as "the answer was truncated away"), ``tokenizer.model_max_length`` is
  stored instead. The two resulting lists are written to ``encodings`` under
  the keys ``start_positions`` and ``end_positions``.
  """
  starts, ends = [], []
  for row, span in enumerate(answers):
    first_token = encodings.char_to_token(row, int(span['answer_start']))
    # answer_end is used as an exclusive offset, hence the -1 for the last char
    last_token = encodings.char_to_token(row, int(span['answer_end'] - 1))

    # None means no token covers that character; fall back to the max length
    starts.append(tokenizer.model_max_length if first_token is None else first_token)
    ends.append(tokenizer.model_max_length if last_token is None else last_token)

  encodings.update({'start_positions': starts, 'end_positions': ends})

add_token_positions(train_encodings, qa_answers)

In [None]:
import torch
from torch.utils.data import DataLoader
import requests
import json
import os
from tqdm import tqdm

In [None]:
class SQuAD_Dataset(torch.utils.data.Dataset):
  """Torch dataset view over a tokenizer encoding batch.

  ``encodings`` must offer ``.items()`` yielding (field name, per-example
  sequence) pairs and an ``input_ids`` attribute whose length is the number
  of examples.
  """

  def __init__(self, encodings):
    self.encodings = encodings

  def __len__(self):
    return len(self.encodings.input_ids)

  def __getitem__(self, idx):
    # Build one example: every field's idx-th entry converted to a tensor.
    example = {}
    for field, values in self.encodings.items():
      example[field] = torch.tensor(values[idx])
    return example

In [None]:
# Wrap the tokenized examples and serve them in shuffled mini-batches of 8.
train_dataset = SQuAD_Dataset(train_encodings)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=8,
    shuffle=True,
)

In [None]:
# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Working on {device}')

In [None]:
# transformers.AdamW was deprecated and is no longer available in recent
# transformers releases; the PyTorch optimizer is the supported replacement.
from torch.optim import AdamW

N_EPOCHS = 3
# eps and weight_decay are pinned to the defaults transformers.AdamW used
# (1e-6 and 0.0), so the optimisation matches the original configuration
# instead of silently picking up torch's defaults.
optim = AdamW(model.parameters(), lr=5e-5, eps=1e-6, weight_decay=0.0)

model.to(device)
# The model is left in train mode after this cell (dropout stays enabled).
model.train()

for epoch in range(N_EPOCHS):
  loop = tqdm(train_loader, leave=True)
  loop.set_description(f'Epoch {epoch+1}')
  for batch in loop:
    optim.zero_grad()
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    start_positions = batch['start_positions'].to(device)
    end_positions = batch['end_positions'].to(device)
    # Passing the gold span positions makes the model return the loss first.
    outputs = model(input_ids, attention_mask=attention_mask, start_positions=start_positions, end_positions=end_positions)
    loss = outputs[0]
    loss.backward()
    optim.step()

    loop.set_postfix(loss=loss.item())

# Saves the whole pickled module object (not just the weights).
# NOTE(review): loading such a file executes pickle; only load it from a
# trusted source, and consider model.save_pretrained(...) for portability.
torch.save(model, "model.pth")

Confidence Score

In [None]:
!pip install git+https://github.com/LIAAD/yake

In [None]:
!pip install -U sentence-transformers

In [None]:
def get_prediction(context, question):
    """Return the answer text the QA model extracts for ``question``.

    Args:
        context: Passage to search for the answer.
        question: Question string.

    Returns:
        The decoded token span between the highest-scoring start and end
        positions; an empty string when the predicted end precedes the start.

    Relies on the notebook globals ``tokenizer``, ``model`` and ``device``.
    NOTE(review): this function does not switch the model to eval mode. The
    training cell leaves it in train mode, so dropout presumably stays active
    and repeated calls can return different spans; obtain_conf_scores appears
    to rely on that variation — confirm before adding ``model.eval()``.
    """
    # Encode in the SAME (context, question) order the training cell uses for
    # train_encodings; the original (question, context) order fed the
    # fine-tuned model a layout it was not trained on. truncation=True keeps
    # long contexts within the model's maximum input length instead of
    # crashing the forward pass.
    inputs = tokenizer(context, question, truncation=True, return_tensors='pt').to(device)
    outputs = model(**inputs)

    # Without labels, outputs[0] / outputs[1] are the start / end logits.
    answer_start = torch.argmax(outputs[0])
    answer_end = torch.argmax(outputs[1]) + 1  # +1: slice end is exclusive

    span_ids = inputs['input_ids'][0][answer_start:answer_end].cpu()
    answer = tokenizer.convert_tokens_to_string(
        tokenizer.convert_ids_to_tokens(span_ids))

    return answer

In [None]:
import yake
from collections import Counter
from sentence_transformers import SentenceTransformer, util

def obtain_conf_scores(inputs, num_runs=5):
    """Score how consistent the QA model's answer is across repeated runs.

    The question is answered ``num_runs`` times with ``get_prediction``. Two
    agreement measures are computed over the predictions and blended:

    * semantic similarity: mean of the pairwise cosine-similarity matrix of
      the sentence embeddings (self-similarities on the diagonal included);
    * keyword consistency: for runs 2..n, the share of the first run's top-5
      YAKE keywords that reappear, averaged over those comparisons.

    Args:
        inputs: Sequence whose element 0 is the question and element 1 the
            context.
        num_runs: Number of repeated predictions (must be >= 1).

    Returns:
        ``alpha * keyword_consistency + (1 - alpha) * semantic_similarity``
        as a float, with ``alpha = 0.2``.

    Raises:
        ValueError: if ``num_runs`` is smaller than 1.
    """
    if num_runs < 1:
        raise ValueError("num_runs must be at least 1")

    alpha = 0.2  # weight of the keyword term in the final score
    kw_extractor = yake.KeywordExtractor()
    # NOTE: the sentence-embedding model is re-created on every call.
    st_model = SentenceTransformer('all-MiniLM-L6-v2')
    question, context = inputs[0], inputs[1]

    embeddings = []
    keywords_list = []
    with torch.no_grad():
        for _ in range(num_runs):
            prediction = get_prediction(context, question)

            # Keep at most 5 keywords (extract_keywords yields (keyword, score) pairs)
            keywords_list.append(
                [kw[0] for kw in kw_extractor.extract_keywords(prediction)[:5]])

            # Sentence embedding of the prediction, shape (1, dim)
            embeddings.append(st_model.encode([prediction], convert_to_tensor=True))

    # Pairwise cosine similarity via the already-imported sentence-transformers
    # helper; the original referenced `np` and `cosine_similarity`, neither of
    # which is imported anywhere, and broke on num_runs == 1 after squeeze().
    stacked = torch.cat(embeddings, dim=0)  # (num_runs, dim)
    avg_semantic_similarity = util.cos_sim(stacked, stacked).mean().item()

    # Keyword consistency of every later run against the first run
    reference_keywords = keywords_list[0]
    reference_set = set(reference_keywords)
    common_keywords_ratios = []
    for later_keywords in keywords_list[1:]:
        if later_keywords and reference_keywords:  # both non-empty
            ratio = len(set(later_keywords) & reference_set) / max(len(reference_set), 1)
        else:  # 1 if both are empty, 0 if only one is
            ratio = int(later_keywords == reference_keywords)
        common_keywords_ratios.append(ratio)

    # Average over the comparisons actually made (num_runs - 1); dividing by
    # num_runs capped the term below 1 even for perfectly consistent runs.
    # With a single run there is nothing to disagree with, so the term is 1.
    if common_keywords_ratios:
        avg_common_keywords_ratio = sum(common_keywords_ratios) / len(common_keywords_ratios)
    else:
        avg_common_keywords_ratio = 1.0

    final_conf_score = alpha * avg_common_keywords_ratio + (1 - alpha) * avg_semantic_similarity

    return final_conf_score