In [1]:
# pyarrow supplies the Parquet engine that pandas' DataFrame.to_parquet needs
%pip install requests pandas pyarrow tqdm

Note: you may need to restart the kernel to use updated packages.


In [2]:
import requests  # Library for making HTTP requests
import pandas as pd  # Library for data manipulation and analysis
import uuid  # Library for generating unique identifiers
from datetime import datetime  # Library for handling date and time
from pathlib import Path  # Library for working with file system paths
from tqdm import tqdm  # Library for displaying progress bars
import logging  # Library for logging messages

# Send INFO-and-above records to fetch_studies.log (relative to the working directory),
# each line stamped with time and severity
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='fetch_studies.log',
)

def fetch_data(base_url, params, timeout=30, session=None):
    """
    Fetch one JSON document from the API.

    Args:
        base_url (str): The base URL of the API endpoint.
        params (dict): Query parameters to be passed in the request.
        timeout (float or tuple, optional): Seconds to wait for the server
            before giving up; forwarded unchanged to the HTTP call. Without
            a timeout a stalled connection would block the run forever.
            Defaults to 30.
        session (optional): Object exposing a ``get(url, params=, timeout=)``
            method (e.g. a ``requests.Session``) used to send the request,
            which allows connection reuse across many pages. Defaults to
            None, meaning the module-level ``requests`` API is used.

    Returns:
        dict or None: The decoded JSON response if successful; None if the
        request failed, the server answered with an HTTP error status, or
        the body could not be decoded as JSON.
    """
    # Only touch the global `requests` module when no session was injected
    http = requests if session is None else session
    try:
        response = http.get(base_url, params=params, timeout=timeout)
        response.raise_for_status()  # Turn 4xx/5xx answers into exceptions
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors raised by older `requests`
        # releases, where they are not RequestException subclasses
        logging.error(f"Request failed: {e}")
        print(f"Request failed: {e}")
        return None

def save_to_parquet(data, output_folder, pipeline_run_id, pipeline_start_timestamp, file_part):
    """
    Save a batch of records to a Parquet file and return the file path.

    The records are flattened with ``pd.json_normalize`` (nested keys become
    dotted column names) and two lineage columns, ``pipeline_run_id`` and
    ``pipeline_start_timestamp``, are added to every row.

    Args:
        data (list): List of dictionaries representing the data.
        output_folder (str or Path): Path to the output folder. It is
            created (including parents) if it does not exist yet.
        pipeline_run_id (str): Unique ID for each pipeline run.
        pipeline_start_timestamp (timestamp): Timestamp when the pipeline run starts.
        file_part (int): Part number of the file.

    Returns:
        Path: Path to the saved Parquet file, named
        ``clinical_trials_run_<pipeline_run_id>_part<file_part>.parquet``.
    """
    # Accept plain strings as documented; the `/` operator below needs a Path
    output_folder = Path(output_folder)
    output_folder.mkdir(parents=True, exist_ok=True)

    # Convert data to a pandas DataFrame and tag each row with the run metadata
    df = pd.json_normalize(data)
    df['pipeline_run_id'] = pipeline_run_id
    df['pipeline_start_timestamp'] = pipeline_start_timestamp

    # Construct the output file path
    output_file = output_folder / f"clinical_trials_run_{pipeline_run_id}_part{file_part}.parquet"

    # Save the DataFrame to a Parquet file (the index carries no information)
    df.to_parquet(output_file, index=False)
    return output_file

