In [18]:
from typing import Dict, List, Any, Hashable, Tuple, Set
from collections import defaultdict
from datetime import datetime
import pandas as pd
import numpy as np
import json

from models import StudyResult
from utils import generate_key
from config import SCALAR_FIELDS, NON_SCALAR_FIELDS


In [19]:
EXPECTED_TABLES = StudyResult.expected_tables()

In [20]:
def process_study_file(file_loc: str) -> List[StudyResult]:

    batch_results: List[StudyResult] = []
    
    df_studies = pd.read_parquet(file_loc)
    df_studies = pd.json_normalize(df_studies["studies"])
    
    for idx, study in df_studies.iterrows():
        nct_index = SCALAR_FIELDS["nct_id"]
        nct_id = study.get(nct_index)
    
        if not nct_id:
            continue
    
        try:
            result = transform_single_study(nct_id, study)
            batch_results.append(result)
            
        except Exception as e:
            raise
    
    return batch_results

In [21]:
def merge_batch_results(batch_results: List[StudyResult]) -> Dict[str, List[Dict]]:
    
    merged: Dict[str, List[Dict]] = defaultdict(list)
    # print(f" LEN BATCH BEFORE MERGE ---{len(batch_results)}")
    # print(f" TYPE BATCH Before MERGE ---{type(batch_results)}")

    # print(type(batch_results[0].tables()))
    # print("-----")
    print(f"{type(batch_results)}")
    print(f"{len(batch_results)}")
    for study_result in batch_results:
        for table, rows in study_result.tables().items():
            merged[table].extend(rows)

    missing = set(EXPECTED_TABLES) - merged.keys()
    if missing:
        raise ValueError(f"Missing tables: {missing}")


    return merged


In [22]:
def transform_single_study(nct_id: str, study: pd.Series) -> StudyResult:
    study_key = generate_key(nct_id)
    result = defaultdict(list)

    study_fields = transform_scalar_fields(study_key, study)
    result["studies"].append(study_fields)
    
    # identificationModule
    secondary_ids, nct_aliases = transform_identification_module(study_key, study)
    result["secondary_ids"].extend(secondary_ids)
    result["nct_aliases"].extend(nct_aliases)

    # sponsorCollaboratorsModule
    sponsor, study_sponsor, collaborators, study_collaborators = (
        transform_sponsor_and_collaborators(nct_id, study_key, study)
    )
    result["sponsors"].extend(sponsor)
    result["study_sponsors"].extend(study_sponsor)
    result["collaborators"].extend(collaborators)
    result["study_collaborators"].extend(study_collaborators)


    # conditionsModule
    conditions, study_conditions, keywords, study_keywords = transform_conditions(
        nct_id, study_key, study
    )
    result["conditions"].extend(conditions)
    result["study_conditions"].extend(study_conditions)
    result["keywords"].extend(keywords)
    result["study_keywords"].extend(study_keywords)

    return StudyResult(
        studies=result["studies"],
        secondary_ids=result["secondary_ids"],
        nct_aliases=result["nct_aliases"],
        sponsors=result["sponsors"],
        study_sponsors=result["study_sponsors"],
        collaborators=result["collaborators"],
        study_collaborators=result["study_collaborators"],
        conditions=result["conditions"],
        study_conditions=result["study_conditions"],
        keywords=result["keywords"],
        study_keywords=result["study_keywords"],
    )

In [23]:
def transform_scalar_fields(study_key: str, study_data: pd.Series) -> Dict:
    study_record = dict()

    study_record["study_key"] = study_key
    for entity_key in SCALAR_FIELDS:
        index_field = SCALAR_FIELDS.get(entity_key)

        study_record[entity_key] = study_data.get(index_field)

    return study_record

