In [None]:
from uuid import uuid4
import json

In [None]:
# CSV files to load. The list is empty here, so fill it in before running the
# loading cell; otherwise no rows are read.
input_files = []
# Destination of the cleaned rows, one JSON object per line.
# NOTE: open() does not create missing directories; "output/" must already exist.
output_path = "output/data-processed-shuffled0.jsonl"
# Destinations of the two option-shuffled copies of the cleaned rows.
shuffled_path = "output/data-processed-shuffled1.jsonl"
shuffled_path2 = "output/data-processed-shuffled2.jsonl"

In [None]:
import re

# Leading question-number label: the word "Câu" (any letter case, with or
# without diacritics), at least one non-word separator, the number itself and
# whatever punctuation/whitespace directly follows the number.
# Only non-word characters are allowed between the word and the number so that
# ordinary sentences starting with "Cậu ...", "Cấu ..." or "Cause ..." are not
# truncated up to the first number they happen to contain.
_QUESTION_NUM_RE = re.compile(
    r"^c(?:a|á|à|ả|ã|ạ|â|ấ|ầ|ẩ|ẫ|ậ|ă|ắ|ằ|ẳ|ẵ|ặ)u(?:|ù|ú|ủ|ũ|ụ|ư|ứ|ừ|ử|ữ|ự)\W+\d+\W*",
    re.IGNORECASE,
)


def better_question(question: str) -> str:
    """Strip a leading "Câu <number>" label from a question.

    Examples:
        "Câu 1. This is a question" -> "This is a question"
        "cau 1.This is a question"  -> "This is a question"
        "cAU 1. This is a question" -> "This is a question"
        "Cau 1 This is a question"  -> "This is a question"
        "Cậu bé 5 tuổi bị sốt"      -> unchanged (words between the label and
                                       the number are not part of a label)

    Args:
        question: Raw question text.

    Returns:
        The question with surrounding whitespace removed and, when present,
        the leading number label removed. Text without such a label is only
        stripped of surrounding whitespace.
    """
    # NOTE(review): the trailing ``\W*`` also removes punctuation that opens
    # the question itself (e.g. a leading "(" or quote) — confirm acceptable.
    return _QUESTION_NUM_RE.sub("", question.strip(), count=1)


# Leading choice label: a letter A-G (either case) followed by its separator.
# The separator is, in order of preference:
#   1. punctuation (optionally preceded by spaces) followed by whitespace,
#      e.g. "A. ", "A) ", "A - "
#   2. punctuation attached to the text, e.g. "A.text"
#   3. plain whitespace, e.g. "A text"
# Punctuation that comes after the separator's whitespace is kept because it
# belongs to the answer ("A. <50%" -> "<50%", "C. -5 mmHg" -> "-5 mmHg").
_OPTION_LABEL_RE = re.compile(r"^\s*([A-Ga-g])(?:\s*[^\w\s]+\s+|[^\w\s]+|\s+)")


def better_options(options: list[str]) -> list[str]:
    """Strip leading choice labels ("A. ", "b) ", ...) from answer options.

    Labels are removed only when the whole list looks labelled: every option
    starts with a label, is followed by some text, and no letter is used
    twice (compared case-insensitively). Otherwise the input list is returned
    untouched, so a partially labelled list is never half-cleaned.

    Args:
        options: Answer option texts.

    Returns:
        A new list with the labels removed, or the original ``options`` object
        when the list does not qualify.
    """
    stripped_options = []
    seen_labels = set()
    for option in options:
        label_match = _OPTION_LABEL_RE.match(option)
        if label_match is None:
            # Covers unlabelled and blank/whitespace-only options alike.
            return options
        label = label_match.group(1).upper()
        text = option[label_match.end():]
        if label in seen_labels or not text:
            # Repeated letter, or a label with nothing after it.
            return options
        seen_labels.add(label)
        stripped_options.append(text)
    return stripped_options

In [None]:
from csv import DictReader

# Load every input CSV into one flat list of dict rows, tagging each row with
# the file it came from.
rows = []
for file in input_files:
    # newline="" is what the csv module requires so quoted fields containing
    # line breaks are parsed correctly; "utf-8-sig" reads UTF-8 with or without
    # a byte-order mark instead of relying on the platform default encoding.
    with open(file, newline="", encoding="utf-8-sig") as csv_file:
        file_rows = [{**row, "file": file} for row in DictReader(csv_file)]
    rows.extend(file_rows)
    print("Inserted:", len(file_rows))

# rows = rows[:300]
# Difficulty labels as they appear in the input, mapped to the label that is
# written to the output.
mapper = {
    "Hard": "Hard",
    "Easy": "Easy",
    "Moderate": "Medium",
    "Challenging": "Challenging",
    "Medium": "Medium",
    "Very Hard": "Challenging",
}
# The only difficulty labels allowed in the output.
valid_difficulty = {
    "Easy",
    "Medium",
    "Hard",
    "Challenging",
}

# Rejected rows are appended to this file by transform(). They are dumped with
# ensure_ascii=False, so the encoding is pinned to UTF-8 rather than left to
# the platform default (which cannot encode Vietnamese text on some systems).
f = open("errors.jsonl", "w", encoding="utf-8")

