In [None]:
# Pickle inputs shipped with the practice dataset, plus the folder that
# receives saved student-history pickles.
BLANK_STUDENT_HISTORY_FILE = (
    "/kaggle/input/qiskit-associate-developer-cert-practice/"
    "student_history.pkl"
)
# save_student_history() writes its timestamped pickle into this folder.
STUDENT_HISTORY_SAVE_FOLDER = "/kaggle/working/"
TASK_BY_SECTION_DICT_FILE = (
    "/kaggle/input/qiskit-associate-developer-cert-practice/"
    "task_by_section_dict.pkl"
)
PERCENT_BY_TASK_DICT_FILE = (
    "/kaggle/input/qiskit-associate-developer-cert-practice/"
    "percentage_by_task_dict.pkl"
)
QUESTION_BANK_DF_FILE = (
    "/kaggle/input/qiskit-associate-developer-cert-practice/"
    "question_df.pkl"
)


In [None]:
from datetime import datetime
import pickle
from datetime import datetime
import pickle
def save_student_history(student_history):
    """Pickle ``student_history`` to a timestamped file in the save folder.

    The file is named ``student_history_<YYYYmmdd_HHMMSS>.pkl`` and is created
    inside ``STUDENT_HISTORY_SAVE_FOLDER``.

    Args:
        student_history: Any picklable object (the notebook passes a DataFrame).

    Returns:
        The path of the file that was written.
    """
    import os

    # Timestamp uses only digits and underscores so the name is filesystem-friendly.
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    # os.path.join keeps the path correct whether or not the folder setting
    # ends with a separator (plain concatenation silently wrote elsewhere).
    save_path = os.path.join(STUDENT_HISTORY_SAVE_FOLDER, f"student_history_{ts}.pkl")
    with open(save_path, "wb") as f:
        pickle.dump(student_history, f, protocol=pickle.HIGHEST_PROTOCOL)
    print("Student History saved to:", save_path)
    return save_path

def update_scores(student_history, epsilon=1e-3):
    """Recompute the ``score`` column of ``student_history`` in place.

    score = (Number_correct + 1) / (Number_attempts * (1 + epsilon) + epsilon)

    With the formula above, a never-attempted question scores ``1 / epsilon``
    and each attempt that is not correct lowers the score.

    Args:
        student_history: DataFrame with ``Number_correct`` and
            ``Number_attempts`` columns; a ``score`` column is added or
            overwritten.
        epsilon: Small positive constant that keeps the denominator non-zero
            for questions with zero attempts.

    Raises:
        ValueError: If ``epsilon`` is not positive.
    """
    if epsilon <= 0:
        raise ValueError("epsilon must be positive")
    attempt_penalty = 1 + epsilon
    num = student_history["Number_correct"].astype(float) + 1.0
    den = student_history["Number_attempts"].astype(float) * attempt_penalty + epsilon
    # Column-wise assignment so the caller's DataFrame is updated directly.
    student_history["score"] = num / den

def load_student_history(file_path):
    """Load a pickled student-history DataFrame.

    Args:
        file_path: Path to a pickle file. The word "none" (any letter case,
            surrounding whitespace ignored) loads the blank history at
            ``BLANK_STUDENT_HISTORY_FILE`` instead.

    Returns:
        ``(student_history, True)`` on success, or ``(None, False)`` when the
        file cannot be read or does not contain a DataFrame. On success the
        frame is guaranteed to have ``Number_attempts``, ``Number_correct``
        and ``score`` columns.
    """
    import pandas as pd

    file_path = str(file_path).strip()
    if file_path.lower() == "none":
        print("Creating new student history (empty).")
        file_path = BLANK_STUDENT_HISTORY_FILE

    try:
        # NOTE: unpickling can execute arbitrary code; only load history
        # files that you created yourself.
        with open(file_path, "rb") as f:
            student_history = pickle.load(f)
    except Exception as e:
        # Best-effort loader: report the problem so the caller can re-prompt.
        print(f"Unable to load student history: {e}")
        return None, False

    if not isinstance(student_history, pd.DataFrame):
        print("Unable to load student history: "
              f"expected a DataFrame, got {type(student_history).__name__}")
        return None, False

    print("Loaded Student History")
    # Ensure required columns exist (this repair step used to sit after the
    # return statements and therefore never ran).
    for c in ("Number_attempts", "Number_correct"):
        if c not in student_history.columns:
            student_history[c] = 0
    if "score" not in student_history.columns:
        update_scores(student_history)
    return student_history, True