In [24]:
def transform_identification_module(study_key: str, study_data: pd.Series) -> Tuple:

    secondary_ids = []
    nct_aliases = []

    identification_index = NON_SCALAR_FIELDS["identification"]["index_field"]

    #Secondary id infos
    secondary_id_infos = study_data.get(f'{identification_index}.secondaryIdInfos')

    if (
        isinstance(secondary_id_infos, (list, np.ndarray))
        and len(secondary_id_infos) > 0
    ):
        for secondary_id_info in secondary_id_infos:
            secondary_id = secondary_id_info.get("id")
            secondary_id_key = generate_key(study_key, secondary_id)

            secondary_ids.append(
                {
                    "secondary_id_key": secondary_id_key,
                    "study_key": study_key,
                    "id": secondary_id,
                    "type": secondary_id_info.get("type"),
                    "domain": secondary_id_info.get("domain"),
                    "link": secondary_id_info.get("link"),
                }
            )

        nct_id_aliases = study_data.get(f'{identification_index}.nctIdAliases')

        if (
            isinstance(nct_id_aliases, (list, np.ndarray))
            and len(nct_id_aliases) > 0
        ):
            for nct_id_alias in nct_id_aliases:
                nct_aliases.append(
                    {
                        "study_key": study_key,
                        "id_alias": nct_id_alias,
                    }
                )

    return nct_aliases, secondary_ids



In [25]:

def transform_sponsor_and_collaborators(
    nct_id: str, study_key: str, study_data: pd.Series
) -> Tuple:
    sponsor = []
    study_sponsor = []
    collaborators = []
    study_collaborators = []

    sponsor_collaborator_index = NON_SCALAR_FIELDS["sponsor_collaborators"][
        "index_field"
    ]

    ## sponsor name and class are scalar values and MUST be transformed as so
    lead_sponsor_name = study_data.get(f"{sponsor_collaborator_index}.leadSponsor.name")
    lead_sponsor_class = study_data.get(
        f"{sponsor_collaborator_index}.leadSponsor.class"
    )

    if pd.notna(lead_sponsor_name) and pd.notna(lead_sponsor_class):

        sponsor_key = generate_key(lead_sponsor_name, lead_sponsor_class)
        sponsor.append(
            {
                "sponsor_key": sponsor_key,
                "name": lead_sponsor_name,
                "sponsor_class": lead_sponsor_class,
            }
        )

        study_sponsor.append({"study_key": study_key, "sponsor_key": sponsor_key})

    # collaborators
    collaborators_list = study_data.get(
        f"{sponsor_collaborator_index}.collaborators"
    )

    if (
        isinstance(collaborators_list, (list, np.ndarray))
        and len(collaborators_list) > 0
    ):
        for collaborator in collaborators_list:
            collaborator_key = generate_key(
                collaborator.get("name"), collaborator.get("class")
            )

            collaborators.append(
                {
                    "collaborator_key": collaborator_key,
                    "name": collaborator.get("name"),
                    "collaborator_class": collaborator.get("class"),
                }
            )

            study_collaborators.append(
                {
                    "study_key": study_key,
                    "collaborator_key": collaborator_key,
                }
            )


    return sponsor, study_sponsor, collaborators, study_collaborators

          


In [26]:
def transform_conditions(nct_id: str, study_key: str, study_data: pd.Series) -> Tuple:
    conditions = []
    study_conditions = []
    keywords = []
    study_keywords = []

    conditions_index = NON_SCALAR_FIELDS["conditions"]["index_field"]

    conditions_list = study_data.get(f"{conditions_index}.conditions")

    if isinstance(conditions_list, (list, np.ndarray)) and len(conditions_list) > 0:
        for condition in conditions_list:
            condition_key = generate_key(condition)

            conditions.append(
                {"condition_key": condition_key, "condition_name": condition}
            )

            study_conditions.append(
                {
                    "study_key": study_key,
                    "condition_key": condition_key,
                }
            )

    keywords_list = study_data.get(f"{conditions_index}.keywords")

    if isinstance(keywords_list, (list, np.ndarray)) and len(keywords_list) > 0:

        for keyword in keywords_list:
            keyword_key = generate_key(keyword)

            keywords.append({"keyword_key": keyword_key, "keyword_name": keyword})

            study_keywords.append(
                {
                    "study_key": study_key,
                    "keyword_key": keyword_key,
                }
            )

    return conditions, study_conditions, keywords, study_keywords


