In [2]:
import random
from collections import defaultdict
from sentence_transformers import SentenceTransformer, SentencesDataset
from sentence_transformers.losses import TripletLoss
from sentence_transformers.readers import LabelSentenceReader, InputExample
from tqdm import tqdm
import csv
from scipy import spatial
import numpy as np
from os import path

# from sentence_transformers.examples.training.other.training_batch_hard_trec import triplets_from_labeled_dataset
from torch.utils.data import DataLoader

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def triplets_from_labeled_dataset(input_examples):
    """Build (anchor, positive, negative) triplets from labelled examples.

    Adapted from https://github.com/UKPLab/sentence-transformers/blob/master/examples/training/other/training_batch_hard_trec.py

    Every example is used once as the anchor. The positive is drawn at random
    from the examples sharing the anchor's label (and having a different
    ``guid``); the negative is drawn at random from the examples with a
    different label. Anchors for which no positive can exist are skipped.

    :param input_examples: sequence of examples exposing ``guid``, ``label``
        and ``texts`` (only ``texts[0]`` is used)
    :return: list of ``InputExample`` whose ``texts`` is
        ``[anchor_text, positive_text, negative_text]``; empty when the data
        contains fewer than two distinct labels
    """
    triplets = []
    label2sentence = defaultdict(list)
    for inp_example in input_examples:
        label2sentence[inp_example.label].append(inp_example)

    # Without at least two distinct labels no negative exists, and the
    # rejection sampling below would loop forever.
    if len(label2sentence) < 2:
        return triplets

    for anchor in input_examples:
        same_label = label2sentence[anchor.label]

        # We need at least 2 examples per label to create a triplet
        if len(same_label) < 2:
            continue

        # If every same-label example carries the anchor's guid, the positive
        # sampling could never succeed; skip instead of spinning.
        if all(candidate.guid == anchor.guid for candidate in same_label):
            continue

        positive = random.choice(same_label)
        while positive.guid == anchor.guid:
            positive = random.choice(same_label)

        # Rejection sampling over the whole dataset keeps the negative uniform
        # over all examples with a different label.
        negative = random.choice(input_examples)
        while negative.label == anchor.label:
            negative = random.choice(input_examples)

        triplets.append(InputExample(texts=[anchor.texts[0], positive.texts[0], negative.texts[0]]))

    return triplets

In [4]:
def train(model_name='KBLab/sentence-bert-swedish-cased',
          data_folder='./tsv_files',
          data_filename='scraping_results_clean_2.tsv',
          separator='#',
          output_path='fine_tuned_swedish_bert',
          epochs=4,
          batch_size=10):
    """Fine-tune a sentence-BERT model with triplet loss on a labelled sentence file.

    All arguments default to the values this notebook was originally run with,
    so ``train()`` behaves as before.

    :param model_name: name or path of the pre-trained SentenceTransformer to start from
    :param data_folder: folder handed to ``LabelSentenceReader``
    :param data_filename: file inside ``data_folder`` holding the labelled sentences
    :param separator: column separator of the data file
    :param output_path: ``output_path`` passed to ``SentenceTransformer.fit``
    :param epochs: number of fine-tuning epochs
    :param batch_size: batch size of the training ``DataLoader``
    :raises ValueError: if no triplet could be built from the data
    """
    # Load pre-trained model
    sbert_model = SentenceTransformer(model_name)

    # Set up data for fine-tuning
    sentence_reader = LabelSentenceReader(folder=data_folder, separator=separator)
    data_list = sentence_reader.get_examples(filename=data_filename)
    triplets = triplets_from_labeled_dataset(input_examples=data_list)
    if not triplets:
        # Fail early with an actionable message rather than deep inside the
        # data loader / training loop.
        raise ValueError(
            "No training triplets could be built from %r: the data needs at least two "
            "labels and at least one label with two or more sentences." % data_filename
        )
    finetune_data = SentencesDataset(examples=triplets, model=sbert_model)
    finetune_dataloader = DataLoader(finetune_data, shuffle=True, batch_size=batch_size)

    # Initialize triplet loss
    loss = TripletLoss(model=sbert_model)

    # Fine-tune the model
    sbert_model.fit(train_objectives=[(finetune_dataloader, loss)], epochs=epochs, output_path=output_path)

In [5]:
# Train the model with the scraped questions and answers
# Training is skipped when the path 'fine_tuned_swedish_bert' already exists,
# so re-running this cell does not retrain the model.
if not path.exists('fine_tuned_swedish_bert'):
    print("The model does not exist. Wait for training")
    train()

# Load the trained model
# `model` is the pre-trained base checkpoint; `ft_model` is loaded from the
# local path checked above. Both are read as notebook globals by later cells.
model = SentenceTransformer('KBLab/sentence-bert-swedish-cased')
ft_model = SentenceTransformer('fine_tuned_swedish_bert')

  return torch._C._cuda_getDeviceCount() > 0


