In [None]:
## Change_Log

### Change Log (13/09/2023) ~SIVA

#### 1. Shift to Asynchronous API Requests
Transitioned from a threaded approach to asynchronous programming with `asyncio` and `httpx` to boost data fetching efficiency. The `workers` parameter remains unused in this version and might be removed or repurposed in future iterations.

#### 2. Batch Processing in API Requests
Introduced batch processing to reduce the total number of HTTP requests made during data retrieval. This is facilitated by the new `fetch_data_batch_async` function, making data fetching quicker and smoother.

#### 3. Refined Error Handling and XML Parsing
Enhanced error handling with more robust management of potential XML parsing errors, maintained within the `process_response_batch` function. This enhancement safeguards the robustness of the data fetching process.

#### 4. Data Storage and Aggregation Optimization
Continued using the `concept_data_dict`, a nested dictionary, for efficient data aggregation and streamlined its population process from batch responses, facilitating seamless data updating and extraction.

#### 5. Documentation and Code Readability
Updated docstrings and inline comments to reflect recent changes and enhance code readability, assisting in better maintenance and understanding of the code structure and functionalities.

#### 6. CSV Writing Logic Adjustment
Modified the CSV writing logic to append data in a single instance at the end of the function, reducing IO overhead and optimizing the data writing process.

#### 7. Incorporation of Nest Asyncio
Included `nest_asyncio.apply()` to ensure compatibility of the asyncio event loop with IPython environments, promoting a smoother development and runtime experience across different Python environments.

In [8]:
#V3

In [10]:
# Specify the path to your TSV file
# Gene-Diesese H.Sapians data from CoCoScore FigShare Bucket
file_path = 'Gene_Disease.tsv.gz'

# Specify column names (replace with your actual column names)
column_names = ['PMID', 'Paragraph No', 'Sentence No', 'Diseas ID', 'Gene ID', 'Text', 'Association_label', 'H']

# Read the tab-separated TSV file into a DataFrame
df = pd.read_csv(file_path, sep='\t', names=column_names)

df['PMID'] = df['PMID'].astype(str)

In [5]:
from tqdm import tqdm
import requests
from collections import defaultdict
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import xml.etree.ElementTree as ET
import asyncio
import httpx
import nest_asyncio
import time
import os
import csv

In [6]:
nest_asyncio.apply()

def fetch_pubmed_concepts_df(df, max_pmid_count, workers=4):
    """
    Function to fetch pubmed concepts and convert it to a pandas DataFrame.

    Args:
    df (pd.DataFrame): Input dataframe with a 'PMID' column.
    max_pmid_count (int): Maximum number of PMIDs to process.
    workers (int): Number of threads to use for parallel requests.

    Returns:
    str: Summary of the fetching statistics.
    """
    
    # Step 1: Create the CSV file with header if it doesn't exist
    if not os.path.exists('pubmed_concepts.csv'):
        with open('pubmed_concepts.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["pubmed_id", "concept_name", "count", "gene_id"])
    
    # Load existing pubmed_ids from the CSV to avoid duplications
    existing_data = pd.read_csv('pubmed_concepts.csv')
    existing_pubmed_ids = existing_data['pubmed_id'].tolist()
    
    start_time = time.time()
    api_request_count = 0
    
    pmid_first_n = df['PMID'][:max_pmid_count]
    querylist = pmid_first_n.tolist()
    
    # Step 3: Remove pubmed_ids that are already in the CSV
    querylist = [pmid for pmid in querylist if pmid not in existing_pubmed_ids]

    pubtator_api_url = "https://www.ncbi.nlm.nih.gov/research/pubtator-api/publications/export/biocxml"
    
    response_cache = {}
    
    concept_data_dict = defaultdict(lambda: defaultdict(lambda: {'count': 0}))
    
    with tqdm(total=len(querylist)) as pbar:
        
        async def fetch_data_async(pmid):
            nonlocal api_request_count
            if pmid in response_cache:
                return response_cache[pmid]

            async with httpx.AsyncClient() as client:
                response = await client.get(f"{pubtator_api_url}?pmids={pmid}&concepts=gene")
                if response.status_code == 200:
                    response_cache[pmid] = response.content
                else:
                    print(f"Request for PMID {pmid} failed with status code {response.status_code}")
                api_request_count += 1
                pbar.update(1)
                return response_cache[pmid]

        async def fetch_data_batch_async(pmids):
            nonlocal api_request_count
            pmids_str = ','.join(map(str, pmids))
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{pubtator_api_url}?pmids={pmids_str}&concepts=gene")
                if response.status_code == 200:
                    response_content = response.content
                    for pmid in pmids:
                        response_cache[pmid] = response_content
                else:
                    print(f"Request for PMIDs {pmids_str} failed with status code {response.status_code}")
                api_request_count += 1
                pbar.update(len(pmids))
                return response_content

        def process_response_batch(response_batch, pmid_batch):
            for response, pmid in zip(response_batch, pmid_batch):
                if response:
                    try:
                        root = ET.fromstring(response)
                        for passage in root.iter('passage'):
                            for annotation in passage.iter('annotation'):
                                gene_id = annotation.find("infon[@key='identifier']").text
                                concept_name = annotation.find("text").text.lower()
                                concept_data_dict[pmid][concept_name]['gene_id'] = gene_id
                                concept_data_dict[pmid][concept_name]['count'] += 1
                    except ET.ParseError as e:
                        print(f"XML parsing error for PMID {pmid}: {e}")

        async def main():
            batch_size = 10
            for i in range(0, len(querylist), batch_size):
                pmid_batch = querylist[i:i+batch_size]
                response_batch = await asyncio.gather(*(fetch_data_async(pmid) for pmid in pmid_batch))
                process_response_batch(response_batch, pmid_batch)

        asyncio.run(main())

    concept_data = [
        {'pubmed_id': pmid, 'concept_name': concept_name, **data}
        for pmid, concepts in concept_data_dict.items()
        for concept_name, data in concepts.items()
    ]

    concept_df = pd.DataFrame(concept_data)
    
    concept_df.to_csv('pubmed_concepts.csv', mode='a', header=False, index=False)

    total_time = time.time() - start_time

    stats = f"Stats: \n1. {len(querylist)} number of pubmed_id parsed with {api_request_count} number of API request\n2. Total Time: {total_time:.2f} seconds\n3. Data successfully appended to CSV"
    print(stats)
    return "fetch success"

# Usage example:
# df = pd.DataFrame({'PMID': [1, 2, 3, ...]})
# result_df = fetch_pubmed_concepts_df(df, max_pmid_count=100, workers=4)

In [7]:
fetch_pubmed_concepts_df(df, max_pmid_count=1000, workers=16)

 97%|████████████████████████████████████████████████████████████████████████████▉  | 974/1000 [01:59<00:03,  8.14it/s]

Stats: 
1. 1000 number of pubmed_id parsed with 974 number of API request
2. Total Time: 119.77 seconds
3. Data successfully appended to CSV





'fetch success'

In [11]:
##END##