In [92]:
import os
import sys
import dlt
from dlt import pipeline as dlt_pipeline
import logging
import sys
import time
from tqdm import tqdm
import time
import pickle
from dlt.sources.helpers import requests
import re



# Ensure the 'models' directory is in the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..', 'models')))

"""
Using Hugging Face Model Clinical-AI-Apollo/Medical-NER instead of GPT-3.5
due to the cost I will incur if use OpenAI, as I pull all the studies from
clinicaltrials.gov
"""
from entity_extractor import EntityExtractor 

# Data Pipeline

params = {
    'pageSize': 1000,
    'format': 'json', 
    #'pageToken': None  # first page doesn't need it
}

dlt_pipeline = dlt_pipeline(pipeline_name="clinical_trial_pipeline", destination="duckdb", dataset_name="clinical_trial_data")

In [2]:
# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Create a stream handler to output to stdout
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)

# Create a formatter and set it for the handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)

# Add the handler to the logger
if not logger.hasHandlers():
    logger.addHandler(stream_handler)

In [3]:
base_url = "https://clinicaltrials.gov/api/v2/studies"
page = 1 # initialize page count
studies = [] # initialize all_page_data


# Start timer
start_time = time.time()

while True:
  logger.info(f'--- page: {page} ---')  # Log page information
  
  response = requests.get(base_url, params=params)
  
  if not response.ok:
      logger.error('Request failed:', response.text)  # Log error with details
      break
  
  data = response.json()
  data_responses = data['studies']
  studies.extend(data_responses)
  
  
  # Check for next page token
  next_page_token = data.get('nextPageToken')
  if not next_page_token:
      break  # No next page, exit the loop

  # Update params for next page
  params['pageToken'] = next_page_token
  page += 1  # Increment page counter

# End timer
end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = elapsed_time % 60


logger.info('Successfully retrieved all pages.')  # Log completion message
logger.info(f'Total elapsed time: {minutes} minutes and {seconds:.2f} seconds')  # Log total elapsed time

2024-06-08 18:02:33,665 - __main__ - INFO - --- page: 1 ---
2024-06-08 18:02:34,628 - __main__ - INFO - --- page: 2 ---
2024-06-08 18:02:35,416 - __main__ - INFO - --- page: 3 ---
2024-06-08 18:02:36,274 - __main__ - INFO - --- page: 4 ---
2024-06-08 18:02:37,282 - __main__ - INFO - --- page: 5 ---
2024-06-08 18:02:38,001 - __main__ - INFO - --- page: 6 ---
2024-06-08 18:02:38,951 - __main__ - INFO - --- page: 7 ---
2024-06-08 18:02:39,872 - __main__ - INFO - --- page: 8 ---
2024-06-08 18:02:40,830 - __main__ - INFO - --- page: 9 ---
2024-06-08 18:02:41,663 - __main__ - INFO - --- page: 10 ---
2024-06-08 18:02:42,454 - __main__ - INFO - --- page: 11 ---
2024-06-08 18:02:43,486 - __main__ - INFO - --- page: 12 ---
2024-06-08 18:02:44,319 - __main__ - INFO - --- page: 13 ---
2024-06-08 18:02:45,220 - __main__ - INFO - --- page: 14 ---
2024-06-08 18:02:46,532 - __main__ - INFO - --- page: 15 ---
2024-06-08 18:02:47,285 - __main__ - INFO - --- page: 16 ---
2024-06-08 18:02:47,992 - __main_

In [4]:
len(studies)

497445

In [5]:
# Initialize an empty list to store the data

# Start timer
start_time = time.time()

# Initialize an empty list to store the data
studies_data = []