import pickle
import sys
from pathlib import Path
import pandas as pd

# ---------------------------
# Safe loaders and utilities
# ---------------------------

def safe_load_pickle(path_str, required=True, expected_type=None, var_name="object"):
    """Unpickle ``path_str`` after checking that the file exists and is non-empty.

    Args:
        path_str: Location of the pickle file.
        required: When True, problems raise; when False they are printed and
            ``None`` is returned.
        expected_type: Optional type (or tuple of types) the loaded object
            must be an instance of.
        var_name: Label used for the object in the type-mismatch message.

    Returns:
        The unpickled object, or ``None`` for a tolerated failure.

    Raises:
        FileNotFoundError: File is missing and ``required`` is True.
        ValueError: File is empty and ``required`` is True.
        TypeError: Loaded object has the wrong type and ``required`` is True.
    """
    def _reject(exc_cls, text):
        # Single place that decides between raising and soft-failing.
        if required:
            raise exc_cls(text)
        print(text)
        return None

    target = Path(path_str)
    if not target.exists():
        return _reject(FileNotFoundError, f"[ERROR] Missing file: {target}")
    if target.stat().st_size == 0:
        return _reject(ValueError, f"[ERROR] Empty file: {target}")

    with target.open("rb") as handle:
        loaded = pickle.load(handle)

    type_ok = expected_type is None or isinstance(loaded, expected_type)
    if not type_ok:
        return _reject(
            TypeError,
            f"[ERROR] {var_name} should be {expected_type}, got {type(loaded)} from {target}",
        )
    return loaded

def prompt_choice(prompt, valid_choices, to_lower=True):
    """Keep asking until the typed text matches one of ``valid_choices``.

    Choices are compared as strings (case-insensitively when ``to_lower`` is
    True); the matching original element of ``valid_choices`` is returned.
    """
    def _normalize(text):
        return text.lower() if to_lower else text

    lookup = {}
    for option in valid_choices:
        lookup[_normalize(str(option))] = option

    while True:
        entered = _normalize(input(prompt).strip())
        if entered not in lookup:
            print(f"Invalid selection. Valid options: {sorted(valid_choices)}")
            continue
        return lookup[entered]

def prompt_int(prompt, min_val=None, max_val=None):
    """Keep asking until the user types a whole number inside the optional bounds.

    ``min_val`` and ``max_val`` are inclusive; either may be ``None`` to leave
    that side unbounded.
    """
    while True:
        raw = input(prompt).strip()
        try:
            number = int(raw)
        except ValueError:
            print("Please enter a whole number.")
            continue

        below = min_val is not None and number < min_val
        above = max_val is not None and number > max_val
        if below:
            print(f"Enter a number >= {min_val}.")
        elif above:
            print(f"Enter a number <= {max_val}.")
        else:
            return number

def prompt_yes_no(prompt):
    """Ask a yes/no question and return True or False.

    Accepts 1/y/yes for True and 0/n/no for False (case-insensitive);
    anything else re-prompts.
    """
    replies = {
        "1": True, "y": True, "yes": True,
        "0": False, "n": False, "no": False,
    }
    while True:
        typed = input(prompt + " ").strip().lower()
        if typed in replies:
            return replies[typed]
        print("Please enter 1/0 or y/n.")

# ---------------------------
# Data loads (protected)
# ---------------------------