In [6]:
def load_questions_and_answers_from_file(file_path, encoder=None):
    '''
    Loads questions and answers from file, also embedding questions

    Each row of the file is ``<integer id>#<text>``. Rows sharing an id form one
    group; the last text of a group is the answer and every earlier text is a
    question that this answer responds to.

    :param file_path: relative file_path to file (String)
    :param encoder: object with an ``encode(text)`` method used to embed each
        question; defaults to the notebook-level base ``model``
    :return question_embeddings: list of questions embedded by the encoder
    :return answer_mappings: list of string representation of answers
    (question embedding at index i of question_embeddings can be responded by
    answer at index i of answer_mappings)
    :raises ValueError: if a non-empty row does not have exactly two fields or
        its id is not an integer
    '''
    if encoder is None:
        encoder = model

    questions_and_answers = {}

    # The context manager closes the file; utf-8 is set explicitly so the
    # Swedish text does not depend on the platform default encoding, and
    # newline='' is what the csv module expects for file objects.
    with open(file_path, newline='', encoding='utf-8') as tsv_file:
        for line_number, row in enumerate(csv.reader(tsv_file, delimiter="#"), start=1):
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing newline)
                continue
            if len(row) != 2:
                raise ValueError(
                    "%s, row %d: expected 2 '#'-separated fields, got %d"
                    % (file_path, line_number, len(row))
                )
            key_str, text = row
            questions_and_answers.setdefault(int(key_str), []).append(text)

    question_embeddings = []
    answer_mappings = []

    # Iterate over the ids actually present (in ascending order) rather than
    # assuming they are exactly 0..n-1.
    for key in tqdm(sorted(questions_and_answers)):
        *questions, answer = questions_and_answers[key]
        for question in questions:
            question_embeddings.append(encoder.encode(question))
            answer_mappings.append(answer)

    return question_embeddings, answer_mappings

In [9]:
# Embed every question of the scraped dataset once; infer() reads the resulting
# `question_embeddings` / `answer_mappings` globals on each call.
question_embeddings, answer_mappings = load_questions_and_answers_from_file(file_path="./tsv_files/scraping_results_clean_2.tsv")

100%|██████████| 1785/1785 [00:59<00:00, 30.10it/s]


In [10]:
def _best_match(query_embedding, candidate_embeddings):
    '''Return (index, cosine similarity) of the candidate most similar to the query.'''
    # spatial.distance.cdist computes the cosine *distance*, so the cosine
    # similarity is 1 - distance.
    similarities = 1 - spatial.distance.cdist(
        np.asarray(query_embedding), np.asarray(candidate_embeddings), 'cosine'
    )[0]
    # A NaN similarity (possible for degenerate vectors) must never win the
    # argmax or slip past the threshold comparison.
    similarities = np.where(np.isnan(similarities), -np.inf, similarities)
    # argmax returns the first maximum, matching a stable descending sort.
    best_index = int(np.argmax(similarities))
    return best_index, float(similarities[best_index])


def infer(question, threshold=0.5, ft_question_embeddings=None):
    '''
    Returns the most appropriate answer to the question, once according to the
    base model and once according to the fine-tuned model

    :param question: Input question represented by string
    :param threshold: minimum cosine similarity for a stored question to count
        as a match; below it a "cannot answer" message is returned
    :param ft_question_embeddings: optional embeddings of the stored questions,
        index-aligned with ``answer_mappings``, to compare the fine-tuned
        model's query embedding against. When omitted, the shared
        ``question_embeddings`` are used for both models.
    :return: list ``[answer_base, answer_ft]``

    NOTE(review): the notebook builds ``question_embeddings`` with the base
    model, so by default the fine-tuned query is compared against base-model
    embeddings. Pass embeddings produced by ``ft_model`` via
    ``ft_question_embeddings`` to compare like with like.
    '''
    fallback = "Jag kan tyvärr inte besvara din fråga. Om du tror att jag borde kunna besvara frågor inom detta ämne, testa att omformulera frågan."

    if ft_question_embeddings is None:
        ft_question_embeddings = question_embeddings

    answers = []
    for encoder, candidates in ((model, question_embeddings), (ft_model, ft_question_embeddings)):
        if len(candidates) == 0:
            # Nothing to compare against: the question cannot be answered.
            answers.append(fallback)
            continue

        index, similarity = _best_match(encoder.encode([question]), candidates)

        # Answer only when the best match reaches the threshold; otherwise
        # inform the user that the question can't be answered.
        if similarity < threshold:
            answers.append(fallback)
        else:
            answers.append(answer_mappings[index])

    return answers

In [11]:
# Smoke test: ask one question and show what each model answers
question = "Vad erbjuder Trafikverket när det gäller APV-utbildningar för externa?"

answers = infer(question)
# infer returns [base-model answer, fine-tuned-model answer]
answer_base, answer_ft = answers
print("Base model: " + answer_base + "\n")
print("FT model: " + answer_ft)

Base model: Myndigheten Trafikverket har inga APV-utbildningar som externa kan anmäla sig på. Ur konkurrenssynpunkt så får vi inte heller rekommendera en utbildare framför en annan. Däremot har vi tagit fram två stycken broschyrer som kan vara nyttiga att ta del av.

FT model: Den kompetens inom "Arbete på väg" som Trafikverket kräver framgår i respektive kontrakt, med hänvisning till aktuellt kravdokument TDOK 2018:0371. Om du inte jobbar i ett kontrakt som Trafikverket upphandlat så omfattas du inte heller av Trafikverkets APV kompetenskrav. Då får du vända dig till din arbetsgivare/uppdragsgivare för att ta reda på vad som gäller i det just det arbete som du jobbar i och för de arbetsuppgifter som just du ska utföra  (till exempel med kommuner, elbolag, med flera).
