<a href="https://colab.research.google.com/github/alexk2206/tds_capstone/blob/Domi-DEV/Productive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Productive Notebook

In [1]:
!pip install evaluate
!pip install --upgrade sympy

Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting datasets>=2.0.0 (from evaluate)
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.9-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from evaluate)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.17-py311-none-any.whl.metadata (7.2 kB)
Collecting dill (from evaluate)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting multiprocess (from evaluate)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec>=2021.05.0 (from fsspec[http]>=2021.05.0->evaluate)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading evaluate-0.4.3-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [

In [2]:
import json
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import numpy as np
import urllib
from itertools import chain, combinations
from transformers import AutoTokenizer, AutoModelForMultipleChoice, TrainingArguments
import torch
import requests
import evaluate
import numpy as np
from transformers import AutoTokenizer, AutoModelForMultipleChoice, TrainingArguments, Trainer

### Preprocess dataset

Here we split the QA-dataset into train and validation dataset.
Additionnaly, we prepare the dataset to later be useful for response-generation and fine-tuning of a model

❎ Please insert code: load dataset into variable q ❎

In [4]:
# Example dataset

url = "https://raw.githubusercontent.com/alexk2206/tds_capstone/refs/heads/main/datasets/sampled_qa_dataset_easy.json"
data = pd.read_json(url)
print(data)
# Convert to DataFrame for easy handling
df = pd.DataFrame(data)

# Map the intended answer to the index of the option
df['label'] = df.apply(lambda x: x['options'].index(x['intended_answer']), axis=1)
df['stratify_key'] = df['context_type'] + '_' + df['type']

# Stratified Train-Validation Split
train_df, val_df = train_test_split(
    df,
    train_size=0.8,
    stratify=df['stratify_key'],
    random_state=42
)

                                             question           type  \
0                         What type of company is it?  SINGLE_SELECT   
1   Would you like to receive marketing informatio...  SINGLE_SELECT   
2                   What is the size of your company?  SINGLE_SELECT   
3                                          Next steps  SINGLE_SELECT   
4                   What kind of follow up is planned   MULTI_SELECT   
..                                                ...            ...   
95                  What is the size of your company?  SINGLE_SELECT   
96                            Data processing consent  SINGLE_SELECT   
97                        What type of company is it?  SINGLE_SELECT   
98                           Who to copy in follow up   MULTI_SELECT   
99                            Data processing consent  SINGLE_SELECT   

                                              options  \
0   [Construction company, Craft enterprises, Scaf...   
1                    

ValueError: ['Construction company'] is not in list

### Generate model output

After the creation of the QA-dataset, it's time for generating model output for different Huggingface models.

In [None]:
def model_output(model, tokenizer, questions):
  '''
  model_output -> creates output for every question in the dataset and safes it in a list of dicts. One dic has keys 'answer', 'predicted_answer', 'type'
  parameters:
  - model: one hugging face model
  - tokenizer: hugging face tokenizer
  - questions: QA-dataset in json format
  '''
  answer_comparison = []
  for question in questions:
        context = question['context']
        question_text = question['question']
        options = question['options']
        answer = question['answer']
        question_type = question['type']

        if question_type == "MULTI_SELECT":
          answer, predicted_answer = multi_select_model_output(model, tokenizer, question)
        if question_type == "SINGLE_SELECT":
          answer, predicted_answer = single_select_model_output(model, tokenizer, question)
        if question_type == "TEXT":
          answer, predicted_answer = text_model_output(question)
        if question_type == "NUMBER":
          answer, predicted_answer = number_model_output(model, tokenizer, question)
        if question_type == "DATE":
          answer, predicted_answer = date_model_output(model, tokenizer, question)
        else:
          continue
        answer_comparison.append({'answer': answer, 'predicted_answer': options[predicted_option], 'type': question_type})
  return answer_comparison

def single_select_model_output(model, tokenizer, question):
    '''
    Handles a question, its context and its options for a single-select question and generates output
    parameters:
    - model: one hugging face model
    - tokenizer: hugging face tokenizer
    - question: one question of the QA-dataset as a dictionary
    output:
    - answer: the correct/intended answer as a list of a string
    - predicted_answer: the predicted answer as a list of a string
    '''
    answer = question['answer']
    options = question['options']

    # creating input ids by tokenizing the question
    input_ids = tokenize_function(question, tokenizer)
    input_ids = input_ids["input_ids"].reshape(1, len(options), -1)
    attention_mask = input_ids["attention_mask"].reshape(1, len(options), -1)

    # generating the output
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    logits = outputs.logits  # Shape: [batch_size, num_choices]

    # Predict the option with the highest score
    predicted_option = torch.argmax(logits, dim=1).item()

    predicted_binary = [0] * len(options)
    predicted_binary[predicted_option] = 1

    intended_binary = [0] * len(options)
    intended_binary[option == answer for option in options] = 1

    return intended_binary, predicted_binary, options[predicted_option]

def multi_select_model_output(model, tokenizer, question):
    '''
    Handles a question, its context and its options for a multi-select question and generates output as a list of indices of the predicted answers. Ticks every option whose probability is at least 90% of the best option (softmax)
    parameters:
    - model: one hugging face model
    - tokenizer: hugging face tokenizer
    - question: one question of the QA-dataset as a dictionary
    output:
    - answer: the correct/intended answers as a list of strings
    - predicted_answer: the predicted answers as a list of strings
    '''
    answer = question['answer']
    options = question['options']

    # creating input ids by tokenizing the question
    input_ids = tokenize_function(question, tokenizer)
    input_ids = input_ids["input_ids"].reshape(1, len(options), -1)
    attention_mask = input_ids["attention_mask"].reshape(1, len(options), -1)

    # generating the output
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    logits = outputs.logits  # Shape: [batch_size, num_choices]

    # Find all indices to have at least 90% of the max score
    max_score = logits.max().item()
    threshold = 0.9 * max_score
    high_score_options = (logits >= threshold).nonzero(as_tuple=True)[1]  # Get the indices of valid options

    # List the corresponding options
    high_score_answers = [options[idx] for idx in high_score_options.tolist()]
    intended_binary = [0] * len(options)
    intended_binary[option == answer for option in options] = 1

    predicted_binary = [0] * len(options)
    predicted_binary[high_score_options.tolist()] = 1

    return intended_binary, predicted_binary, options[predicted_binary]

def text_model_output(question):
    '''
    Handles an open text question and summarizes it
    parameter:
    - question: one question of the QA-dataset as a dictionary
    output:
    - answer: the full context of the question as a string
    - summary: the generated summary as a string
    '''
    answer = question['context']
    summarization_pipeline = pipeline("text-summarization")
    summary = summarization_pipeline(answer, max_length=100, min_length=30, do_sample=False)
    return answer, summary[0]['summary_text']

def number_model_output(model, tokenizer, question):
    '''
    Handles a question where the context should contain a phone number and generates an answer to that question
    '''
    answer = question['answer']

    input_ids = tokenize_function(question, tokenizer)
    output = model(**input_ids)
    predicted_number = output.logits.item()

    return answer, predicted_number

def date_model_output(model, tokenizer, question):
    '''
    Handles a question where the context should contain a date and generates an answer to that question
    '''
    answer = question['answer']

    input_ids = tokenize_function(question, tokenizer)
    output = model(**input_ids)
    predicted_date = output.logits.item()

    return answer, predicted_date

def accuracy(answer_comparison):
    '''
    Computes the total accuracy and accuracy for each question type for the passed list of dicts. One dict in the list is one question with keys 'answer', 'predicted_answer', 'type'
    parameters:
    - list of dicts with entries 1) predicted answer 2) answer 3) type of question
    '''
    correct_multi_select = 0
    correct_single_select = 0
    correct_text = 0
    correct_number = 0
    correct_date = 0
    correct_total = 0
    total = 0

    for entry in answer_comparison:
        question_type = entry['type']
        if entry['answer'] == entry['predicted_answer']:
            if question_type == 'MULTI_SELECT':
                correct_multi_select += 1
                total_multi_select += 1
            elif question_type == 'SINGLE_SELECT':
                correct_single_select += 1
                total_single_select += 1
            elif question_type == 'TEXT':
                correct_text += 1
                total_text += 1
            elif question_type == 'NUMBER':
                correct_number += 1
                total_number += 1
            elif question_type == 'DATE':
                correct_date += 1
                total_date += 1
            else:
              continue
            correct_total += 1
        total += 1
    accuracy_total = correct_total / total
    accuracy_multi_select = correct_multi_select / total_multi_select
    accuracy_single_select = correct_single_select / total_single_select
    accuracy_text = correct_text / total_text
    accuracy_number = correct_number / total_number
    accuracy_date = correct_date / total_date
    return accuracy_total, accuracy_multi_select, accuracy_single_select, accuracy_text, accuracy_number, accuracy_date
'''
print_out_model_quality: takes the computations of function accuracy() and prints them out
parameters:
- accuracy_total
- accuracy_multi_select
- accuracy_single_select
- accuracy_text
- accuracy_number
- accuracy_date
'''
def print_out_model_quality(accuracy_total, accuracy_multi_select, accuracy_single_select, accuracy_text, accuracy_number, accuracy_date):
    accuracy_total, accuracy_multi_select, accuracy_single_select, accuracy_text, accuracy_number, accuracy_date = accuracy(model, tokenizer, questions)
    print(f"""Accuracy values of model: {model.name_or_path}\n
    Total: {accuracy_total}\n
    Multi-select: {accuracy_multi_select}\n
    Single-select: {accuracy_single_select}\n
    Text: {accuracy_text}\n
    Number: {accuracy_number}\n
    Date: {accuracy_date}\n""")
    return accuracy_total, accuracy_multi_select, accuracy_single_select, accuracy_text, accuracy_number, accuracy_date




### Fine-tuning a model


In [None]:
def fine_tune_model(train_dataset, val_dataset, tokenizer, model):
    # Define training arguments
    training_args = TrainingArguments("trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_dir="./logs",
        learning_rate=2e-5,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_steps=10,
        load_best_model_at_end=True
    )
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True)
    tokenized_val_dataset = val_dataset.map(tokenize_function, batched=True)

    # Define Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        data_collator=data_collator,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics
    )

    # Train the model
    trainer.train()