# Load the three pickled inputs; any load failure (missing file, empty file,
# wrong type, unpickling error) is printed and stops the notebook cell.
try:
    task_by_section_dict = safe_load_pickle(TASK_BY_SECTION_DICT_FILE,
                                            required=True, expected_type=dict,
                                            var_name="task_by_section_dict")
    percentage_by_task_dict = safe_load_pickle(PERCENT_BY_TASK_DICT_FILE,
                                               required=True, expected_type=dict,
                                               var_name="percentage_by_task_dict")
    question_df = safe_load_pickle(QUESTION_BANK_DF_FILE,
                                   required=True, expected_type=pd.DataFrame,
                                   var_name="question_df")
except Exception as e:
    print(e)
    sys.exit(1)

# Basic sanity checks. The column check runs before any column is touched so
# a malformed question bank produces a clear message instead of a bare KeyError.
required_cols = {"Question", "Section", "Task"}
missing = required_cols - set(question_df.columns)
if missing:
    print(f"[ERROR] question_df missing required columns: {missing}")
    sys.exit(1)

# Drop rows whose Question cell is just the literal header text "question".
question_df = question_df[question_df["Question"].astype(str).str.strip().str.lower() != 'question']
# Shuffle the bank so question order differs between runs.
question_df = question_df.sample(frac=1).reset_index(drop=True)

if len(question_df) == 0:
    print("[ERROR] question_df is empty; cannot proceed.")
    sys.exit(1)

# ---------------------------
# Student history statistics
# ---------------------------

def print_stats(student_history: pd.DataFrame):
    """Print overall and per-section attempt/accuracy figures.

    Prints an error line and returns early when ``student_history`` is not a
    DataFrame or lacks any of the Question, Section, Task, Number_attempts or
    Number_correct columns. Accuracy is ``100 * correct / attempts`` and is
    reported as 0.00% when there are no attempts.
    """
    if not isinstance(student_history, pd.DataFrame):
        print("[ERROR] student_history is not a DataFrame.")
        return

    needed = {"Question", "Section", "Task", "Number_attempts", "Number_correct"}
    miss = needed - set(student_history.columns)
    if miss:
        print(f"[ERROR] student_history missing columns: {miss}")
        return

    # ---- overall figures ----
    attempts_col = student_history["Number_attempts"]
    bank_size = len(student_history)
    seen_count = int((attempts_col > 0).sum())
    attempts_sum = int(attempts_col.sum())
    correct_sum = int(student_history["Number_correct"].sum())
    if attempts_sum > 0:
        overall_pct = 100.0 * correct_sum / attempts_sum
    else:
        overall_pct = 0.0

    print("=== Student History — Overall ===")
    print(f"Questions ever attempted : {seen_count} / {bank_size}")
    print(f"Total attempts           : {attempts_sum}")
    print(f"Total correct            : {correct_sum}")
    print(f"Overall accuracy         : {overall_pct:.2f}%")

    # ---- per-section figures ----
    per_section = student_history.groupby("Section", dropna=False, as_index=False).agg(
        total_questions=("Question", "count"),
        ever_attempted=("Number_attempts", lambda s: int((s > 0).sum())),
        attempts=("Number_attempts", "sum"),
        correct=("Number_correct", "sum"),
    )
    if per_section.empty:
        return

    print("\n=== By Section ===")
    for row in per_section.sort_values("Section").itertuples(index=False):
        # Section accuracy = sum(correct) / sum(attempts) within the section.
        section_pct = (100.0 * row.correct / row.attempts) if row.attempts > 0 else 0.0
        print(f"- {row.Section}: ",
              f"attempted {int(row.ever_attempted)}/{int(row.total_questions)} | ",
              f"attempts {int(row.attempts)} | correct {int(row.correct)} | ",
              f"acc {section_pct:.2f}%")


# ---------------------------
# Menu and parameter prompts
# ---------------------------

