In [22]:
import datetime
import os
from requests_cache import CachedSession
from tqdm.notebook import tqdm
import pandas as pd
import json

# Shared HTTP session whose responses are cached for one day. POST is made
# cacheable in addition to GET because the trials search in this notebook is
# issued with session.post.
session = CachedSession(
    allowable_methods=["GET", "POST"],
    expire_after=datetime.timedelta(days=1),
)

# API key sent as the X-API-KEY header; None when the variable is not set.
key = os.environ.get("CTS_V2_API_KEY")

In [None]:
from typing import Callable


# Values of `current_trial_status` used as the trial-level filter in
# get_nci_campus_trials.
OPEN_TRIALS = [
    "Active",
    "Approved",
    "Enrolling by Invitation",
    "In Review",
    "Temporarily Closed to Accrual",
    "Temporarily Closed to Accrual and Intervention",
]

# Values of `sites.recruitment_status` used as the site-level filter.
# NOTE(review): there is no site-level counterpart of "Temporarily Closed to
# Accrual and Intervention" here -- confirm whether that omission is intended.
OPEN_SITES = [
    "active",
    "approved",
    "enrolling_by_invitation",
    "in_review",
    "temporarily_closed_to_accrual",
]

# Date stamp (YYYYMMDD) embedded in the output file names.
TODAY = f"{datetime.date.today():%Y%m%d}"