def fetch_studies(from_date=None, pageSize=100, pagePerParquet=50):
    """
    Fetch studies from the API and save them to Parquet files.

    Pages are requested one after another by following the ``nextPageToken``
    returned with each response. Studies are buffered in memory and written
    to ``<cwd>/output_parquet`` every ``pagePerParquet`` pages and once more
    when paging ends. If a request fails part-way through, the studies
    fetched so far are still written before the function stops.

    Args:
        from_date (str, optional): Specifies the date in YYYY-MM-DD format from which to fetch studies. If provided, the function will pull only studies that have been updated on or after this date. Defaults to None.
        pageSize (int, optional): Number of studies to fetch per page. Defaults to 100.
        pagePerParquet (int, optional): Number of pages to save per Parquet file. Defaults to 50.

    Returns:
        None

    Raises:
        ValueError: If ``pageSize`` or ``pagePerParquet`` is not positive.
    """
    if pageSize <= 0 or pagePerParquet <= 0:
        raise ValueError("pageSize and pagePerParquet must be positive integers")

    # Create the output folder if it does not exist
    output_folder = Path.cwd() / "output_parquet"
    output_folder.mkdir(exist_ok=True)

    base_url = "https://clinicaltrials.gov/api/v2/studies"  # Base URL of the API endpoint
    params = {"format": "json", "countTotal": "true", "pageSize": pageSize}  # Parameters for the API request
    if from_date:
        params["query.term"] = f"AREA[LastUpdateSubmitDate]RANGE[{from_date},MAX]"  # Add date filter if provided

    # The first response supplies the total count AND serves as page 1,
    # so the first page is not requested twice
    json_response = fetch_data(base_url, params)
    if json_response is None:
        # fetch_data has already logged the underlying request error
        logging.error('Initial request failed; no studies fetched.')
        print('Initial request failed; no studies fetched.')
        return

    total_count = json_response.get('totalCount', 0)
    if total_count == 0:
        logging.info('Request succeeded but no studies fetched.')  # Log info message
        print('Request succeeded but no studies fetched.')
        return

    # Ceiling divisions: a partially filled page / file still counts as one
    total_pages = -(-total_count // pageSize)
    total_files = -(-total_pages // pagePerParquet)
    print(f"Total count of the studies: {total_count}; Total pages to be fetched: {total_pages}; Total parquet files to be saved: {total_files}")

    # Initialize variables for tracking progress
    all_studies = []
    page_count = 0
    file_part = 1
    pipeline_run_id = str(uuid.uuid4())
    pipeline_start_timestamp = datetime.now()

    # The context manager closes the progress bar even if an exception escapes
    with tqdm(total=total_pages, desc="Fetching pages", leave=True, position=0) as pbar:
        while True:
            # Consume the page currently held in json_response
            all_studies.extend(json_response.get('studies', []))
            nextPageToken = json_response.get('nextPageToken')
            page_count += 1
            pbar.update(1)  # Update progress bar

            # Request the following page, if the API announced one
            if nextPageToken:
                params["pageToken"] = nextPageToken
                json_response = fetch_data(base_url, params)
                if json_response is None:
                    logging.error(f"Stopping after page {page_count}: next page could not be fetched.")
                    print(f"Stopping after page {page_count}: next page could not be fetched.")
            else:
                json_response = None
            finished = json_response is None

            # Save to Parquet every pagePerParquet pages or when paging ends;
            # an empty buffer is skipped so no zero-row part file is written
            if all_studies and (page_count % pagePerParquet == 0 or finished):
                output_file = save_to_parquet(all_studies, output_folder, pipeline_run_id, pipeline_start_timestamp, file_part)
                logging.info(f"Data written to {output_file}")  # Log file path
                all_studies = []  # Clear list for the next batch
                file_part += 1

            if finished:
                break  # No more pages, or the last request failed

if __name__ == "__main__":
    # Demo run: pull the studies updated on or after 2024-04-10,
    # bundling up to 100 pages into each parquet file
    fetch_studies(from_date='2024-04-10', pagePerParquet=100)
 
        

Total count of the studies: 4719; Total pages to be fetched: 48; Total parquet files to be saved: 1


Fetching pages: 100%|██████████████████████████████████████████████████████████████████| 48/48 [00:59<00:00,  1.24s/it]
