In [1]:
import os
import sys
from datetime import timedelta
from typing import Dict, Any, Optional
import json

import numpy as np
import pandas as pd

from common_utils import load_schema, validate_data

sys.path.append(os.path.abspath("N:\\CancerEpidem\\BrBreakthrough\\DeliveryProcess\\Schema_and_Derivation_utils"))
from utilities import connect_DB, createLogger, read_data
from config import Delivery_log_path, test_server, r0_json_path, out_json_path

In [2]:
def _floor_years_days(days: Optional[int]) -> Optional[int]:
    """
    Convert a day difference into an integer age in years using 365.25-day years.
    Returns None if days is None or NaN.
    """
    if days is None:
        return None
    try:
        if pd.isna(days):
            return None
    except Exception:
        pass
    try:
        return int(np.floor(float(days) / 365.25))
    except Exception:
        return None

In [3]:
def _to_datetime_series(s: pd.Series) -> pd.Series:
    """Coerce a pandas Series to datetime with NaT on failure."""
    return pd.to_datetime(s, errors="coerce")

In [4]:
def _shift_date(dt: Any, random_days: Any) -> Optional[pd.Timestamp]:
    """
    Apply pseudo-anonymization shift by adding participant-specific Random days.
    Returns None if either is missing/invalid.
    """
    if dt is None:
        return None
    try:
        ts = pd.to_datetime(dt, errors="coerce")
    except Exception:
        return None
    if pd.isna(ts):
        return None

    try:
        rd = int(random_days)
    except Exception:
        return None

    return ts + timedelta(days=rd)

In [5]:
def _shift_date_str(dt: Any, random_days: Any) -> Optional[str]:
    """
    Shift `dt` by `random_days` and format the result as YYYY-MM-DD.

    Returns None when the shift produced no usable date. The ISO date string
    is the form the schema expects for DOB and EntryDate.
    """
    moved = _shift_date(dt, random_days)
    if moved is not None and not pd.isna(moved):
        # Drop the time-of-day component; only the calendar date is emitted.
        return moved.date().isoformat()
    return None

In [None]:
def mailing_database_load(test_server: str, logger) -> Dict[str, Dict[str, Any]]:
    """
    Load ADOB/EventDate/Random from the Mailing database and derive:

      - DOB       (date of birth shifted by the participant's Random days)
      - YOB       (unshifted year of birth)
      - EntryDate (EventDate shifted by the participant's Random days)
      - EntryYear (actual year of entry, unshifted)
      - AgeEntry  (age in whole years at entry, using 365.25-day years)

    The unshifted ADOB and EventDate are used for derivation only and are not
    included in the returned records.

    Args:
        test_server: Server identifier passed through to `connect_DB`.
        logger: Logger used for progress and warning messages
            (`.info` and `.warning` are called).

    Returns:
        dict[tcode] = {
            "DOB": <str or None>,
            "YOB": <int or None>,
            "EntryDate": <str or None>,
            "EntryYear": <int or None>,
            "AgeEntry": <int or None>,
        }
        An empty dict is returned when the query yields no rows. Rows whose
        TCode is not a non-empty string are skipped. If a TCode occurs on more
        than one row, the last row wins; both situations are logged as warnings.
    """
    logger.info("Loading Mailing data for entry variable derivation...")

    # NOTE(review): conn is never closed here; whether connect_DB returns
    # something that needs explicit closing is not visible from this cell.
    conn = connect_DB("Mailing", test_server, logger)

    # NOTE: This SELECT is the same logical query as in Derivation.ipynb,
    # but moved here to keep all entry-variable logic together.
    sql = """
    SELECT
        S.StudyID,
        S.TCode,
        S.Random,
        G.ADOB,
        E.EventDate
    FROM [Mailing].[dbo].[Events]   AS E
    JOIN [Mailing].[dbo].[SIDCodes] AS S
        ON S.PersonID = E.Person
    JOIN [Mailing].[dbo].[General]  AS G
        ON G.PersonID = E.Person
    WHERE
        E.Cancelled = 0
        AND E.Event = 6
    """

    df = read_data(sql, conn, logger)

    if df.empty:
        logger.warning("No Mailing rows returned; derived entry variables will be empty.")
        return {}

    # Strip stray whitespace from column names (case is left untouched).
    df = df.rename(columns={c: c.strip() for c in df.columns})

    # Coerce date columns; unparseable values become NaT.
    for date_col in ("ADOB", "EventDate"):
        if date_col in df.columns:
            df[date_col] = _to_datetime_series(df[date_col])

    out: Dict[str, Dict[str, Any]] = {}
    skipped_rows = 0
    duplicate_rows = 0

    for _, r in df.iterrows():
        tcode = r.get("TCode")
        if not isinstance(tcode, str) or not tcode:
            skipped_rows += 1
            continue

        rand = r.get("Random")
        adob = r.get("ADOB")
        evdt = r.get("EventDate")

        has_adob = pd.notna(adob)
        has_evdt = pd.notna(evdt)

        if has_adob and has_evdt:
            # NOTE(review): dividing the day difference by 365.25 is an
            # approximation and can differ from the calendar age by one year
            # on or near a birthday (e.g. exactly 365 days gives 0). Left
            # unchanged so the derived values stay as they were.
            age_days = (evdt.date() - adob.date()).days
            age_years = _floor_years_days(age_days)
        else:
            age_years = None

        if tcode in out:
            duplicate_rows += 1

        out[tcode] = {
            # everything below is intended for output
            "DOB": _shift_date_str(adob, rand),
            "YOB": int(adob.year) if has_adob else None,
            "EntryDate": _shift_date_str(evdt, rand),
            "EntryYear": int(evdt.year) if has_evdt else None,
            "AgeEntry": age_years,
        }

    if skipped_rows:
        logger.warning(f"Skipped {skipped_rows} Mailing rows with a missing or non-string TCode.")
    if duplicate_rows:
        logger.warning(
            f"{duplicate_rows} Mailing rows repeated an already-seen TCode; the last row for each TCode was kept."
        )

    logger.info(f"Derived entry variables for {len(out)} participants.")

    return out

In [7]:
if __name__ == "__main__":
    # Derive the entry variables, validate them against the schema, and write
    # them to <out_json_path>/s5_derived/RawDerivedVariables.json.

    logger = createLogger("DeriveEntryVariables", Delivery_log_path)
    mail_by_tcode = mailing_database_load(test_server, logger)

    # convert to list-of-dicts with TCode for easy JSON dumping
    records = [{"TCode": tcode, **vals} for tcode, vals in mail_by_tcode.items()]

    schema = load_schema(r0_json_path, "RawDerivedvariables_Schema")
    validate_data(records, schema, r0_json_path)

    # Create the output folder if it is missing so open() below cannot fail
    # merely because this is the first run against a fresh output location.
    out_dir = os.path.join(out_json_path, 's5_derived')
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, "RawDerivedVariables.json")

    with open(out_path, "w", encoding="utf-8") as f:
        # default=str serialises any value json cannot encode natively.
        json.dump(records, f, default=str, indent=2)

    logger.info(f"✓ Saved entry variables JSON to {out_path}")


2026-02-05 13:28:20 - INFO: Loading Mailing data for entry variable derivation...
2026-02-05 13:28:55 - INFO: Derived entry variables for 113766 participants.


Validating 113,766 items...
100% - Validation completed in 18.81 seconds
✓ All items are valid


2026-02-05 13:29:24 - INFO: ✓ Saved entry variables JSON to N:\CancerEpidem\BrBreakthrough\DeliveryProcess\Data_Output_Testing\s5_derived\RawDerivedVariables.json