# Menu key ("1".."8") -> section name, spelled exactly as the names that
# generate_test compares against the question bank's "Section" column.
TOPIC_MAP = {
    str(number): f"Section {number}: {title}"
    for number, title in enumerate(
        (
            "Perform quantum operations",
            "Visualize quantum circuits, measurements, and states",
            "Create quantum circuits",
            "Run quantum circuits",
            "Use the sampler primitive",
            "Use the estimator primitive",
            "Retrieve and analyze the results of quantum circuits",
            "Operate with OpenQASM",
        ),
        start=1,
    )
}

def main_menu(student_history: pd.DataFrame):
    """Show the top-level menu and carry out the save/stats options.

    Returns:
        ``(quit_flag, practice_flag)``: ``(False, True)`` to start a practice
        test, ``(True, False)`` to quit, and ``(False, False)`` after saving
        the history or printing statistics.
    """
    print("""
What would you like to do (type only the number)
    1 Take a practice test
    2 Save student history
    3 Quit
    4 View stats on my student history
""")
    selection = prompt_choice("Enter choice: ", {"1", "2", "3", "4"})

    if selection == "1":
        return (False, True)
    if selection == "3":
        return (True, False)

    # Options 2 and 4 perform their action and then return to the menu.
    if selection == "2":
        save_student_history(student_history)
    else:
        print_stats(student_history)
    return (False, False)

def get_practice_params(question_df: pd.DataFrame):
    """Interactively collect the settings for one practice test.

    Returns a 6-tuple:
      full_test: bool  (True = exam over all topics; False = a single topic)
      number_of_questions: int
      timed: bool
      adaptive_mode: int  (0, 1, or 2)
      quit_requested: bool  (True when the user backs out at the topic prompt,
                             or when the question bank is empty)
      topic_key: str key into TOPIC_MAP, or None when no topic was chosen
    """
    bank_size = len(question_df)
    if bank_size <= 0:
        print("[ERROR] No questions available.")
        return True, 0, False, 2, True, None

    print("""Type '0' for an exam over all topics (proportions roughly match the test).
Type '1' to select a topic.""")
    full_test = prompt_choice("Enter 0 or 1: ", {"0", "1"}) == "0"

    # The count is bounded by the size of the whole bank.
    number_of_questions = prompt_int(
        f"How many questions would you like? (1..{bank_size}) ",
        min_val=1,
        max_val=bank_size)

    # Suggested time budget: 79.411 seconds per question, rounded.
    suggested_seconds = int(round(79.411 * number_of_questions))
    timed = prompt_yes_no(
        f"Do you want your test timed? Suggested {suggested_seconds} seconds for {number_of_questions} questions. (1=yes, 0=no)")

    print("""Adaptive mode:
  0: Focus on questions you haven't seen (if all seen -> least seen; if none seen -> random)
  1: Focus on questions you got wrong the most (if none seen -> random)
  2: Random
""")
    adaptive_mode = int(prompt_choice("Enter 0, 1, or 2: ", {"0", "1", "2"}))

    if full_test:
        return full_test, number_of_questions, timed, adaptive_mode, False, None

    # Single-topic test: ask which section, with 0 as the way out.
    print("Select a topic (or 0 to quit):")
    for key, label in TOPIC_MAP.items():
        print(f"  {key}: {label}")
    picked = prompt_choice("Enter number (0..8): ", set(TOPIC_MAP.keys()) | {"0"})
    if picked == "0":
        return full_test, number_of_questions, timed, adaptive_mode, True, None
    return full_test, number_of_questions, timed, adaptive_mode, False, picked

# ---------------------------
# Example student_history init (if not already loaded)
# ---------------------------