# Loop for storing data with progress bar
for study in tqdm(studies, desc="Processing studies"):
    # Safely access nested keys
    nctId = study['protocolSection']['identificationModule'].get('nctId', 'Unknown')
    overallStatus = study['protocolSection']['statusModule'].get('overallStatus', 'Unknown')
    startDate = study['protocolSection']['statusModule'].get('startDateStruct', {}).get('date', 'Unknown Date')
    conditions = ', '.join(study['protocolSection'].get('conditionsModule',{}).get('conditions', ['No conditions listed']))
    briefTitle = study['protocolSection']['identificationModule'].get('briefTitle', 'Unknown')
    acronym = study['protocolSection']['identificationModule'].get('acronym', 'Unknown')

    # Extract interventions safely
    interventions_list = study['protocolSection'].get('armsInterventionsModule', {}).get('interventions', [])
    interventions = ', '.join([intervention.get('name', 'No intervention name listed') for intervention in interventions_list]) if interventions_list else "No interventions listed"

    # Extract locations safely
    locations_list = study['protocolSection'].get('contactsLocationsModule', {}).get('locations', [])
    locations = ', '.join([f"{location.get('city', 'No City')} - {location.get('country', 'No Country')}" for location in locations_list]) if locations_list else "No locations listed"

    # Extract dates and phases
    primaryCompletionDate = study['protocolSection']['statusModule'].get('primaryCompletionDateStruct', {}).get('date', 'Unknown Date')
    studyFirstPostDate = study['protocolSection']['statusModule'].get('studyFirstPostDateStruct', {}).get('date', 'Unknown Date')
    lastUpdatePostDate = study['protocolSection']['statusModule'].get('lastUpdatePostDateStruct', {}).get('date', 'Unknown Date')
    studyType = study['protocolSection'].get('designModule',{}).get('studyType', 'Unknown')
    phases = ', '.join(study['protocolSection'].get('designModule',{}).get('phases', ['Not Available']))

    # Extract Eligibility
    eligibilityCriteria = study['protocolSection'].get('eligibilityModule',{}).get('eligibilityCriteria','Unknown')
    sex = study['protocolSection'].get('eligibilityModule',{}).get('sex','Unknown')
    minimumAge = study['protocolSection'].get('eligibilityModule',{}).get('minimumAge','0 Year')
    maximumAge = study['protocolSection'].get('eligibilityModule',{}).get('maximumAge','120 Years')
    stdAges = study['protocolSection'].get('eligibilityModule',{}).get('stdAges','Uknown')

    # Extend the data to the list as a dictionary
    studies_data.append({
        'nctId' : nctId,
        'briefTitle' : briefTitle,
        'acronym' : acronym,
        'overallStatus' : overallStatus,
        'startDate' : startDate,
        'conditions' : conditions,
        'interventions' : interventions,
        'locations' : locations,
        'primaryCompletionDate' : primaryCompletionDate,
        'studyFirstPostDate' : studyFirstPostDate,
        'lastUpdatePostDate' : lastUpdatePostDate,
        'studyType' : studyType,
        'phases' : phases,
        'eligibilityCriteria' : eligibilityCriteria,
        'sex' : sex,
        'minimumAge' : minimumAge,
        'maximumAge' : maximumAge,
        'stdAges' : stdAges
    })

# End timer
end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = elapsed_time % 60


logger.info('Successfully stored data as list ')  # Log completion message
logger.info(f'Total elapsed time: {minutes} minutes and {seconds:.2f} seconds')  # Log total elapsed time of Loop

Processing studies: 100%|██████████| 497445/497445 [01:25<00:00, 5827.35it/s] 

2024-06-08 18:12:11,454 - __main__ - INFO - Successfully stored data as list 
2024-06-08 18:12:11,454 - __main__ - INFO - Total elapsed time: 1 minutes and 25.62 seconds





In [None]:
def convert_age_to_years(age_str):
    # Define conversion factors
    conversion_factors = {
        'year': 1,
        'years': 1,
        'month': 1/12,
        'months': 1/12,
        'week': 1/52,
        'weeks': 1/52,
        'day': 1/365,
        'days': 1/365,
        'hour': 1/8760,
        'hours': 1/8760,
        'minute': 1/525600,
        'minutes': 1/525600
    }

    # Regular expression to extract age and unit
    match = re.match(r'(\d+)\s*(year|years|month|months|week|weeks|day|days|hour|hours|minute|minutes)', age_str.lower())
    
    if match:
        value, unit = match.groups()
        return float(value) * conversion_factors[unit]
    else:
        raise ValueError(f"Unknown age format: {age_str}")
    
def normalize_ages(studies_data):
    for study in studies_data:
        min_age_str = study.get('minimumAge', '0')
        max_age_str = study.get('maximumAge', '120')
        
        try:
            min_age = convert_age_to_years(min_age_str)
        except ValueError as e:
            min_age = float('inf')  # Handle unknown format as no limit
            print(e)
        
        try:
            max_age = convert_age_to_years(max_age_str)
        except ValueError as e:
            max_age = float('inf')  # Handle unknown format as no limit
            print(e)
        
        study['normalized_minimumAge'] = min_age
        study['normalized_maximumAge'] = max_age

In [None]:
normalize_ages(studies_data)

In [74]:
from transformers import pipeline as hf_pipeline
import torch

class EntityExtractor:
    def __init__(self, model_name="Clinical-AI-Apollo/Medical-NER", aggregation_strategy='simple'):
        """
        Initialize the EntityExtractor with a specific model.

        Args:
            model_name (str): The name of the model to use for the pipeline.
            aggregation_strategy (str): The strategy to use for aggregation.
        """
        self.pipe = hf_pipeline("token-classification", model=model_name, aggregation_strategy=aggregation_strategy)

    def extract_specific_entities(self, text, target_entities=['DISEASE_DISORDER', 'MEDICATION']):
        """
        Extract specific entities from the given text using the initialized pipeline.

        Args:
            text (str): The input text from which to extract entities.
            target_entities (list): List of entity types to extract (default is ['DISEASE_DISORDER', 'MEDICATION']).

        Returns:
            dict: A dictionary with lists of extracted entities for each target type.
        """
        results = self.pipe(text)
        extracted_entities = {'diseases': [], 'medications': []}
        entity_mapping = {
            'DISEASE_DISORDER': 'diseases',
            'MEDICATION': 'medications'
        }
        
        for entity in results:
            entity_type = entity['entity_group']
            if entity_type in target_entities:
                key = entity_mapping[entity_type]
                extracted_entities[key].append(entity['word'])
        
        return extracted_entities

