In [16]:
# Import important libraries
import os
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

In [17]:
# 1. Validation
def validate_data(
        df: pd.DataFrame,
        total_col: str = "TotalResponses",
        correct_col: str = "CorrectResponses",
        level_col: str = "Level",
        max_lives: int = 3,
) -> pd.DataFrame:
    """
    Validate and normalise the aggregate Number Recall data so that trial-level
    rows can be reconstructed from it.

    Rules applied, in this order:
      - A missing Level is backfilled from CorrectResponses.
      - Rows that still hold nulls in any of the three key columns are dropped.
      - Negative CorrectResponses are reset to 0.
      - Level == CorrectResponses (if not, CorrectResponses is trusted).
      - TotalResponses >= CorrectResponses (if not, TotalResponses is raised).
      - TotalResponses - CorrectResponses <= max_lives (if not, TotalResponses is capped).
      - Rows with no attempts (TotalResponses <= 0) are dropped.

    Parameters
    ----------
    df : DataFrame
        Aggregate table; it is copied, never modified in place.
    total_col, correct_col, level_col : str
        Names of the total-responses, correct-responses and level columns.
    max_lives : int
        Maximum number of wrong attempts allowed per row.

    Returns
    -------
    DataFrame
        The cleaned copy of ``df``. The human-readable notes describing every
        fix and drop are available as ``result.attrs["validation_notes"]``
        (a list of strings, empty when nothing had to be changed).

    Raises
    ------
    KeyError
        If any of the three key columns is missing from ``df``.
    """
    df = df.copy()
    notes: List[str] = []

    # Required columns present?
    for c in [total_col, correct_col, level_col]:
        if c not in df.columns:
            raise KeyError(f"Missing required column: {c}")

    # If Level is missing, backfill it from CorrectResponses
    df[level_col] = df[level_col].fillna(df[correct_col])

    # Drop rows with nulls in the key columns
    before = len(df)
    df = df.dropna(subset=[total_col, correct_col, level_col]).copy()
    dropped_nulls = before - len(df)
    if dropped_nulls > 0:
        notes.append(f"[Drop] Removed {dropped_nulls} rows with nulls in {total_col}/{correct_col}/{level_col}.")

    # A negative count of correct responses cannot be expanded into trials;
    # reset it before the remaining rules are derived from it.
    mask_negative = df[correct_col] < 0
    if mask_negative.any():
        n = int(mask_negative.sum())
        notes.append(f"[Fix] CorrectResponses < 0 in {n} rows. Set CorrectResponses = 0.")
        df.loc[mask_negative, correct_col] = 0

    # Enforce Level == CorrectResponses
    mask_lvl_mismatch = df[level_col] != df[correct_col]
    if mask_lvl_mismatch.any():
        n = int(mask_lvl_mismatch.sum())
        notes.append(f"[Fix] Level != CorrectResponses for {n} rows. Overwrote Level with CorrectResponses.")
        df.loc[mask_lvl_mismatch, level_col] = df.loc[mask_lvl_mismatch, correct_col]

    # Enforce TotalResponses >= CorrectResponses
    mask_too_low = df[total_col] < df[correct_col]
    if mask_too_low.any():
        n = int(mask_too_low.sum())
        notes.append(f"[Fix] TotalResponses < CorrectResponses in {n} rows. Set TotalResponses = CorrectResponses.")
        df.loc[mask_too_low, total_col] = df.loc[mask_too_low, correct_col]

    # Enforce wrong attempts <= max_lives
    wrong_attempts = df[total_col] - df[correct_col]
    mask_too_high = wrong_attempts > max_lives
    if mask_too_high.any():
        n = int(mask_too_high.sum())
        notes.append(f"[Fix] Wrong attempts exceeded {max_lives} in {n} rows. "
                     f"Capped at CorrectResponses + {max_lives}.")
        df.loc[mask_too_high, total_col] = df.loc[mask_too_high, correct_col] + max_lives

    # Drop rows with no attempts
    before2 = len(df)
    df = df[df[total_col] > 0].copy()
    dropped_zero = before2 - len(df)
    if dropped_zero > 0:
        notes.append(f"[Drop] Removed {dropped_zero} rows with {total_col} <= 0.")

    # Callers use the return value directly as a DataFrame, so the notes travel
    # with the frame as metadata instead of as a second return value.
    df.attrs["validation_notes"] = notes

    return df

