In [None]:
from pydantic import BaseModel, Field
from typing import List, Union, Optional
from glob import glob

# Collect the raw model-output text files, one glob pattern per model
# directory; results are concatenated in the order the patterns are listed.
files = sum(
    (
        glob(pattern)
        for pattern in (
            "QuestionsCameroon/LLama32/*.txt",
            "QuestionsCameroon/Mistral/*/*.txt",
            "QuestionsCameroon/MixLlama3.1/*.txt",
            "QuestionsCameroon/Mistral12B/*/*.txt",
        )
    ),
    [],
)
len(files)

In [None]:
# Read every collected file fully into memory, in the same order as `files`.
# A context manager is used so each file handle is closed deterministically
# instead of being left for the garbage collector.
# NOTE(review): files are decoded with the platform default encoding; if the
# outputs are known to be UTF-8, pass encoding="utf-8" explicitly — confirm.
model_outputs = []
for path in files:
    with open(path) as handle:
        model_outputs.append(handle.read())

len(model_outputs)

In [None]:
from typing import Any
import random


class AnswerChoice(BaseModel):
    # One option of a multiple-choice question: a label (`letter`) and the
    # option's wording (`text`).
    # (Documented with comments rather than a class docstring so the generated
    # pydantic JSON schema stays exactly as it was.)
    letter: str
    text: str

    def __eq__(self, value: object) -> bool:
        """Compare two choices by their text only; the letter is ignored.

        Returns ``NotImplemented`` for anything that is not an
        ``AnswerChoice`` so that comparisons such as ``choice == None``
        evaluate to ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(value, AnswerChoice):
            return NotImplemented
        return self.text == value.text


def convert_answer(answer: AnswerChoice):
    """Return ``(letter, text)`` for a choice with the letter cleaned up.

    The letter is upper-cased and every character other than A, B, C, D or E
    is dropped (so ``"a)"`` becomes ``"A"``); the result may be an empty
    string. The text is returned untouched.
    """
    kept_chars = filter(lambda char: char in "ABCDE", answer.letter.upper())
    cleaned_letter = "".join(kept_chars)
    return cleaned_letter, answer.text


def convert(answers: list[AnswerChoice]) -> dict[str, str]:
    """Build a ``{letter: text}`` mapping from a list of answer choices.

    Letters are normalised through ``convert_answer``. Choices whose cleaned
    letter is empty, or whose text is ``None``, are skipped. When two choices
    share a letter, the later text replaces the earlier one.
    """
    return {
        letter: text
        for letter, text in map(convert_answer, answers)
        if text is not None and letter
    }


class Question(BaseModel):
    # A multiple-choice question with its choices, the letters of the correct
    # choices and an explanation. After validation the choices are normalised,
    # the correct answers are resolved to choice letters, and the choices are
    # randomly shuffled (see `model_post_init`).
    # (Documented with comments rather than a class docstring so the generated
    # pydantic JSON schema stays exactly as it was.)
    question_number: Union[int, str]
    question_text: str
    answer_choices: List[AnswerChoice]
    correct_answers: List[str]
    explanation: str

    def __eq__(self, value: object) -> bool:
        """Two questions are equal when their text and choice texts match.

        Choice texts are compared order-insensitively because the choices are
        randomly shuffled when the model is built, so an order-sensitive
        comparison would succeed or fail by chance. Non-``Question`` operands
        yield ``NotImplemented`` instead of raising ``AttributeError``.
        """
        if not isinstance(value, Question):
            return NotImplemented
        if self.question_text != value.question_text:
            return False
        own_texts = sorted(choice.text for choice in self.answer_choices)
        other_texts = sorted(choice.text for choice in value.answer_choices)
        return own_texts == other_texts

    def model_post_init(self, __context: Any) -> None:
        """Normalise choices and correct answers, then shuffle the choices.

        Choices are rebuilt from ``convert`` (cleaned letters, one choice per
        letter); correct answers are resolved to letters of existing choices.
        """
        choices_by_letter = convert(self.answer_choices)
        self.answer_choices = [
            AnswerChoice(letter=letter, text=text)
            for letter, text in choices_by_letter.items()
        ]
        self.correct_answers = self._normalize_correct_answers(choices_by_letter)

        self.shuffle_choices()
        return super().model_post_init(__context)

    def _normalize_correct_answers(self, choices_by_letter: dict[str, str]) -> list[str]:
        """Resolve each raw correct answer to the letter(s) of existing choices.

        For every entry of ``self.correct_answers``, in order of preference:
        1. it is already a known choice letter -> kept as is;
        2. it equals (ignoring surrounding whitespace) the text of a choice ->
           replaced by the letter of the first such choice;
        3. otherwise every A-E character found in its upper-cased form that is
           a known choice letter is kept (e.g. ``"a, c"`` -> ``A`` and ``C``).
        Entries matching nothing are dropped.

        Returns the unique letters in sorted order so the result does not
        depend on set iteration order. The input list is not mutated while it
        is being read.
        """
        resolved: set[str] = set()
        for raw_answer in self.correct_answers:
            if raw_answer in choices_by_letter:
                resolved.add(raw_answer)
                continue

            wanted_text = raw_answer.strip()
            letter_for_text = next(
                (
                    letter
                    for letter, text in choices_by_letter.items()
                    if text.strip() == wanted_text
                ),
                None,
            )
            if letter_for_text is not None:
                resolved.add(letter_for_text)
                continue

            # Fall back to picking individual letters out of free-form input.
            resolved.update(
                char
                for char in raw_answer.upper()
                if char in "ABCDE" and char in choices_by_letter
            )
        return sorted(resolved)

    def shuffle_choices(self):
        """Shuffle the choices in place and re-letter them in sorted order.

        After the shuffle, the sorted original letters are assigned to the
        choices by position, and ``correct_answers`` is remapped to the new
        letters.
        """
        # Letter -> choice object as it was before shuffling. The values are
        # the same objects that get re-lettered below; only their `text` is
        # used afterwards, and that never changes.
        original_positions = {choice.letter: choice for choice in self.answer_choices}

        random.shuffle(self.answer_choices)

        for choice, letter in zip(self.answer_choices, sorted(original_positions)):
            choice.letter = letter

        self._update_correct_answers(original_positions)

    def _update_correct_answers(self, original_positions: dict[str, AnswerChoice]):
        """Remap ``correct_answers`` from pre-shuffle letters to current letters.

        Each old letter is translated by looking up the text of the choice it
        originally referred to and taking the letter of the first current
        choice with that text. The result is stored deduplicated and sorted.

        Raises:
            KeyError: if a correct answer is not a key of ``original_positions``.
        """
        remapped: set[str] = set()
        for answer in self.correct_answers:
            original_text = original_positions[answer].text
            for choice in self.answer_choices:
                if choice.text == original_text:
                    remapped.add(choice.letter)
                    break
        self.correct_answers = sorted(remapped)


class QuestionBank(BaseModel):
    # Wrapper model holding a list of `Question` objects under `questions`.
    questions: list[Question] = Field(
        description="Collection of all questions in the question bank",
    )

In [None]:
import re
import json
import pydantic
import uuid



def remove_trailing_commas(json_str: str) -> str:
	"""Strip trailing commas that precede a closing ``]`` or ``}``.

	A comma (plus any whitespace after it) directly before a closing bracket
	is removed so that strict ``json.loads`` accepts the text. Double-quoted
	string literals are matched first and copied through verbatim, so a
	``", ]"`` or ``", }"`` sequence inside a string value is left intact.

	Args:
		json_str: JSON-like text that may contain trailing commas.

	Returns:
		The text with trailing commas outside string literals removed.
	"""
	# Alternative 1 (captured): a complete string literal, honouring
	# backslash escapes. Alternative 2: a trailing comma before ] or }.
	pattern = r'("(?:\\.|[^"\\])*")|,\s*(?=[\]}])'
	# Keep string literals unchanged; replace trailing commas with nothing.
	return re.sub(pattern, lambda match: match.group(1) or "", json_str)


def extract_questions(text: str) -> List[Question]:
	"""Extract every well-formed question object embedded in raw model output.

	The text is scanned with a regex for JSON objects whose keys appear in the
	order ``question_number``, ``question_text``, ``answer_choices``,
	``correct_answers``, ``explanation``. Each candidate has trailing commas
	removed, is parsed with ``json.loads`` and validated as a ``Question``.
	Candidates that fail parsing or validation are skipped.

	Every returned question gets its ``question_number`` rewritten to
	``"<uuid4>_<original number>"``, with one fresh UUID per call, so numbers
	from different texts do not collide.

	Args:
		text: Raw text produced by a model.

	Returns:
		The list of validated questions, in order of appearance (possibly empty).
	"""
	# Regex pattern to capture each individual question object entirely
	question_pattern = re.compile(
		r'\{\s*"question_number":\s*(\d+|".+?"),\s*'
		r'"question_text":\s*".+?",\s*'
		r'"answer_choices":\s*\[.*?\],\s*'
		r'"correct_answers":\s*\[.*?\],\s*'
		r'"explanation":\s*".+?"\s*\}',
		re.DOTALL
	)

	# One identifier shared by all questions extracted from this text.
	text_id = str(uuid.uuid4())

	questions = []
	for match in question_pattern.finditer(text):
		question_json_str = match[0]
		try:
			question_data = json.loads(remove_trailing_commas(question_json_str))
			question = Question.model_validate(question_data)
		except (json.JSONDecodeError, pydantic.ValidationError):
			# Deliberate best-effort: model output is noisy, so a candidate
			# that is not valid JSON or not a valid Question is skipped.
			continue
		question.question_number = text_id + "_" + str(question.question_number)
		questions.append(question)

	return questions

In [None]:
# Parse every raw model output. `sentences` maps file path -> extracted
# questions; `fails` maps file path -> raw text for files whose extraction
# raised, so they can be inspected afterwards.
sentences = {}
fails = {}
for raw, file in zip(model_outputs, files):
	try:
		sentences[file] = extract_questions(raw)
	except Exception:
		# Best-effort: keep going on unexpected errors, but do not swallow
		# KeyboardInterrupt/SystemExit the way a bare `except:` would.
		fails[file] = raw
# Flatten the per-file lists into one list (linear, unlike summing lists).
merged_sentences = [question for parsed in sentences.values() for question in parsed]
len(merged_sentences)

In [None]:
from datasets import Dataset
from typing import List


# Convert list of dataclass instances to a dataset
def convert_to_dataset(questions: List[Question]) -> Dataset:
    """Turn a list of ``Question`` objects into a column-oriented ``Dataset``.

    The dataset has one row per question and the columns
    ``question_number``, ``question_text``, ``answer_choices`` (a
    ``{letter: text}`` mapping built by ``convert``), ``correct_answers`` and
    ``explanation``.
    """
    columns = {
        "question_number": [],
        "question_text": [],
        "answer_choices": [],
        "correct_answers": [],
        "explanation": [],
    }
    for item in questions:
        columns["question_number"].append(item.question_number)
        columns["question_text"].append(item.question_text)
        columns["answer_choices"].append(convert(item.answer_choices))
        columns["correct_answers"].append(item.correct_answers)
        columns["explanation"].append(item.explanation)

    return Dataset.from_dict(columns)

# Convert the questions list to a Dataset
questions_dataset = convert_to_dataset(merged_sentences)

# Optionally, print the dataset
print(questions_dataset)

In [None]:
# Preview the first five rows of the dataset.
questions_dataset[:5]

In [None]:
import pandas as pd
# Flatten the per-question lists of correct letters into one Series and count
# how often each letter is the correct answer.
answer_lists = questions_dataset["correct_answers"]
correct_answers = pd.Series(
    [letter for letters in answer_lists for letter in letters]
)

correct_answers.value_counts()

In [None]:
# Display the dataset's representation one more time before uploading.
questions_dataset

In [None]:
# Upload the dataset to the Hub repository "alexneakameni/qa_africa".
# NOTE(review): presumably requires prior Hugging Face authentication with
# write access to that repository — confirm before running.
questions_dataset.push_to_hub("alexneakameni/qa_africa")