In [29]:
def get_all_eligibility_criteria(studies_data):
    eligibility_criteria_list = [study["eligibilityCriteria"] for study in studies_data]
    return eligibility_criteria_list

# All eligibility_criteria_list
eligibility_criteria_list = get_all_eligibility_criteria(studies_data)

In [34]:
def get_all_overall_status(studies_data):
    overall_status_list = [study["overallStatus"] for study in studies_data]
    return overall_status_list

# All eligibility_criteria_list
overall_status_list = get_all_overall_status(studies_data)

In [37]:

"""
# This when I check count of studies for every unique overall_status
from collections import Counter

def count_overall_status(overall_status_list):
    status_counts = Counter(overall_status_list)
    return status_counts

# Count each unique overall status
status_counts = count_overall_status(overall_status_list)
"""

In [66]:
def filter_studies_by_status(studies_data, target_statuses):
    filtered_studies = [study for study in studies_data if study.get("overallStatus") in target_statuses]
    return filtered_studies

In [77]:
"""
Define target statuses, this is to reduce the count of the studies
that I'll be extracting it's entity as I would take more than 2 days running 
on my local machine if full data

This is just for a purpose of demonstration of NER (Named Entity Recognition)
using Clinical-AI-Apollo/Medical-NER from hugging face instead of GPT-3.5
"""
target_statuses = ['APPROVED_FOR_MARKETING','AVAILABLE','ENROLLING_BY_INVITATION']

In [78]:
# Filter studies by target statuses
filtered_studies = filter_studies_by_status(studies_data, target_statuses)

In [80]:
# Entity Extractor for Diseases and Medications
extractor = EntityExtractor()

# Start timer
start_time = time.time()

# Loop for storing the entity of diseases and medication with progress bar
"""
This is the one that has been used in place of GPT-3.5
Using Hugging Face Model Clinical-AI-Apollo/Medical-NER
"""
for study in tqdm(filtered_studies, desc="Processing filtered studies"):
    eligibilityCriteria = study['eligibilityCriteria']
    diseases_medications = extractor.extract_specific_entities(eligibilityCriteria)
    study['diseases'] = diseases_medications.get('diseases', '[]')
    study['medications'] = diseases_medications.get('medications', '[]')