# If you already have a student_history loaded elsewhere, skip this block.
# Build a fresh history only when the notebook session has not defined one yet.
if 'student_history' not in globals():
    # One history row per question-bank row; duplicate Question texts are kept
    # so the same question under different tasks/sections is tracked separately.
    base = question_df[["Question", "Section", "Task"]].copy()
    student_history = base.assign(Number_attempts=0, Number_correct=0)
    # Stable 1-based ids, placed as the first column.
    student_history.insert(0, "Question_ID", range(1, len(student_history) + 1))
import pandas as pd

def get_topic_allocation(number_questions: int, percentage_by_task_dict: dict) -> dict:
    """Split ``number_questions`` across tasks in proportion to their percentages.

    Uses largest-remainder (Hamilton) rounding: every task first receives the
    floor of its exact share, then the leftover questions go one at a time to
    the tasks with the largest fractional parts (ties keep dict order).

    Args:
        number_questions: Total number of questions to hand out.
        percentage_by_task_dict: Mapping of task -> percentage weight
            (expected to total roughly 100).

    Returns:
        Mapping of task -> integer question count. The counts sum to
        ``number_questions`` when the floors do not already reach it;
        an empty mapping is returned when there are no tasks.
    """
    # No tasks to allocate to; this also avoids a modulo-by-zero below.
    if not percentage_by_task_dict:
        return {}

    # Exact (float) share per task, then its floor.
    raw = {
        task: pct * number_questions / 100.0
        for task, pct in percentage_by_task_dict.items()
    }
    alloc = {task: int(share) for task, share in raw.items()}

    remaining = number_questions - sum(alloc.values())
    if remaining <= 0:
        return alloc

    # Hand out the leftovers by descending fractional remainder.
    by_remainder = sorted(raw, key=lambda task: raw[task] - alloc[task], reverse=True)
    for i in range(remaining):
        alloc[by_remainder[i % len(by_remainder)]] += 1

    return alloc