In [18]:
# 2. Reconstruct data for the IRT model, i.e. item-response rows
def row_to_item_responses(correct: int, total: int) -> Tuple[List[int], List[int]]:
    """
    Build the item_id and response lists for a single aggregate row.

    - item_id runs 1..total
    - the first 'correct' responses are 1, the remainder are 0

    Both arguments are converted with int(). 'total' is floored at 0 and
    'correct' is clamped to the range [0, total], so the two returned lists
    always have the same length even for inconsistent input.

    Returns
    -------
    (items, responses) : tuple of two lists of equal length ``total``.
    """
    total = max(0, int(total))
    # Clamp so that neither a negative count nor a count above 'total' can make
    # the responses list longer or shorter than the items list.
    correct = min(max(0, int(correct)), total)
    items = list(range(1, total + 1))
    responses = [1] * correct + [0] * (total - correct)

    return items, responses

def explode_trials(
    df: pd.DataFrame,
    id_col: str = "AccountId",
    total_col: str = "TotalResponses",
    correct_col: str = "CorrectResponses",
    keep_session: bool = False

) -> pd.DataFrame:
    """
    Produce a long table with participant_id, item_id, response.

    Every input row contributes ``total`` output rows: item_id runs 1..total,
    and response is 1 for the first ``correct`` items and 0 for the rest.
    Rows whose total is 0 or negative contribute no output rows, and an empty
    input yields an empty table with the three output columns.

    Parameters
    ----------
    df : DataFrame
        Aggregate table with one row per participant attempt.
    id_col : str
        Column holding the participant identifier.
    total_col, correct_col : str
        Columns holding the number of responses and of correct responses.
        Values are converted with int().
    keep_session : bool
        Currently ignored; kept so existing calls that pass it keep working.

    Returns
    -------
    DataFrame
        Columns participant_id, item_id (int64) and response (int64), with a
        fresh 0..n-1 index.

    Raises
    ------
    KeyError
        If one of the named columns is missing from ``df``.
    ValueError
        If a total/correct value cannot be converted with int() (e.g. NaN).
    """
    out_cols = ["participant_id", "item_id", "response"]

    ids = df[id_col].tolist()
    totals = [int(t) for t in df[total_col].tolist()]
    corrects = [int(c) for c in df[correct_col].tolist()]

    # One record per reconstructed trial. Comparing the item number with the
    # correct count keeps item_id and response aligned by construction, even
    # when the correct count is negative or larger than the total.
    records = [
        (pid, item, 1 if item <= n_correct else 0)
        for pid, n_correct, n_total in zip(ids, corrects, totals)
        for item in range(1, n_total + 1)
    ]

    out = pd.DataFrame(records, columns=out_cols)
    # Keep integer dtypes even when there are no records at all.
    return out.astype({"item_id": "int64", "response": "int64"})

In [19]:
# 3. End-to-end function
def transform_number_recall_to_irt(
    df: pd.DataFrame,
    *,
    id_col: str = "AccountId",
    total_col: str = "TotalResponses",
    correct_col: str = "CorrectResponses",
    level_col: str = "Level",
    max_lives: int = 3,
) -> pd.DataFrame:
    """
    Run the whole aggregate-to-trial pipeline in one call.

    The aggregate table is first cleaned by validate_data and the cleaned
    table is then expanded by explode_trials; the expanded table is returned.
    All keyword arguments are forwarded unchanged to the step that uses them.
    """
    # Column names shared by both pipeline steps.
    count_cols = {"total_col": total_col, "correct_col": correct_col}

    validated = validate_data(df, level_col=level_col, max_lives=max_lives, **count_cols)
    return explode_trials(validated, id_col=id_col, **count_cols)

In [20]:

def append_original_columns_to_irt(
    irt_data: pd.DataFrame,
    original_df: pd.DataFrame,
    *,
    id_col: str = "AccountId",                 # id column in original_df
    irt_id_col: str = "participant_id",        # id column in irt_data
    cols_to_append: Optional[List[str]] = None # which original columns to append
) -> pd.DataFrame:
    """
    Append selected columns from the original aggregate data to the long-form IRT rows.

    Parameters
    ----------
    irt_data : DataFrame
        Output from your transform_number_recall_to_irt(...) function (cols: participant_id, item_id, response).
    original_df : DataFrame
        The original aggregate table that contains id_col plus columns you want to append.
    id_col : str
        Identifier in original_df (e.g., 'AccountId').
    irt_id_col : str
        Identifier in irt_data (e.g., 'participant_id'). May be the same name as id_col.
    cols_to_append : list[str] | None
        Columns from original_df to append. Defaults to ['Score', 'Percentage', 'Percentile'].

    Returns
    -------
    DataFrame
        irt_data with the requested columns appended per participant. All
        columns of irt_data, including irt_id_col, are kept. When original_df
        holds several rows for the same id, the values of the first such row
        are used; ids without a match get NaN in the appended columns.

    Raises
    ------
    KeyError
        If id_col or any requested column is missing from original_df, or if
        irt_id_col is missing from irt_data.
    """
    if cols_to_append is None:
        cols_to_append = ["Score", "Percentage", "Percentile"]

    # Validate presence
    missing = [c for c in [id_col, *cols_to_append] if c not in original_df.columns]
    if missing:
        raise KeyError(f"Missing required column(s) in original_df: {missing}")

    if irt_id_col not in irt_data.columns:
        raise KeyError(f"Missing '{irt_id_col}' column in irt_data.")

    # Make a slim right table with exactly one row per id
    right = (
        original_df[[id_col, *cols_to_append]]
        .dropna(subset=[id_col])
        .drop_duplicates(subset=[id_col])
    )

    # Join through a temporary key name that exists in neither table. Dropping
    # it afterwards can then never remove a real column, which matters when
    # id_col and irt_id_col are the same name or irt_data already has id_col.
    join_key = "__append_join_key__"
    while join_key in irt_data.columns or join_key in right.columns:
        join_key += "_"
    right = right.rename(columns={id_col: join_key})

    # Merge onto IRT rows; the right table is unique per id after the
    # de-duplication above, so the relationship is always many-to-one.
    merged = irt_data.merge(
        right,
        how="left",
        left_on=irt_id_col,
        right_on=join_key,
        validate="many_to_one",
    ).drop(columns=[join_key])

    return merged


In [21]:
# Load the aggregate Number Recall scores. The default location is specific to
# one machine; set the NUMBER_RECALL_XLSX environment variable to point at the
# workbook when running the notebook elsewhere.
DATA_PATH = os.environ.get(
    "NUMBER_RECALL_XLSX",
    "/Users/op24226/Desktop/PsychGames/Data/NumberRecall_UserScores.xlsx",
)
df = pd.read_excel(DATA_PATH)

In [22]:
# Validate the aggregate scores and expand them into the long item-response table
# (all column names and max_lives are left at the function's defaults).
irt_df = transform_number_recall_to_irt(df)

In [23]:
# Attach the default Score, Percentage and Percentile columns from the original
# table to every item-response row of the matching participant.
irt_scores = append_original_columns_to_irt(irt_df, df)

In [24]:
# Reshape the long IRT table into the wide layout used by mirt

# Highest item_id reached by any participant (the largest of the per-participant maxima)
max_items = (
    irt_df
    .groupby('participant_id')['item_id']
    .max()
    .max()
)

# One row per participant and one column per item_id; when a participant has
# several responses for the same item, only the first one is kept
wide_df = pd.pivot_table(
    irt_df,
    index='participant_id',
    columns='item_id',
    values='response',
    aggfunc='first',
)

# Add an all-NaN column for every item_id in 1..max_items that has no column yet
for i in range(1, max_items + 1):
    if i in wide_df.columns:
        continue
    wide_df[i] = np.nan

# Put the item columns in ascending order
wide_df = wide_df.reindex(columns=sorted(wide_df.columns))

# Label every item column as "item_id <n>"
wide_df.columns = [f'item_id {col}' for col in wide_df.columns]

# Turn participant_id from the index back into a regular column
wide_df = wide_df.reset_index()