def compute_metrics(eval_preds, pretrained_dataset_name):
    metric = evaluate.load("glue", pretrained_dataset_name)
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

def tokenize_function(example, tokenizer):
    '''
    Converts the string input, which is a question with its context and the given options for multi-/single-select questions, into IDs the model later can make sense of. Distinguishes between multi-/single-select and the other questions
    parameters:
    - expample: question of the QA-dataset with all its entries (question, context, options, type are urgently necessary)
    - tokenizer: tokenizer of the model
    output:
    - tokenized: tokenized input example
    '''
    if example["type"] == "SINGLE_SELECT" or example["type"] == "MULTIPLE_SELECT":
      tokenized = tokenizer(
          [example["context"]] * len(example["options"]),  # Repeat context for each option
          [example["question"] + " " + option for option in example["options"]],  # Pair with each option
          truncation=True,
          max_length=512,
          padding="max_length",  # Ensure uniform input length
          return_tensors="pt"
      )

      # Get the label (index of the intended answer)
      if example["type"] == "SINGLE_SELECT":
          # For single-select questions, just find the index of the correct option
          labels = example["options"].index(example["intended_answer"])
      elif example["type"] == "MULTIPLE_SELECT":
          # For multiple-select questions, create binary labels for each option
          labels = [1 if option in example["intended_answer"] else 0 for option in example["options"]]
      else:
          raise ValueError(f"Unknown question type: {example['type']}")
      tokenized["labels"] = labels  # Add labels to the tokenized output
    else:
      tokenized = tokenizer(
          example["context"],
          example["question"],
          truncation=True,
          max_length=512,
          padding="max_length",
          return_tensors="pt"
      )
      tokenized["labels"] = 0
    return tokenized