def generate_test(full_test: bool, 
                  n_questions: int, 
                  timed: bool, 
                  adaptive_mode: int, 
                  want_quit: bool, 
                  topic_key: str,
                  student_history: pd.DataFrame):
    """Assemble a practice test from the module-level ``question_df``.

    Args:
        full_test: True to draw from every section, with per-task counts from
            ``get_topic_allocation``; False to draw from the single section
            ``TOPIC_MAP[topic_key]``, split evenly across that section's tasks.
        n_questions: Number of questions requested.
        timed: Accepted for call compatibility; not used here.
        adaptive_mode: 0 = fewest attempts first, 1 = lowest score first,
            anything else = random sample.
        want_quit: Accepted for call compatibility; not used here.
        topic_key: Key into ``TOPIC_MAP``; only read when ``full_test`` is False.
        student_history: Per-question attempt history used to order the
            questions in modes 0 and 1.

    Returns:
        ``(practice_test_df, test_duration_seconds)``. The frame can hold
        fewer than ``n_questions`` rows when the bank is too small.

    Raises:
        ValueError: If ``question_df`` or ``student_history`` lacks a
            required column.
        KeyError: If ``full_test`` is False and ``topic_key`` is not in
            ``TOPIC_MAP``.
    """
    # guard
    required_q_cols = {"Question", "Section", "Task"}
    required_h_cols = {"Question", "Section", "Task", "Number_attempts", "Number_correct"}
    if not required_q_cols.issubset(set(question_df.columns)):
        raise ValueError("question_df missing required columns")
    if not required_h_cols.issubset(set(student_history.columns)):
        raise ValueError("student_history missing required columns")

    test_duration = n_questions * 79.411  # seconds (suggested)

    if full_test:
        question_bank = question_df.copy()
        topic_question_counts = get_topic_allocation(n_questions, percentage_by_task_dict)
    else:
        topic = TOPIC_MAP[topic_key]
        question_bank = question_df[question_df["Section"] == topic].copy()

        tasks_in_topic = list(task_by_section_dict.get(topic, []))
        if len(tasks_in_topic) == 0:
            # fallback: treat the entire section as one bucket (keyed by None)
            topic_question_counts = {None: n_questions}
        else:
            base = n_questions // len(tasks_in_topic)
            topic_question_counts = {t: base for t in tasks_in_topic}
            # distribute the remainder round-robin
            rem = n_questions - sum(topic_question_counts.values())
            for i in range(rem):
                topic_question_counts[tasks_in_topic[i % len(tasks_in_topic)]] += 1

    def bank_rows_for(task_name):
        """Rows of question_bank for one bucket; None means the whole section (topic mode only)."""
        if task_name is None and not full_test:
            return question_bank
        return question_bank[question_bank["Task"] == task_name]

    def select_by_history_for_task(task_name, k: int, mode: int) -> pd.DataFrame:
        """Pick up to k rows for a bucket, ordered by student_history priority."""
        bank_slice = bank_rows_for(task_name)
        if task_name is None:
            hist_slice = student_history
        else:
            hist_slice = student_history[student_history["Task"] == task_name]

        if mode == 0:  # unseen / least seen first
            hist_slice = hist_slice.sort_values(by="Number_attempts", ascending=True)
        elif mode == 1:  # most wrong (lowest score first)
            if "score" not in hist_slice.columns:
                # (Number_correct + 1) / (Number_attempts*(1+eps) + eps)
                eps = 1e-3
                num = hist_slice["Number_correct"].astype(float) + 1.0
                den = hist_slice["Number_attempts"].astype(float) * (1.0 + eps) + eps
                hist_slice = hist_slice.assign(score=num / den)
            hist_slice = hist_slice.sort_values(by="score", ascending=True)
        else:
            # random: sample directly from the bank
            return bank_slice.sample(n=min(k, len(bank_slice)))

        # Categorical categories must be unique and non-null, so drop missing
        # questions and keep only the first (highest-priority) occurrence of
        # each question text.
        prioritized_qs = list(dict.fromkeys(hist_slice["Question"].dropna()))
        if len(prioritized_qs) == 0:
            return bank_slice.sample(n=min(k, len(bank_slice)))

        # Order the bank rows by priority; questions absent from the history sort last.
        ordered = (bank_slice.assign(_ord=pd.Categorical(
            bank_slice["Question"], 
            categories=prioritized_qs, ordered=True)).sort_values(
            "_ord", na_position="last").drop(columns=["_ord"]))
        return ordered.head(k)

    # One selection pass serves both full-test and single-topic modes.
    practice_parts = []
    for task, k in topic_question_counts.items():
        if k <= 0 or len(bank_rows_for(task)) == 0:
            continue
        practice_parts.append(select_by_history_for_task(task, k, adaptive_mode))

    practice_test = pd.concat(practice_parts, ignore_index=True) if practice_parts else pd.DataFrame(columns=list(question_df.columns))

    # If we undershot (e.g., not enough items in some buckets), top up randomly from the remaining pool
    if len(practice_test) < n_questions and len(question_bank) > len(practice_test):
        remaining_needed = n_questions - len(practice_test)
        remaining_pool = question_bank.merge(practice_test[["Question"]], on="Question", how="left", indicator=True)
        remaining_pool = remaining_pool[remaining_pool["_merge"] == "left_only"].drop(columns=["_merge"])
        if len(remaining_pool) > 0:
            # NOTE: the top-up draw uses a fixed seed, unlike the other samples in this function.
            practice_test = pd.concat([practice_test, remaining_pool.sample(n=min(remaining_needed, len(remaining_pool)), random_state=42)],ignore_index=True)

    # final cap in case of over-selection
    if len(practice_test) > n_questions:
        practice_test = practice_test.sample(n=n_questions).reset_index(drop=True)
    else:
        practice_test = practice_test.reset_index(drop=True)

    return practice_test, test_duration

# Loop flag for the history-loading prompt: the `while not resolved` loop keeps
# asking for a file path until load_student_history reports success.
resolved = False


In [None]:
import time