In [27]:
def post_process_tables(results: Dict[str, List[Dict]]) -> List[pd.DataFrame]:

    df_studies = pd.DataFrame(results["studies"])

    # identificationModule
    df_secondary_ids = pd.DataFrame(results["secondary_ids"])
    df_nct_aliases = pd.DataFrame(results["nct_aliases"])

    # sponsorCollaboratorsModule
    df_sponsors = pd.DataFrame(results["sponsors"])
    df_study_sponsors = pd.DataFrame(results["study_sponsors"])
    df_collaborators = pd.DataFrame(results["collaborators"])
    df_study_collaborators = pd.DataFrame(results["study_collaborators"])

    # conditionsModule
    df_conditions = pd.DataFrame(results["conditions"])
    df_study_conditions = pd.DataFrame(results["study_conditions"])
    df_keywords = pd.DataFrame(results["keywords"])
    df_study_keywords = pd.DataFrame(results["study_keywords"])
    

    return [
        df_studies,
        df_secondary_ids,
        df_nct_aliases,
        df_sponsors,
        df_study_sponsors,
        df_collaborators,
        df_study_collaborators,
        df_conditions,
        df_study_conditions,
        df_keywords,
        df_study_keywords,
    ]



In [28]:
def transform_studies_batch():
    try:
        batch_result = process_study_file("1.parquet")
        merged_batch_results = merge_batch_results(batch_result)
        dfs = post_process_tables(merged_batch_results)

        return dfs

    except Exception as e:
        raise


In [31]:
(
    df_studies,
    df_secondary_ids,
    df_nct_aliases,
    df_sponsors,
    df_study_sponsors,
    df_collaborators,
    df_study_collaborators,
    df_conditions,
    df_study_conditions,
    df_keywords,
    df_study_keywords,
) = transform_studies_batch()


<class 'list'>
1000


In [20]:
# print(df_studies.head())

In [40]:
df_studies.to_csv("data/study_data.csv", index=False)

In [41]:
df_secondary_ids.to_csv("data/secondary_ids.csv", index=False)
df_nct_aliases.to_csv("data/nct_aliases.csv", index=False)

In [42]:
df_sponsors.to_csv("data/sponsors.csv", index=False)
df_study_sponsors.to_csv("data/bridge_study_sponsors.csv", index=False)
df_collaborators.to_csv("data/collab.csv", index=False)
df_study_collaborators.to_csv("data/bridge_study_collab.csv", index=False)

In [32]:
df_conditions.to_csv("data/conditions.csv", index=False)
df_study_conditions.to_csv("data/study_conditions.csv", index=False)
df_keywords.to_csv("data/keywords.csv", index=False)
df_study_keywords.to_csv("data/study_keywords.csv", index=False)

In [None]:
df_interventions.to_csv("data/interventions.csv", index=False)
df_study_interventions.to_csv("data/study_interventions.csv", index=False)
df_arm_group_interventions.to_csv("data/arm_groups_intrv.csv", index=False)

In [None]:
df_central_contacts.to_csv("data/contacts.csv", index=False)
df_study_central_contacts.to_csv("data/study_contacts.csv", index=False)

In [46]:
df_study_locations.to_csv("data/study_locations.csv", index=False)
df_locations.to_csv("data/locations.csv", index=False)

In [39]:
# studies[studies['study_key'] == '9de216aef0c75756']

In [104]:
df_references.to_csv("data/refs.csv", index=False)
df_links.to_csv("data/links.csv", index=False)

In [257]:
df_ipds.to_csv("data/ipds.csv", index=False)

In [20]:
df_flow_groups.to_csv("data/flow_groups.csv", index=False)
df_flow_period_events.to_csv("data/flow_events.csv", index=False)

In [21]:
duplicates.to_csv("data/dups.csv", index=False)