def transform(i, row):
    """Convert one raw CSV row into a cleaned question record.

    Args:
        i: Position of the row in the input list. Unused; kept so existing
            callers keep working.
        row: Mapping of column name to value for one CSV row. Reads the
            ``question``, ``difficultLevel``, ``medicalTopic`` and
            ``correctOption`` columns plus every non-empty column whose name
            starts with ``option``.

    Returns:
        A dict with the keys ``id``, ``difficulty_level``, ``medical_topic``,
        ``question``, ``options``, ``option_map``, ``answer`` and
        ``answer_index``; or ``None`` when the row cannot be converted or
        fails validation. Rejected rows are appended as JSON lines to the
        module-level errors file ``f``.
    """
    try:
        row_id = uuid4().hex
        # NOTE(review): assumes the option columns appear in A, B, C... order
        # and that no option in the middle is empty; otherwise answer_index
        # would point at the wrong text — confirm against the input files.
        options = [value for key, value in row.items() if key.startswith("option") and value]
        cleaned_options = better_options(options)
        question = row["question"]
        cleaned_question = better_question(question)
        if cleaned_question != question:
            print(f"{row_id}: {question} -> {cleaned_question}")
        if options != cleaned_options:
            print(f"{row_id}: {options} -> {cleaned_options}")

        difficulty_level = mapper[row["difficultLevel"].strip()]
        # Drop blank entries: "".split(",") yields [""], which would otherwise
        # make the "at least one topic" check below impossible to fail.
        medical_topic = [
            topic
            for topic in (item.strip() for item in row["medicalTopic"].split(","))
            if topic
        ]
        # The answer letter is the last character of the column; surrounding
        # whitespace and letter case are normalised like the other columns.
        answer = row["correctOption"].strip()[-1].upper()
        answer_index = ord(answer) - ord("A")

        # Explicit checks instead of assert so they still run under ``python -O``.
        if not cleaned_question:
            raise ValueError("question is empty after cleaning")
        if difficulty_level not in valid_difficulty:
            raise ValueError(f"unexpected difficulty level: {difficulty_level!r}")
        if not medical_topic:
            raise ValueError("no medical topic given")
        if not 0 <= answer_index < len(cleaned_options):
            raise ValueError(f"answer {answer!r} does not match any option")

        return {
            "id": row_id,
            "difficulty_level": difficulty_level,
            "medical_topic": medical_topic,
            "question": cleaned_question,
            "options": cleaned_options,
            "option_map": list(range(len(options))),
            "answer": answer,
            "answer_index": answer_index,
        }
    except Exception:
        # Best effort by design: keep the offending row for later inspection
        # and carry on with the remaining rows.
        json.dump(row, f, ensure_ascii=False)
        f.write('\n')
        return None

try:
    # transform() returns None for rows it rejected; keep only the converted ones.
    rows = [
        record
        for record in (transform(i, row) for i, row in enumerate(rows))
        if record is not None
    ]
finally:
    # Close (and flush) the errors file even if the run is interrupted.
    f.close()
print("Final len:", len(rows))

In [76]:
# Write the cleaned rows as JSON Lines. Rows are dumped with
# ensure_ascii=False, so the file is opened as UTF-8 explicitly instead of
# relying on the platform default encoding. A dedicated handle name avoids
# rebinding the global ``f`` that transform() writes rejected rows to.
with open(output_path, "w", encoding="utf-8") as out_file:
    for record in rows:
        json.dump(record, out_file, ensure_ascii=False)
        out_file.write("\n")

In [None]:
import random

def shuffle_options(row, rng=None):
    """Return a copy of ``row`` with its answer options in random order.

    ``options`` and ``option_map`` are permuted together, so each
    ``option_map`` entry keeps describing the option now at that position,
    even when ``row`` has already been shuffled before. ``answer_index`` and
    ``answer`` are updated to the new position of the correct option. The
    input row is not modified.

    Args:
        row: Dict with ``options``, ``option_map`` and ``answer_index`` keys.
        rng: Optional ``random.Random`` instance to draw the permutation from,
            for reproducible shuffles. Defaults to the global ``random`` module.

    Returns:
        The shuffled copy, or ``None`` (after printing the row and the error)
        when the row cannot be shuffled.
    """
    try:
        options = row["options"]
        source_map = row["option_map"]
        if len(source_map) != len(options):
            raise ValueError(
                f"option_map has {len(source_map)} entries for {len(options)} options"
            )

        # order[j] is the current position of the option that moves to slot j.
        order = list(range(len(options)))
        (random if rng is None else rng).shuffle(order)

        new_row = row.copy()
        new_row["options"] = [options[i] for i in order]
        new_row["option_map"] = [source_map[i] for i in order]
        new_row["answer_index"] = order.index(row["answer_index"])
        new_row["answer"] = chr(new_row["answer_index"] + ord('A'))
        return new_row
    except Exception as e:
        print(row)
        print(e)
        return None

In [78]:
# Write two independently shuffled copies of the cleaned rows.
# Seeding the global generator once makes both files reproducible on a fresh
# "Restart & Run All" while still giving each file a different permutation.
SHUFFLE_SEED = 0
random.seed(SHUFFLE_SEED)
for target_path in (shuffled_path, shuffled_path2):
    # shuffle_options() returns None for rows it could not shuffle; skip those.
    shuffled_rows = [
        shuffled
        for shuffled in (shuffle_options(row) for row in rows)
        if shuffled is not None
    ]
    # Rows are dumped with ensure_ascii=False, so pin the encoding to UTF-8.
    with open(target_path, "w", encoding="utf-8") as out_file:
        for shuffled in shuffled_rows:
            json.dump(shuffled, out_file, ensure_ascii=False)
            out_file.write("\n")