# ------- load (unchanged) -------
# Keep prompting until a student history loads successfully.
while not resolved:
    file_path = input("Provide file path for student history file.  If you don't want to load a student history, type 'None' ")
    student_history, resolved = load_student_history(file_path)

# ------- main loop -------
# Each pass shows the menu; choosing "practice" runs one full test below.
quit_flag = False
while not quit_flag:
    quit_flag, practice_flag = main_menu(student_history)
    if quit_flag:
        break
    if practice_flag:
        full_test, n_questions, timed, adaptive_mode, want_quit, topic_key = get_practice_params(question_df)
        if want_quit:
            continue

        print("\n=== Practice Parameters ===")
        print(f"Full test?         : {full_test}")
        print(f"Num questions      : {n_questions}")
        print(f"Timed              : {timed}")
        print(f"Adaptive mode      : {adaptive_mode}")
        print(f"Topic              : {TOPIC_MAP.get(topic_key, 'All topics')}")
        print("===========================\n")

        practice_test, duration = generate_test(
            full_test,
            n_questions, 
            timed, 
            adaptive_mode, 
            want_quit, 
            topic_key,
            student_history
        )

        # Nothing to ask (e.g. the chosen section has no questions): back to the menu.
        if len(practice_test) == 0:
            print("No questions available for the selected parameters.")
            continue

        start_time = time.time()
        if timed:
            print(f"You have {duration:.0f} seconds — about {int(duration//60)} minutes.")

        correct_count = 0

        for _, row in practice_test.iterrows():
            print("\nQuestion")
            print(row["Question"])
            print("A:", row.get("Choice_A", ""))
            print("B:", row.get("Choice_B", ""))
            print("C:", row.get("Choice_C", ""))
            print("D:", row.get("Choice_D", ""))

            # Only the first character of the reply is used, upper-cased.
            answer = input("Select your choice: A, B, C, or D ").strip().upper()[:1]

            # History rows that share this question text.
            mask = (student_history["Question"] == row["Question"])

            # time check: an answer that arrives after the limit is discarded,
            # so it is neither graded nor recorded as an attempt.
            current_duration = time.time() - start_time
            if timed:
                remaining = max(0, duration - current_duration)
                print(f"Time remaining = {remaining:.1f} seconds")
                if current_duration > duration:
                    print("Out of time.")
                    break

            # update attempts (only for answers that will be graded)
            student_history.loc[mask, "Number_attempts"] = student_history.loc[mask, "Number_attempts"] + 1

            # evaluate answer
            is_valid = answer in {"A", "B", "C", "D"}
            correct_field = row.get("Correct_Answer", None)

            if not is_valid:
                print("You entered an invalid answer choice. Question marked wrong.")
                is_correct = False
            else:
                # If Correct_Answer is a letter, compare letters; otherwise compare the text of the chosen option.
                if isinstance(correct_field, str) and correct_field in {"A", "B", "C", "D"}:
                    is_correct = (answer == correct_field)
                else:
                    is_correct = (row.get(f"Choice_{answer}") == correct_field)

            if is_correct:
                print("Correct!")
                correct_count += 1
                student_history.loc[mask, "Number_correct"] = student_history.loc[mask, "Number_correct"] + 1
            else:
                print("Incorrect. The correct answer is:")
                print(correct_field)

            # explanation (if present; a missing/NaN cell is skipped)
            expl = row.get("Explanation", None)
            if expl is not None and not pd.isna(expl):
                print("Explanation:")
                print(expl)

        total_time = time.time() - start_time
        # Report against the number of questions actually in the test, which
        # can be smaller than the number requested.
        print(f"\nYou got {correct_count} of {len(practice_test)} correct in {total_time:.1f} seconds.")

        update_scores(student_history)
        print_stats(student_history)
        save_student_history(student_history)

Provide file path for student history file.  If you don't want to load a student history, type 'None'  /kaggle/working/student_history_20251008_155901.pkl
Enter choice:  quit