def get_ctsapi_trials(start: int, **others):
    """Fetch one page of results from the CTS API v2 trials endpoint.

    Args:
        start: Offset of the first result to return; sent as the ``from``
            field of the request body.
        **others: Further request-body fields (e.g. ``size``, ``include``,
            filter fields). A ``from`` entry here overrides ``start``.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    payload = {"from": start, **others}
    res = session.post(
        "https://clinicaltrialsapi.cancer.gov/api/v2/trials",
        json=payload,
        headers={"X-API-KEY": key},
        # requests has no default timeout; without one a stalled connection
        # would hang the whole multi-page download indefinitely.
        timeout=60,
    )
    res.raise_for_status()
    return res.json()


def gather_trials(**kwargs):
    """Download every page of a trials query and return the combined records.

    The first page supplies the reported ``total``; further pages are
    requested with ``start`` set to the number of records collected so far
    until that total is reached. When the module-level ``DEBUG`` flag is true
    only the first page is returned.

    Args:
        **kwargs: Request-body fields forwarded unchanged to
            ``get_ctsapi_trials`` on every page request.

    Returns:
        A list with the ``data`` entries of all fetched pages.
    """
    import warnings

    first_page = get_ctsapi_trials(start=0, **kwargs)
    total = first_page["total"]
    trials = first_page["data"]
    if DEBUG:
        return trials
    # The context manager closes the progress bar even if a request fails.
    with tqdm(total=total) as pbar:
        pbar.update(len(trials))
        while len(trials) < total:
            batch = get_ctsapi_trials(start=len(trials), **kwargs)["data"]
            if not batch:
                # An empty page would never advance len(trials), so the loop
                # would spin forever; stop and report the shortfall instead.
                warnings.warn(
                    f"Expected {total} trials but the API stopped returning "
                    f"data after {len(trials)}."
                )
                break
            trials.extend(batch)
            pbar.update(len(batch))
    return trials


def get_nci_campus_trials():
    """Export open trials with an open site at postal code 20892 to CSV.

    Queries for trials whose status is in ``OPEN_TRIALS`` and that have a site
    with ``org_postal_code`` 20892 and a recruitment status in ``OPEN_SITES``,
    then writes the id, status and title columns to
    ``nih_onsite_trials_<TODAY>.csv``. Returns nothing.
    """
    output_columns = ["nct_id", "nci_id", "current_trial_status", "brief_title"]
    query = {
        "size": 50,
        "include": ["nci_id", "nct_id", "brief_title", "current_trial_status"],
        "sites.org_postal_code": "20892",
        "sites.recruitment_status": OPEN_SITES,
        "current_trial_status": OPEN_TRIALS,
    }
    records = gather_trials(**query)
    table = pd.DataFrame(records, columns=output_columns)
    # utf_8_sig writes a byte-order mark at the start of the file.
    table.to_csv(
        f"nih_onsite_trials_{TODAY}.csv",
        index=False,
        encoding="utf_8_sig",
    )


def get_all_trials(
    preprocess: list[Callable[[dict], None]],
    postprocess: list[Callable[[pd.DataFrame], pd.DataFrame]],
    filename,
):
    """Download all trials with a fixed field list, run hooks, and write a CSV.

    Args:
        preprocess: Hooks called once per trial dict, in list order, before
            the DataFrame is built. Return values are ignored, so hooks act
            by mutating the trial or by side effect.
        postprocess: Hooks called in list order on the DataFrame; each must
            return the DataFrame to pass on to the next step.
        filename: Destination path of the CSV file.

    When the module-level ``DEBUG`` flag is true the page size is 1 (and
    ``gather_trials`` stops after the first page).
    """
    include_fields = [
        "active_sites_count",
        "amendment_date",
        "anatomic_sites",
        "arms.type",
        # biomarkers (only the below nested fields and only if biomarkers.inclusion_indicator=TRIAL),
        "biomarkers.name",
        "biomarkers.eligibility_criterion",
        "biomarkers.nci_thesaurus_concept_id",
        "biomarkers.inclusion_indicator",
        "brief_summary",
        "brief_title",
        "ccr_id",
        "central_contact.email",
        "central_contact.name",
        "central_contact.phone",
        "central_contact.type",
        "classification_code",
        "collaborators.functional_role",
        "collaborators.name",
        "completion_date",
        "completion_date_type_code",
        "current_trial_status",
        "current_trial_status_date",
        # diseases (only the below nested fields and only if diseases.inclusion_indicator=TRIAL),
        "diseases.is_lead_disease",
        "diseases.name",
        "diseases.nci_thesaurus_concept_id",
        "diseases.inclusion_indicator",
        "eligibility.structured.accepts_healthy_volunteers",
        # (hotfix deploying in mid/late-April that will change this to 'eligibility.structured.sex')
        # The trailing comma matters: without it this literal is concatenated
        # with the next one and neither field is requested.
        "eligibility.structured.gender",
        "eligibility.structured.max_age_in_years",
        "eligibility.structured.min_age_in_years",
        "keywords",
        "lead_org",
        "lead_org_cancer_center",
        "minimum_target_accrual_number",
        "nci_funded",
        "nci_id",
        "nct_id",
        "number_of_arms",
        "official_title",
        "phase",
        "primary_purpose",
        "principal_investigator",
        # prior therapy (only the below nested fields and only if prior_therapy.inclusion_indicator=TRIAL)
        "prior_therapy.eligibility_criterion",
        "prior_therapy.name",
        "prior_therapy.nci_thesaurus_concept_id",
        "prior_therapy.inclusion_indicator",
        "record_verification_date",
        "start_date",
        "start_date_type_code",
        "status_history",
        "study_model_code",
        "study_model_other_text",
        "study_population_description",
        "study_protocol_type",
        "study_source",
        "why_study_stopped",
    ]
    trials = gather_trials(
        **{
            "size": 1 if DEBUG else 50,
            "include": include_fields,
        }
    )
    # Hook-major order: each hook sees every trial before the next hook runs.
    for hook in preprocess:
        for trial in trials:
            hook(trial)
    df = pd.DataFrame(trials)
    for hook in postprocess:
        df = hook(df)
    df.to_csv(filename, index=False, encoding="utf_8_sig")

In [None]:
# Nested fields whose items carry an "inclusion_indicator" key; used by
# drop_non_trial_level and cleanup_inclusion_indicators.
NESTED_FIELDS_W_TRIAL_LEVEL = [
    "biomarkers",
    "diseases",
    "prior_therapy",
]
# Nested fields that count_n_nested_fields measures and explode_nested expands.
NESTED_FIELDS_TO_EXPLODE = ["biomarkers", "diseases", "prior_therapy"]


def drop_non_trial_level(trial):
    """Keep only items marked ``inclusion_indicator == "TRIAL"`` in nested fields.

    Mutates ``trial`` in place for every field named in
    ``NESTED_FIELDS_W_TRIAL_LEVEL``. Fields that are absent or not a list
    (e.g. null) are left untouched, and items without an
    ``inclusion_indicator`` key are dropped rather than raising.

    Args:
        trial: One trial record (dict) as returned by the API.
    """
    for field in NESTED_FIELDS_W_TRIAL_LEVEL:
        items = trial.get(field)
        if not isinstance(items, list):
            # Absent or null field: nothing to filter.
            continue
        trial[field] = [
            item
            for item in items
            if isinstance(item, dict) and item.get("inclusion_indicator") == "TRIAL"
        ]


def cleanup_inclusion_indicators(trial):
    """Remove the ``inclusion_indicator`` key from every nested item.

    Mutates ``trial`` in place for every field named in
    ``NESTED_FIELDS_W_TRIAL_LEVEL``. Safe to call more than once: items that
    no longer have the key, and fields that are absent or not a list, are
    skipped instead of raising.

    Args:
        trial: One trial record (dict) as returned by the API.
    """
    for field in NESTED_FIELDS_W_TRIAL_LEVEL:
        items = trial.get(field)
        if not isinstance(items, list):
            continue
        for item in items:
            if isinstance(item, dict):
                # pop with a default tolerates an already-cleaned item.
                item.pop("inclusion_indicator", None)


# Set to True once check_eligibility has printed its one example trial.
found_one = False


def check_eligibility(trial):
    """Print the first trial whose structured eligibility has age and gender.

    The trial is printed as compact JSON the first time a structured
    eligibility entry contains both ``max_age_in_years`` and ``gender``;
    every later call is a no-op.

    ``eligibility.structured`` is accepted either as a single dict or as a
    list of dicts. Iterating a dict directly would yield its key strings and
    turn the membership tests into substring checks that can never both match.

    Args:
        trial: One trial record (dict) as returned by the API.
    """
    global found_one
    if found_one:
        return
    eligibility = trial.get("eligibility")
    if not isinstance(eligibility, dict):
        return
    structured = eligibility.get("structured")
    if isinstance(structured, dict):
        candidates = [structured]
    elif isinstance(structured, list):
        candidates = structured
    else:
        return
    for item in candidates:
        if isinstance(item, dict) and "max_age_in_years" in item and "gender" in item:
            print(json.dumps(trial, indent=None, separators=(",", ":")))
            found_one = True
            # Stop at the first match so the trial is printed only once.
            return


def count_n_nested_fields(trial):
    """Print the item count of each field in NESTED_FIELDS_TO_EXPLODE that is a list.

    Fields that are missing from ``trial`` or hold a non-list value print
    nothing.
    """
    for name in NESTED_FIELDS_TO_EXPLODE:
        value = trial.get(name)
        if isinstance(value, list):
            print(len(value))


# Per-field running maxima filled in by count_size_of_fields:
# longest serialized length seen, the nct_id of the trial that produced it,
# and the serialized text itself.
field_length_max = {}
max_field_trial_id = {}
max_field_contents = {}


def count_size_of_fields(trial):
    """Track, per field, the longest value seen so far across trials.

    String values are measured as-is; list and dict values are measured by
    the length of their ``json.dumps`` text; any other value type is ignored.
    A field's record is replaced only by a strictly longer value, so the
    first trial wins ties.

    Args:
        trial: One trial record (dict); must contain ``nct_id``.
    """
    for name, value in trial.items():
        if isinstance(value, str):
            text = value
        elif isinstance(value, (list, dict)):
            text = json.dumps(value)
        else:
            continue
        size = len(text)
        if name not in field_length_max or size > field_length_max[name]:
            field_length_max[name] = size
            max_field_trial_id[name] = trial["nct_id"]
            max_field_contents[name] = text


def explode_nested(df):
    """Explode each NESTED_FIELDS_TO_EXPLODE column present in ``df``.

    Columns are exploded one after another, and the frame's shape is printed
    after every field is considered, whether or not it was present.

    Args:
        df: DataFrame of trials.

    Returns:
        The exploded DataFrame.
    """
    for column in NESTED_FIELDS_TO_EXPLODE:
        # Columns that are missing are skipped rather than raising.
        df = df.explode(column) if column in df.columns else df
        print(df.shape)
    return df


# Module-level switch read at call time by gather_trials and get_all_trials:
# when true, the page size is 1 and only the first page is returned. It must
# be bound before get_all_trials is called below.
DEBUG = False
# Alternative run kept for reference: count nested items per trial and write
# one row per nested item via explode_nested.
# get_all_trials(
#     preprocess=[drop_non_trial_level, count_n_nested_fields],
#     postprocess=[explode_nested],
#     filename=f"all_trials_{TODAY}.csv",
# )
# Current run: each hook is applied to every trial before the next hook
# starts, in the order listed here; no DataFrame post-processing is applied.
get_all_trials(
    preprocess=[
        drop_non_trial_level,
        cleanup_inclusion_indicators,
        check_eligibility,
        count_size_of_fields,
    ],
    postprocess=[],
    filename=f"all_trials_{TODAY}.csv",
)

  0%|          | 0/24388 [00:00<?, ?it/s]

In [26]:
# Report every field whose longest observed value exceeds 20000 characters,
# as (field name, length) tuples.
highest = {
    (name, size) for name, size in field_length_max.items() if size > 20000
}
for item in highest:
    print(item)

('diseases', 21625)
