In [4]:
import logging
import os
from datetime import datetime
import pandas as pd
from ai_cdss.constants import BY_ID, BY_PPS

def check_session(
    session: pd.DataFrame,
    log_dir: str = "~/.ai_cdss/logs/",
) -> pd.DataFrame:
    """
    Check a session DataFrame for data discrepancies, export them, and drop them.

    Two kinds of rows are flagged. Each non-empty set is written to a
    timestamped CSV in ``log_dir`` and removed from the returned frame:

    1. Rows with a missing ``PRESCRIPTION_ID`` (patient registered but no
       data yet).
    2. Remaining rows with a missing ``ADHERENCE`` (session present in the
       session table but not in the recording table).

    A summary of each check is logged to ``<log_dir>/data_check.log``.

    Parameters
    ----------
    session : pd.DataFrame
        Session DataFrame to check and clean. Must contain the columns
        ``PATIENT_ID``, ``PRESCRIPTION_ID``, ``SESSION_ID`` and ``ADHERENCE``.
    log_dir : str, optional
        Directory for the log file and the discrepancy exports; ``~`` is
        expanded. Default is ``"~/.ai_cdss/logs/"``.

    Returns
    -------
    pd.DataFrame
        Cleaned session DataFrame. The input DataFrame is not modified.
    """
    # Step 0: Setup paths
    log_dir = os.path.expanduser(log_dir)
    os.makedirs(log_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.abspath(os.path.join(log_dir, "data_check.log"))
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    # Avoid logging.basicConfig() for the file: it is a no-op once the root
    # logger has handlers (so it cannot re-target the log file), and a
    # FileHandler passed to it would stay open unused. Attach one FileHandler
    # per log file to this module's logger instead, and reuse it on later calls.
    logger = logging.getLogger(__name__)
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
    if not any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == log_file
        for handler in logger.handlers
    ):
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(file_handler)
    # Keep console output when nothing else has configured logging.
    if not logging.getLogger().handlers:
        logging.basicConfig(level=logging.INFO, format=log_format)

    export_cols = ["PATIENT_ID", "PRESCRIPTION_ID", "SESSION_ID"]

    # Step 1: Patient registered but no data yet (no prescription)
    logger.info("Checking for patients registered but with no data yet (missing PRESCRIPTION_ID).")
    no_prescription = session["PRESCRIPTION_ID"].isna()
    patients_no_data = session[no_prescription]
    if not patients_no_data.empty:
        export_file = os.path.join(log_dir, f"patients_no_data_{timestamp}.csv")
        patients_no_data[export_cols].to_csv(export_file, index=False)
        logger.warning(f"{len(patients_no_data)} patients found without prescription. Check exported file: {export_file}")
    else:
        logger.info("No patients without prescription found.")

    # Drop with a boolean mask: DataFrame.drop(labels) removes *every* row that
    # carries a flagged index label, which over-deletes when labels repeat.
    session = session[~no_prescription]

    # Step 2: Sessions in session table but not in recording table (no adherence)
    logger.info("Checking for sessions in session table but not in recording table (missing ADHERENCE).")
    no_adherence = session["ADHERENCE"].isna()
    patient_session_discrepancy = session[no_adherence]
    if not patient_session_discrepancy.empty:
        export_file = os.path.join(log_dir, f"patient_session_discrepancy_{timestamp}.csv")
        patient_session_discrepancy[export_cols].to_csv(export_file, index=False)
        logger.warning(f"{len(patient_session_discrepancy)} sessions found without adherence. Check exported file: {export_file}")
    else:
        logger.info("No sessions without adherence found.")

    # Same mask-based drop as above, for the same reason.
    session = session[~no_adherence]

    # Final info
    logger.info(f"Session data cleaned. Final shape: {session.shape}")

    return session

