## Interactive, modular notebook

This notebook was used during development and does not contain all the information; more details are on the Notion page.

One problem was reading the original JSON files, which contain multiple root objects instead of a single one. The following function reads them successfully.

In [19]:
"""
Fixes a malformed JSON file containing multiple root objects and 
loads it into a Python list of dictionaries.
This function addresses the issue of JSON files that have multiple root objects 
by wrapping them in an array
and ensuring proper comma separation between objects. It then attempts to parse 
the fixed JSON string.
Args:
    file_path (str): The path to the JSON file to be fixed and loaded.
Returns:
    list: A list of dictionaries representing the fixed JSON objects if parsing is successful.
    None: If there is an error in parsing the JSON, None is returned 
    and an error message is printed.
"""

import json
import re

def fix_malformed_json(file_path):
    """Fixes JSON with multiple root objects and loads it into Python."""
    with open(file_path, 'r', encoding='utf-8') as file:
        json_string = file.read()

    # Step 1: Wrap JSON objects in an array
    fixed_json_string = f"[{json_string}]"

    # Step 2: Remove newlines between objects and add commas
    fixed_json_string = re.sub(r'}\s*{', r'},{', fixed_json_string)

    # Step 3: Parse the fixed JSON
    try:
        return json.loads(fixed_json_string)
    except json.JSONDecodeError as e:
        print(f"Error parsing JSON: {e}")
        return None

# Example usage
FILE_PATH = "data/economics.json" # "data/machine_learning_for_healthcare.json"
data = fix_malformed_json(FILE_PATH)

if data:
    print("Fixed JSON loaded successfully!")
    #print(json.dumps(data, indent=4))  # Pretty-print for inspection
else:
    print("Failed to fix and load JSON.")


Fixed JSON loaded successfully!
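
The regex in Step 2 matches any `}` followed by `{`, so it would also rewrite that sequence if it ever appeared inside a string value. A minimal alternative sketch, assuming the files are plain concatenations of JSON values separated only by whitespace, decodes one value at a time with `json.JSONDecoder.raw_decode`. The function name `load_concatenated_json` and the sample string are hypothetical and not part of the original notebook.

```python
import json


def load_concatenated_json(json_string):
    """Decode a string holding several root-level JSON values back to back."""
    decoder = json.JSONDecoder()
    objects = []
    position = 0
    length = len(json_string)
    while position < length:
        # raw_decode does not accept leading whitespace, so skip it manually.
        while position < length and json_string[position].isspace():
            position += 1
        if position >= length:
            break
        # raw_decode returns the decoded value and the index where it ended.
        obj, position = decoder.raw_decode(json_string, position)
        objects.append(obj)
    return objects


# Made-up sample: the second object contains "}{" inside a string value.
sample = '{"type": "chunk", "id": 1}\n\n{"type": "chunk", "note": "a }{ b"}'
print(load_concatenated_json(sample))
```

Unlike `fix_malformed_json`, this sketch lets `json.JSONDecodeError` propagate to the caller instead of printing it and returning `None`.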


Setting up the model - using Google's free Gemini 1.5 Flash model.

In [1]:
"""
LLM API Script

This script configures and initializes a generative AI model using the Google Generative AI library.
"""

import os
import google.generativeai as genai

api_key = os.getenv("API_KEY")
genai.configure(api_key=api_key)
model = genai.GenerativeModel("gemini-1.5-flash")




Looking at the data.

In [4]:
import random

# Get a random index
random_index = random.randint(1, len(data) - 1)

# Get the chunk at the random index
random_chunk = data[random_index]
text = random_chunk['content'].get('text', 'N/A')
summary = random_chunk['content'].get('summary', 'N/A')
main_topic = random_chunk['content'].get('main_topic', 'N/A')
subtopics = random_chunk['content'].get('subtopics', ['N/A'])  # a list, joined with ", " later
print(summary)

The text discusses the mechanisms for monitoring and addressing macroeconomic imbalances among EU member states, particularly through the Specific Monitoring and the Excessive Imbalance Procedure (EIP). It also examines the European Stability Mechanism's role in providing financial assistance to euro area countries facing severe financial issues. The Portuguese case is presented to illustrate economic trends and imbalances leading up to the Economic and Monetary Union (EMU).
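
The cell above picks a record by position, while later cells filter on `chunk['type'] == 'chunk'`. A small sketch of selecting a random record by type instead follows. The sample records, including the `'metadata'` type and all values, are made-up placeholders; only the key names (`type`, `chunk_index`, `content`, `text`, `summary`, `main_topic`, `subtopics`) are taken from the notebook's code, and `pick_random_chunk` is a hypothetical helper.

```python
import random


def pick_random_chunk(records, rng=random):
    """Return a random record whose 'type' is 'chunk', or None if there is none."""
    chunks = [record for record in records if record.get('type') == 'chunk']
    return rng.choice(chunks) if chunks else None


# Hypothetical records for illustration only.
sample_data = [
    {'type': 'metadata', 'content': {}},
    {'type': 'chunk', 'chunk_index': 0,
     'content': {'text': 'Example text', 'summary': 'Example summary',
                 'main_topic': 'Economics', 'subtopics': ['EMU']}},
]

chunk = pick_random_chunk(sample_data)
print(chunk['content'].get('summary', 'N/A'))
```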


In [20]:
PROMPT = """
You are an AI assistant tasked with creating multiple-choice questions for learning.
You are given a chunk of text, a brief summary of that text, the main topic it belongs to, and its subtopics.

Generate a single multiple-choice question with the following properties:
- All the needed context must be in the question itself. Students don't see the text or summary.
- One True Answer: The correct answer must be directly supported by the information in the text and/or summary provided. Give an explanation for why this option is correct.
- Three Plausible Distractors: 
    - The incorrect answer options must be related to the main topic or subtopic.
    - They should be plausible, common misconceptions, or something a reader might easily confuse.
    - They should not simply be direct negations of the text, nor should they be generally true statements that might confuse the student. 
    - You also need to give explanations for why each distractor is incorrect, but relevant to the question.

Here's the information:

Text Chunk: "{text}"

Summary: "{summary}"

Main Topic: "{main_topic}"

Subtopic: "{subtopics}"

The questions should be clear and give all the information needed, not requiring the reader to refer back to the context.
The question MUST NOT contain the following or similar phrases:
"According to the text", "Based on the provided text", "According to the lecture slides", "as discussed", "as highlighted in the provided material",
"In this lecture", "according to the provided information", "key focus of the discussion"
Instead, directly ask the questions. Do NOT specify which lecture, text, or source the question is based on. 
They don't know which text you are referring to, nor the context outside of the question.

Your Output Should Be Formatted as Follows:

Question: [Insert the multiple choice question here]

A: [Answer Option A]

B: [Answer Option B]

C: [Answer Option C]

D: [Answer Option D]

Correct Answer: [The letter of the correct answer A, B, C, or D]

A: [Explanation for this option]

B: [Explanation for this option]

C: [Explanation for this option]

D: [Explanation for this option]
"""


Parse the response from the LLM.

In [21]:
"""
This script parses a multiple-choice question from a given text input.
It extracts the question, options, correct answer, and explanations,
and organizes them into a structured dictionary format.
"""

def parse_question(text):
    """
    Parses a multiple-choice question from a given text input.

    Args:
        text (str): The text containing the multiple-choice question.

    Returns:
        dict: A dictionary containing the question, options, correct answer,
                distractors, and explanations.
    """

    lines = [line.strip() for line in text.strip().split('\n') if line.strip()]

    if len(lines) < 10:
        raise ValueError("Input text does not have the expected number of lines.")
    question = lines[0].replace('Question: ', '').strip()
    options = {line[0]: line[3:].strip() for line in lines[1:5]}
    explanations = {line[0]: line[3:].strip() for line in lines[6:10]}
    correct_answer = lines[5].replace('Correct Answer: ', '').strip()
    if correct_answer not in options:
        raise ValueError("Correct answer key is not in the options.")
    # Keys of the incorrect options, in their original order
    distractor_keys = [key for key in options if key != correct_answer]

    if len(distractor_keys) < 3:
        raise ValueError("Not enough distractors available.")
    if set(explanations) != set(options):
        raise ValueError("Explanation labels do not match the option labels.")
    question_data = {
        'question': question,
        'correct_answer': options[correct_answer],
        'correct_explanation': explanations[correct_answer],
    }
    # Look up each distractor's explanation by its key rather than by its text,
    # so two options with identical wording cannot be confused.
    for number, key in enumerate(distractor_keys[:3], start=1):
        question_data[f'distractor{number}'] = options[key]
        question_data[f'distractor{number}_explanation'] = explanations[key]

    return question_data

# Example usage
TEXT = """Question: [Insert the multiple choice question here]

A: [Answer Option A]

B: [Answer Option B]

C: [Answer Option C]

D: [Answer Option D]

Correct Answer: D

A: [Explanation for Option A]

B: [Explanation for Option B]

C: [Explanation for Option C]

D: [Explanation for Option D]
"""

print(parse_question(TEXT))


{'question': '[Insert the multiple choice question here]', 'correct_answer': '[Answer Option D]', 'correct_explanation': '[Explanation for Option D]', 'distractor1': '[Answer Option A]', 'distractor1_explanation': '[Explanation for Option A]', 'distractor2': '[Answer Option B]', 'distractor2_explanation': '[Explanation for Option B]', 'distractor3': '[Answer Option C]', 'distractor3_explanation': '[Explanation for Option C]'}
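
`parse_question` relies on fixed line positions, so a response with one extra non-empty line shifts every index. A hedged alternative sketch matches lines by their labels instead. `parse_question_by_label` is a hypothetical name, the sample response is made up, and the sketch assumes each option and explanation fits on one line, as in the output format requested by `PROMPT`.

```python
import re

LABEL_LINE = re.compile(r'^([A-D]):\s*(.*)$')


def parse_question_by_label(text):
    """Parse the question format by line labels rather than line positions."""
    question = None
    correct = None
    options = {}
    explanations = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith('Question:'):
            question = line[len('Question:'):].strip()
        elif line.startswith('Correct Answer:'):
            correct = line[len('Correct Answer:'):].strip()
        else:
            match = LABEL_LINE.match(line)
            if match:
                # Labels seen before "Correct Answer:" are options,
                # labels seen after it are explanations.
                target = options if correct is None else explanations
                target[match.group(1)] = match.group(2).strip()
    if question is None or correct not in options:
        raise ValueError("Missing question or valid correct answer.")
    if set(options) != set('ABCD') or set(explanations) != set('ABCD'):
        raise ValueError("Expected four options and four explanations.")
    return {'question': question, 'correct': correct,
            'options': options, 'explanations': explanations}


# Made-up response with an extra introductory line before the question.
sample_response = """Here is the question.
Question: What is 2 + 2?
A: 3
B: 4
C: 5
D: 22
Correct Answer: B
A: Off by one.
B: Correct sum.
C: Off by one.
D: Concatenation, not addition.
"""
print(parse_question_by_label(sample_response))
```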


In [None]:
response = model.generate_content(PROMPT.format(text=text, summary=summary, main_topic=main_topic, subtopics=", ".join(subtopics)))
"""
response2 = model.generate_content(
            PROMPT.format(text=text, summary=summary, main_topic=main_topic, subtopics=", ".join(subtopics)),
            generation_config=genai.types.GenerationConfig(
                candidate_count=1,
                stop_sequences=None,
                max_output_tokens=100,
                temperature=0,
            ),
        )
"""
print(response.text)

Question:  The European Union employs mechanisms to address macroeconomic imbalances in member states.  One such mechanism, the Excessive Imbalance Procedure (EIP), is designed to ensure compliance with which broader procedure, and what potential consequence might a member state face for repeated failure to comply?

A:  The Specific Monitoring procedure;  potential expulsion from the Eurozone.

B:  The Macroeconomic Imbalance Procedure; potential sanctions, including fines.

C:  The European Stability Mechanism; potential loss of access to financial assistance.

D:  The Treaty Establishing the European Stability Mechanism;  potential legal action from the European Commission.


Correct Answer: B

A: This answer incorrectly identifies the Specific Monitoring procedure as the broader procedure, conflating two separate EU mechanisms for addressing macroeconomic issues. While Specific Monitoring is related, the EIP is directly designed to ensure compliance with the Macroeconomic Imbalance 

Generate one question from a chunk.

In [124]:
def generate_question(chunk):
    context = chunk['content']
    
    text = context.get('text', 'N/A')
    summary = context.get('summary', 'N/A')
    main_topic = context.get('main_topic', 'N/A')
    # Default to a list: a plain 'N/A' string would be joined character by character below
    subtopics = context.get('subtopics', ['N/A'])

    index = chunk['chunk_index']

    response = model.generate_content(PROMPT.format(text=text, summary=summary, main_topic=main_topic, subtopics=", ".join(subtopics)))
    response_text = response.text

    try:
        question_data = parse_question(response_text)
    except ValueError as e:
        question_data = {'error': str(e)}
    question_data['index'] = index

    return question_data

def save_to_json(data, file_name):
    with open(file_name, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

# Example usage
chunk = random_chunk
question_data = generate_question(chunk)
save_to_json(question_data, 'question_data.json')

In [126]:
import json

# Function to generate questions for all chunks
def generate_questions_for_all_chunks(data):
    all_questions = []
    for chunk in data:
        if chunk['type'] == 'chunk':
            question_data = generate_question(chunk)
            all_questions.append(question_data)
    return all_questions

# Generate questions for all chunks
all_questions = generate_questions_for_all_chunks(data)

# Save all questions to a JSON file
with open('all_questions.json', 'w', encoding='utf-8') as json_file:
    json.dump(all_questions, json_file, ensure_ascii=False, indent=4)

# Print all questions to a text file in the format that the model returns
with open('all_questions.txt', 'w', encoding='utf-8') as txt_file:
    for question in all_questions:
        if 'error' in question:
            txt_file.write(f"Error: {question['error']}\n\n")
            continue
        # Create a list of answers
        answers = [
            question['distractor1'],
            question['distractor2'],
            question['distractor3'],
            question['correct_answer']
        ]
        
        # Shuffle the answers
        random.shuffle(answers)
        
        # Write the question and answers to the file
        txt_file.write(f"Question: {question['question']}\n")
        txt_file.write(f"A: {answers[0]}\n")
        txt_file.write(f"B: {answers[1]}\n")
        txt_file.write(f"C: {answers[2]}\n")
        txt_file.write(f"D: {answers[3]}\n")
        
        # Find the new label for the correct answer
        correct_label = 'ABCD'[answers.index(question['correct_answer'])]
        txt_file.write(f"Correct Answer: {correct_label}\n\n")

# Print the path of the generated files
print("Questions saved to 'all_questions.json' and 'all_questions.txt'")

Questions saved to 'all_questions.json' and 'all_questions.txt'
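
The shuffle-and-relabel steps in the cell above can be factored into one helper. A minimal sketch, where `format_question` is a hypothetical name, the example question is made up, and the `rng` parameter is added only to make the shuffle reproducible:

```python
import random


def format_question(question, rng=random):
    """Return the question as text with shuffled options and the new correct label."""
    answers = [
        question['distractor1'],
        question['distractor2'],
        question['distractor3'],
        question['correct_answer'],
    ]
    rng.shuffle(answers)
    lines = [f"Question: {question['question']}"]
    for label, answer in zip('ABCD', answers):
        lines.append(f"{label}: {answer}")
    # The position of the correct answer after shuffling gives its new label.
    correct_label = 'ABCD'[answers.index(question['correct_answer'])]
    lines.append(f"Correct Answer: {correct_label}")
    return "\n".join(lines) + "\n\n"


# Made-up question in the shape returned by parse_question.
example = {
    'question': 'What is 2 + 2?',
    'correct_answer': '4',
    'distractor1': '3',
    'distractor2': '5',
    'distractor3': '22',
}
print(format_question(example, rng=random.Random(0)))
```

As in the original cell, `answers.index` finds the first matching text, so the helper assumes the four options are distinct.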


Generate multiple questions.

In [164]:
CHECK_IF_SIMILAR = """
Analyze whether the answer option is contextually similar to an option from the answer set. 
Contextual similarity means that, while the wording may differ, the underlying meaning, concept, or information conveyed is essentially the same.
If the answer set is empty, the answer isn't contextually similar to any other answer.

Input:

Answer Option: {answer_option}

Answer Set: {answer_set}

The output should be in the following format:

Contextually Similar: [Yes/No]
Explanation: [Explanation of why the answer is contextually similar or not]
""" 

def generate_unique_question(chunk, existing_answers, yes_count, yes):
    """
    Generates a unique question that is not contextually similar to existing answers.

    Args:
        chunk (dict): A dictionary containing information about the text chunk.
        existing_answers (set): A set of existing answers to compare for similarity.
        yes_count (int): The count of 'Yes' responses for contextual similarity.
        yes (bool): A flag indicating if the question is contextually similar.

    Returns:
        dict: A dictionary containing the generated question data.
        int: The updated count of 'Yes' responses for contextual similarity.
        bool: A flag indicating if the question is contextually similar.
    """
    question_data = generate_question(chunk)
    # Parsing the model response failed: mark the result as rejected and push
    # yes_count past the default max_yes so the caller can stop early.
    if 'error' in question_data:
        yes_count = 3
        yes = True
        return question_data, yes_count, yes

    response_answer = model.generate_content(CHECK_IF_SIMILAR.format(
        answer_option=question_data['correct_answer'],
        answer_set=", ".join(existing_answers)))

    if response_answer.text.startswith("Contextually Similar: Yes"):
        yes = True
        yes_count += 1
    return question_data, yes_count, yes
        
def generate_multiple_questions(chunk, num_questions = 5, minimum = 3, max_yes = 2):
    """
    Generates multiple unique questions from a given data chunk.

    Args:
        chunk (dict): A dictionary containing information about the text chunk.
        num_questions (int): The maximum number of generation attempts.
        minimum (int): The number of attempts after which early stopping is allowed.
        max_yes (int): The number of 'Yes' similarity responses at which generation stops early.

    Returns:
        list: A list of dictionaries containing the generated question data.
    """
    questions = []
    existing_answers = set()
    yes_count = 0
    for i in range(num_questions):
        yes = False
        question_data, yes_count, yes = generate_unique_question(chunk,
                                                                 existing_answers,
                                                                 yes_count, yes)
        if i >= minimum and yes_count >= max_yes:
            return questions

        if not yes:
            questions.append(question_data)
            existing_answers.add(question_data.get('correct_answer'))
    return questions

"""
# Example usage
chunk = random_chunk
num_questions = 8
questions = generate_multiple_questions(chunk, num_questions)

print(questions)



with open('multiple_questions.txt', 'w', encoding='utf-8') as txt_file:
    for question in questions:
        if 'error' in question:
            txt_file.write(f"Error: {question['error']}\n\n")
            continue
        # Create a list of answers
        answers = [
            question['distractor1'],
            question['distractor2'],
            question['distractor3'],
            question['correct_answer']
        ]
        
        # Shuffle the answers
        random.shuffle(answers)
        
        # Write the question and answers to the file
        txt_file.write(f"Question: {question['question']}\n")
        txt_file.write(f"A: {answers[0]}\n")
        txt_file.write(f"B: {answers[1]}\n")
        txt_file.write(f"C: {answers[2]}\n")
        txt_file.write(f"D: {answers[3]}\n")
        
        # Find the new label for the correct answer
        correct_label = 'ABCD'[answers.index(question['correct_answer'])]
        txt_file.write(f"Correct Answer: {correct_label}\n\n")

print("Multiple questions saved to 'multiple_questions.txt'")
"""

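
The check `response_answer.text.startswith("Contextually Similar: Yes")` fails if the model adds leading whitespace or changes capitalisation. A slightly more forgiving sketch works on the plain response string; `is_contextually_similar` is a hypothetical helper, not part of the notebook.

```python
import re

SIMILAR_LINE = re.compile(
    r'^\s*Contextually Similar:\s*(Yes|No)\b',
    re.IGNORECASE | re.MULTILINE,
)


def is_contextually_similar(response_text):
    """Return True for 'Yes', False for 'No', and None if the label is missing."""
    match = SIMILAR_LINE.search(response_text)
    if match is None:
        return None
    return match.group(1).lower() == 'yes'


print(is_contextually_similar("Contextually Similar: Yes\nExplanation: same idea."))
print(is_contextually_similar("no verdict here"))
```

Returning `None` for a missing label lets the caller decide whether an unparseable verdict should count as similar or not.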

In [170]:
import json
import time

# Function to generate questions for all chunks
def generate_questions_for_all_chunks(data):
    """
    Generates questions for all chunks in the provided data.

    Args:
        data (list): A list of dictionaries containing information about text chunks.

    Returns:
        list: A list of dictionaries containing the generated question data for each chunk.
    """
    all_questions = []
    for chunk in data:
        if chunk['type'] == 'chunk':
            question_data = generate_multiple_questions(chunk)
            # Sleep for 1 minute to avoid rate limiting - adjustment in the documentation
            time.sleep(60)
            all_questions.append(question_data)
    return all_questions

# Generate questions for all chunks
all_questions = generate_questions_for_all_chunks(data)

# Save all questions to a JSON file
with open('all_questions.json', 'w', encoding='utf-8') as json_file:
    json.dump(all_questions, json_file, ensure_ascii=False, indent=4)

print("Questions saved to 'all_questions.json'")

Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  2
Yes count:  3
Yes count:  1
Yes count:  2
Yes count:  3
Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  1
Yes count:  2
Yes count:  3
Yes count:  1
Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  2
Yes count:  1
Yes count:  1
Yes count:  2
Questions saved to 'all_questions.json' and 'all_questions.txt'
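
The fixed `time.sleep(60)` per chunk is simple but waits even when no limit was hit. One alternative sketch retries a call with exponential backoff only after a failure. `call_with_backoff` and its parameters are hypothetical; the exception type raised by the Gemini client on rate limiting is not shown in this notebook, so the sketch catches a caller-supplied exception class and uses a fake function for the demonstration.

```python
import time


def call_with_backoff(func, retry_on=Exception, max_attempts=4,
                      base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying with doubling delays when retry_on is raised."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            # Give up after the last attempt and re-raise the error.
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)


# Demonstration with a fake call that fails twice before succeeding.
attempts = {'count': 0}
waits = []


def fake_request():
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise RuntimeError("simulated rate limit")
    return "ok"


# waits.append stands in for time.sleep so the demo does not actually pause.
print(call_with_backoff(fake_request, retry_on=RuntimeError, sleep=waits.append))
print(waits)
```

In the notebook this could wrap the call inside the loop, for example `call_with_backoff(lambda: generate_multiple_questions(chunk), retry_on=...)`, once the actual exception class is known.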


In [22]:
import json
import random
import time  # needed by time.sleep in generate_questions_for_all_chunks

def generate_questions_for_all_chunks(data):
    """
    Generates questions for all chunks in the provided data.

    Args:
        data (list): A list of dictionaries containing information about text chunks.

    Returns:
        list: A list of dictionaries containing the generated question data for each chunk.
    """
    all_questions = []
    for chunk in data:
        if chunk['type'] == 'chunk':
            question_data = generate_multiple_questions(chunk)
            # Sleep for 1 minute to avoid rate limiting - adjustment in the documentation
            time.sleep(60)
            all_questions.append(question_data)
    return all_questions

def save_questions_to_file(input_json, output_txt):
    """
    Saves questions from a JSON file to a text file in a specific format.

    Args:
        input_json (str): The path to the JSON file containing the questions.
        output_txt (str): The path to the text file to save the questions to.
    """

    with open(input_json, 'r', encoding='utf-8') as json_file:
        all_questions = json.load(json_file)

    with open(output_txt, 'w', encoding='utf-8') as txt_file:
        for questions in all_questions:
            for i, question in enumerate(questions):
                try:
                    # Create a list of answers
                    answers = [
                        question['distractor1'],
                        question['distractor2'],
                        question['distractor3'],
                        question['correct_answer']
                    ]
                    # Shuffle the answers
                    random.shuffle(answers)
                    # Find the new label for the correct answer
                    correct_label = 'ABCD'[answers.index(question['correct_answer'])]
                    # Build the header before writing, so a malformed entry
                    # never leaves a half-written question in the file
                    header = f"Question n. {question['index']}.{i}: {question['question']}\n"
                    # Write the question and answers to the file
                    txt_file.write(header)
                    txt_file.write(f"A: {answers[0]}\n")
                    txt_file.write(f"B: {answers[1]}\n")
                    txt_file.write(f"C: {answers[2]}\n")
                    txt_file.write(f"D: {answers[3]}\n")
                    txt_file.write(f"Correct Answer: {correct_label}\n\n")
                except (KeyError, IndexError, TypeError, ValueError) as e:
                    # Report the malformed entry and continue with the next question
                    print(f"{type(e).__name__}: {e} in question {question}")

# Example usage
save_questions_to_file('all_questions_multiple.json', 'all_questions.txt')