# End timer
end_time = time.time()
elapsed_time = end_time - start_time
minutes = int(elapsed_time // 60)
seconds = elapsed_time % 60


logger.info('Successfully stored data as list ')  # Log completion message
logger.info(f'Total elapsed time: {minutes} minutes and {seconds:.2f} seconds')  # Log total elapsed time of Loop

Processing studies:   0%|          | 0/4400 [00:00<?, ?it/s]Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Processing studies: 100%|██████████| 4400/4400 [12:28<00:00,  5.88it/s]  

2024-06-08 20:53:48,328 - __main__ - INFO - Successfully stored data as list 
2024-06-08 20:53:48,329 - __main__ - INFO - Total elapsed time: 12 minutes and 28.05 seconds





In [96]:
normalize_ages(filtered_studies)

In [98]:
# filtered_studies[6]

In [49]:
%%capture
# Normalize and load the data onto the locally created duckdb database 'clinical_trial_pipeline.duckdb'
dlt_pipeline.run(studies_data, table_name='studies')

In [99]:
dlt_pipeline.run(filtered_studies, table_name='filtered_studies')

LoadInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0xdc7f47c70>, metrics={'1717852724.159038': [{'started_at': DateTime(2024, 6, 8, 13, 18, 45, 985290, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 6, 8, 13, 18, 47, 187969, tzinfo=Timezone('UTC'))}]}, destination_type='dlt.destinations.duckdb', destination_displayable_credentials='duckdb:////Users/glenn/Desktop/Exams/trially/clinical_trial_data_pipeline/grp_dalida_data_clinical_trial_data_pipeline/src/notebooks/clinical_trial_pipeline.duckdb', destination_name='duckdb', environment=None, staging_type=None, staging_name=None, staging_displayable_credentials=None, destination_fingerprint='', dataset_name='clinical_trial_data', loads_ids=['1717852724.159038'], load_packages=[LoadPackageInfo(load_id='1717852724.159038', package_path='/Users/glenn/.dlt/pipelines/clinical_trial_pipeline/load/loaded/1717852724.159038', state='loaded', schema=Schema clinical_trial at 59048220016, schema_update={'filtered_studies__diseases': {

In [101]:
import duckdb

conn = duckdb.connect(f"{dlt_pipeline.pipeline_name}.duckdb")

display(conn.sql("DESCRIBE"))

┌──────────────────────┬─────────────────────┬──────────────────────┬───┬──────────────────────┬───────────┐
│       database       │       schema        │         name         │ … │     column_types     │ temporary │
│       varchar        │       varchar       │       varchar        │   │      varchar[]       │  boolean  │
├──────────────────────┼─────────────────────┼──────────────────────┼───┼──────────────────────┼───────────┤
│ clinical_trial_pip…  │ clinical_trial_data │ _dlt_loads           │ … │ [VARCHAR, VARCHAR,…  │ false     │
│ clinical_trial_pip…  │ clinical_trial_data │ _dlt_pipeline_state  │ … │ [BIGINT, BIGINT, V…  │ false     │
│ clinical_trial_pip…  │ clinical_trial_data │ _dlt_version         │ … │ [BIGINT, BIGINT, T…  │ false     │
│ clinical_trial_pip…  │ clinical_trial_data │ filtered_studies     │ … │ [VARCHAR, VARCHAR,…  │ false     │
│ clinical_trial_pip…  │ clinical_trial_data │ filtered_studies__…  │ … │ [VARCHAR, VARCHAR,…  │ false     │
│ clinical_trial_pi

In [102]:
# Define the table name you want to view
table_name = 'clinical_trial_data.studies'

# Execute a query to select all data from the table
query = f"SELECT * FROM {table_name} limit 3"
result_df = conn.execute(query).fetchdf()

display(result_df)

Unnamed: 0,nct_id,brief_title,acronym,overall_status,start_date,conditions,interventions,locations,primary_completion_date,study_first_post_date,last_update_post_date,study_type,phases,eligibility_criteria,sex,minimum_age,maximum_age,_dlt_load_id,_dlt_id,std_ages
0,NCT00072579,Sargramostim in Treating Patients With Chronic...,Unknown,COMPLETED,2003-05,Leukemia,sargramostim,"Phoenix - United States, Oakland - United Stat...",2006-04,2003-11-05,2017-01-19,INTERVENTIONAL,PHASE2,DISEASE CHARACTERISTICS:\n\n* Histologically c...,ALL,18 Years,120 Years,1717791667.195856,eGYecbqpN1Xmug,
1,NCT00517179,Effect of Vardenafil on Blood Pressure in Pati...,Unknown,COMPLETED,2006-04,"Prostatic Hyperplasia, Impotence",Vardenafil 10mg,Hong Kong - China,Unknown Date,2007-08-16,2011-06-16,INTERVENTIONAL,,Inclusion Criteria:\n\n* Age between 50 to 80 ...,MALE,50 Years,80 Years,1717791667.195856,msBengrB3YSJUQ,
2,NCT00812279,Investigate the Exposure to Selected Smoke Con...,Unknown,COMPLETED,2008-11,Smoking,Distillation based smoking article (SMAR cigar...,Warsaw - Poland,2009-02,2008-12-22,2019-11-07,INTERVENTIONAL,,Inclusion Criteria:\n\n* Caucasian adult smoke...,ALL,23 Years,55 Years,1717791667.195856,0kJWUJUssIyD1Q,


In [108]:
# Define the table name you want to view
table_name = 'clinical_trial_data.filtered_studies'

# Execute a query to select all data from the table

query = f"SELECT nct_id, brief_title, normalized_minimum_age, normalized_maximum_age FROM {table_name}"
result_df = conn.execute(query).fetchdf()

display(result_df)

Unnamed: 0,nct_id,brief_title,normalized_minimum_age,normalized_maximum_age
0,NCT05900479,"Multi-site, Longitudinal Trial Evaluating the ...",18.0,120.0
1,NCT06221579,"An Intergenerational, Cognitively Enriched Int...",1.0,120.0
2,NCT04081779,Survivorship Care Plans and Telehealth Educati...,18.0,120.0
3,NCT05523479,The Maximizing Extubation Outcomes Through Edu...,18.0,120.0
4,NCT03820479,A Study to Investigate Quality of Recovery up ...,18.0,75.0
...,...,...,...,...
4395,NCT03079687,Expanded Access Program for Olaparib Tablets a...,18.0,120.0
4396,NCT03338387,"Co-Feedback Action of Growth Hormone, PP and P...",18.0,30.0
4397,NCT04646187,De-escalation of Anti-TNF Therapy in Inflammat...,12.0,25.0
4398,NCT05658887,Preoperative Gabapentin vs Placebo for Vaginal...,18.0,120.0


: 