def safe_merge(
    left: pd.DataFrame,
    right: pd.DataFrame,
    on,
    how: str = "left",
    export_dir: str = "~/.ai_cdss/logs/",
    left_name: str = "left",
    right_name: str = "right",
) -> pd.DataFrame:
    """
    Merge two DataFrames and independently report unmatched rows on each side.

    A diagnostic outer merge (``indicator=True``) finds rows that exist only
    in ``left`` or only in ``right``. Each non-empty set is exported to a
    timestamped CSV in ``export_dir``, and a warning is logged to
    ``<export_dir>/data_check.log``. The merge requested via ``how`` is then
    computed separately and returned, so the reporting never alters the result.

    Parameters
    ----------
    left : pd.DataFrame
        Left DataFrame.
    right : pd.DataFrame
        Right DataFrame.
    on : str or list
        Column(s) to join on.
    how : str, optional
        Type of merge to be performed. Default is "left".
    export_dir : str, optional
        Directory to export discrepancy reports and logs; ``~`` is expanded.
    left_name : str, optional
        Friendly name for the left DataFrame, used in export file names and
        log messages.
    right_name : str, optional
        Friendly name for the right DataFrame, used in export file names and
        log messages.

    Returns
    -------
    pd.DataFrame
        Result of ``left.merge(right, on=on, how=how)``.
    """
    # Prepare export directory
    export_dir = os.path.expanduser(export_dir)
    os.makedirs(export_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = os.path.abspath(os.path.join(export_dir, "data_check.log"))
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    # Avoid logging.basicConfig() for the file: it is a no-op once the root
    # logger has handlers (so it cannot re-target the log file), and a
    # FileHandler passed to it would stay open unused. Attach one FileHandler
    # per log file to this module's logger instead, and reuse it on later calls.
    logger = logging.getLogger(__name__)
    if logger.level == logging.NOTSET:
        logger.setLevel(logging.INFO)
    if not any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == log_file
        for handler in logger.handlers
    ):
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter(log_format))
        logger.addHandler(file_handler)
    # Keep console output when nothing else has configured logging.
    if not logging.getLogger().handlers:
        logging.basicConfig(level=logging.INFO, format=log_format)

    # Step 1: Outer merge for discrepancy check
    discrepancy_check = left.merge(right, on=on, how="outer", indicator=True)

    left_only = discrepancy_check[discrepancy_check["_merge"] == "left_only"]
    right_only = discrepancy_check[discrepancy_check["_merge"] == "right_only"]

    # Columns reported alongside the identifying keys, when present.
    report_cols = ["SESSION_DURATION", "SCORE", "DM_VALUE", "PE_VALUE"]

    # Step 2: Export and log discrepancies if found
    if not left_only.empty:
        export_file = os.path.join(export_dir, f"{left_name}_only_{timestamp}.csv")
        try:
            report = left_only[BY_ID + report_cols]
        except KeyError:
            # Some preferred columns are missing (e.g. renamed with _x/_y
            # suffixes by the merge): export every column instead.
            report = left_only
        report.to_csv(export_file, index=False)
        logger.warning(
            f"{len(left_only)} rows found only in '{left_name}' DataFrame "
            f"(see export: {export_file})"
        )

    if not right_only.empty:
        export_file = os.path.join(export_dir, f"{right_name}_only_{timestamp}.csv")
        try:
            report = right_only[BY_PPS + report_cols]
        except KeyError:
            # Same fallback as for the left-only rows.
            report = right_only
        report.to_csv(export_file, index=False)
        logger.warning(
            f"{len(right_only)} rows from '{right_name}' DataFrame did not match '{left_name}' DataFrame "
            f"(see export: {export_file})"
        )

    # Step 3: Actual requested merge
    return left.merge(right, on=on, how=how)


In [7]:
# %%
# On-disk result cache (joblib). Results are stored under the relative path
# 'cache_dir', i.e. resolved against the current working directory.
from joblib import Memory
memory = Memory(location='cache_dir', verbose=0)

# Load session data for `patient_list`, run check_session on it, and cache the
# cleaned result on disk.
# NOTE(review): joblib keys the cache on the call arguments and on this
# function's own code only. `loader` is a global looked up at call time (it is
# created in a later cell and must exist before the first call). Changing its
# configuration (e.g. rgs_mode) or editing check_session will therefore NOT
# invalidate cached results -- call memory.clear() after such changes.
@memory.cache
def load_session_cached(patient_list):
    return check_session(loader.load_session_data(patient_list=patient_list))

# Load time-series data for `patient_list`, cached on disk. The same cache-key
# caveat applies to the global `loader`.
@memory.cache
def load_timeseries_cached(patient_list):
    return loader.load_timeseries_data(patient_list=patient_list)

# Notebook flags. They are not referenced in the cells shown here and are
# presumably read by later cells -- confirm before relying on them.
SAVE_DATA = True
EXPAND = False

# %%

import pandas as pd
from ai_cdss.data_loader import DataLoader
from ai_cdss.data_processor import DataProcessor
from ai_cdss.processing import expand_session_batch
# Wildcard import: later cells use constants such as BY_PPS, BY_PP and BY_PPST.
from ai_cdss.constants import *

# NEST DATA: hard-coded list of patient IDs loaded below.
nest_patient = [
    775,  787,  788, 1123, 1169, 1170, 1171, 1172, 1173, 1983, 2110, 2195,
    2955, 2956, 2957, 2958, 2959, 2960, 2961, 2962, 2963, 3081, 3229, 3318, 3432
]

# Passed to DataLoader(rgs_mode=...); accepted values are defined by DataLoader.
rgs_mode = "app"
# Passed to DataProcessor as `weights` and `alpha` respectively.
scoring_weights = [1,1,1]
ewma_alpha = 0.2

# Not used in the cells shown here; presumably recommendation settings for
# later cells -- confirm.
n = 12
days = 7
protocols_per_day = 5

# `loader` must be created before the cached loaders are called: they read
# this global at call time.
loader = DataLoader(rgs_mode=rgs_mode)
processor = DataProcessor(weights=scoring_weights, alpha=ewma_alpha)

# Load Data
# Session and time-series loads go through the joblib cache. Session data is
# also cleaned by check_session. PPF and the protocol tables are loaded
# directly (uncached).
session = load_session_cached(nest_patient)
timeseries = load_timeseries_cached(nest_patient)
ppf = loader.load_ppf_data(patient_list=nest_patient)

protocol_similarity = loader.load_protocol_similarity()
protocol_metrics = loader.load_protocol_init()

INFO:ai_cdss.data_loader:PPF data loaded successfully.
INFO:ai_cdss.data_loader:Protocol similarity data loaded successfully.
INFO:ai_cdss.data_loader:Protocol initialization data loaded successfully.


In [None]:
# # Process Data
# session = session.dropna(subset=["ADHERENCE"])
# ts = processor.aggregate_dms_by_time(timeseries)
# ts = ts.sort_values(by=BY_PPST)
# # Merge
# data = safe_merge(session, ts, on=BY_PPS, how="inner", left_name="session", right_name="ts")
# score_data = safe_merge(data, ppf, on=BY_PP, how="left", left_name="session", right_name="ppf")
# score_data.sort_values(by=BY_PPST, inplace=True)

# score_data.to_parquet("data/nest_data.parquet", index=False)

