In [151]:
from alive_progress import alive_bar
import requests
import pandas as pd
from io import BytesIO
from PIL import Image
from io import StringIO
import json
import os
import zipfile
import tarfile
from tqdm.notebook import tqdm
from tqdm.asyncio import tqdm as tqdm_async
import aiohttp
import asyncio

In [152]:
# TODO: reuse a single requests.Session in the synchronous methods instead of opening a new connection for every request (download_files_async already shares one aiohttp session).

class GDCDataFetcher:
    """
    Small client for the GDC REST API: status check, case/file metadata queries
    and file downloads (synchronous and asynchronous).

    Query methods follow one error-handling convention: problems are printed and
    ``None`` is returned instead of raising. Download methods print a message for
    every file that could not be fetched and carry on with the remaining files.

    Downloads are laid out as ``<output_dir>/<case_id>/<data_category>/<file_name>``.
    Every file is streamed into ``<file_name>.part`` first and only moved to its
    final name once the transfer completed, so an interrupted transfer is never
    mistaken for a finished file on the next run.
    """

    GDC_BASE_URL = "https://api.gdc.cancer.gov"

    # (connect, read) timeout in seconds applied to every HTTP request. The read
    # value bounds the wait between two received chunks, not the whole transfer,
    # so arbitrarily large files can still be downloaded.
    REQUEST_TIMEOUT = (10, 300)

    # Number of bytes requested per chunk while streaming a file to disk.
    DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    # Folder used for files that are not attached to exactly one case.
    MULTI_CASE_FOLDER = "GENERAL_METADATA"

    def __init__(self, project_id=None):
        """
        :param project_id: Optional GDC project ID (e.g. "TCGA-COAD") used to
            restrict the case and file queries.
        """
        self.project_id = project_id

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    def _post_for_hits(self, endpoint, params, noun, request_name):
        """POST ``params`` to ``endpoint`` and return the ``data.hits`` list, or None after printing the error."""
        url = f"{self.GDC_BASE_URL}/{endpoint}"
        try:
            response = requests.post(url, json=params, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                print(f"Error fetching {noun}: {response.status_code}")
                print("Response content:", response.text)
                return None
            return response.json()["data"]["hits"]
        except (requests.exceptions.RequestException, ValueError, KeyError) as e:
            # ValueError/KeyError cover a body that is not JSON or lacks data.hits.
            print(f"An error occurred during the {request_name} request: {e}")
            return None

    @classmethod
    def _case_folder_name(cls, cases):
        """Return the case_id when the file belongs to exactly one case, else the shared folder name."""
        if isinstance(cases, (list, tuple)) and len(cases) == 1:
            return cases[0]["case_id"]
        # Several cases, no case at all, or a missing 'cases' value.
        return cls.MULTI_CASE_FOLDER

    @staticmethod
    def _prepare_destination(output_dir, case_id, data_category, file_name):
        """Create ``output_dir/case_id/data_category`` if needed and return the final file path inside it."""
        category_dir = os.path.join(output_dir, case_id, data_category)
        os.makedirs(category_dir, exist_ok=True)
        return os.path.join(category_dir, file_name)

    @staticmethod
    def _remove_if_exists(path):
        """Best-effort removal of a leftover partial file."""
        try:
            os.remove(path)
        except OSError:
            # Nothing to clean up (or it cannot be removed); the next run removes
            # stale .part files again before downloading.
            pass

    def _download_to(self, file_id, file_path, on_chunk=None):
        """
        Stream one GDC file into ``file_path`` via a temporary ``.part`` file.

        :param on_chunk: Optional callable receiving the byte count of every chunk written.
        :return: True when the file was fully written and moved into place, False otherwise.
        """
        temp_path = file_path + ".part"
        if os.path.exists(temp_path):
            print(f"Partially downloaded file {temp_path} found. Removing it.")
            os.remove(temp_path)

        url = f"{self.GDC_BASE_URL}/data/{file_id}"
        try:
            response = requests.get(url, stream=True, timeout=self.REQUEST_TIMEOUT)
            try:
                if response.status_code != 200:
                    return False
                with open(temp_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=self.DOWNLOAD_CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
                            if on_chunk is not None:
                                on_chunk(len(chunk))
            finally:
                # A streamed response keeps its connection until it is closed.
                response.close()
            # Only a complete transfer ever gets the final name.
            os.replace(temp_path, file_path)
            return True
        except (requests.exceptions.RequestException, OSError) as e:
            self._remove_if_exists(temp_path)
            print(f"Error downloading {os.path.basename(file_path)}: {e}")
            return False

    # ------------------------------------------------------------------ #
    # Metadata queries
    # ------------------------------------------------------------------ #

    def check_status(self):
        """
        Check the status of the GDC API.

        :return: The decoded JSON status document, or None if the request failed.
        """
        url = f"{self.GDC_BASE_URL}/status"
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code == 200:
                return response.json()
            print(f"Error: Unable to retrieve status. HTTP Status Code: {response.status_code}")
            return None
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"An error occurred: {e}")
            return None

    def display_available_categories_and_type(self):
        """
        Print every distinct (data_category, data_type) combination found among
        the files of the project.

        At most 10000 files are inspected (the ``size`` sent with the query).

        :return: None
        """
        params = {
            "filters": {
                "op": "in",
                "content": {
                    "field": "cases.project.project_id",
                    "value": [self.project_id]
                }
            },
            "fields": "file_id,file_name,data_category,data_type",
            "format": "JSON",
            "size": "10000"  # returns min(size, available_files)
        }

        hits = self._post_for_hits("files", params, "data", "available categories")
        if hits is None:
            return
        if not hits:
            # An empty frame has no columns, so the selection below would fail.
            print(f"No files found for project {self.project_id}.")
            return

        df = pd.DataFrame(hits)
        # display unique data_category and data_type combinations
        print(df[['data_category', 'data_type']].drop_duplicates())

    def fetch_cases(self, case_ids=None, fields=None):
        """
        Fetch case data for specified case IDs or all cases in the project,
        and return as a Pandas DataFrame.

        At most 10000 cases are returned (the ``size`` sent with the query).

        :param case_ids: Optional iterable of case IDs to filter the results.
        :param fields: Optional list of fields to retrieve (a single field name
            may be given as a plain string). Defaults to ["submitter_id"].

        :return: A Pandas DataFrame containing the case data if successful, None otherwise.
        """
        case_ids = list(case_ids) if case_ids is not None else []

        conditions = []
        if case_ids:
            conditions.append({
                "op": "in",
                "content": {
                    "field": "case_id",
                    "value": case_ids
                }
            })
        if self.project_id:
            conditions.append({
                "op": "in",
                "content": {
                    "field": "project.project_id",
                    "value": [self.project_id]
                }
            })

        if not fields:
            fields = ["submitter_id"]
        elif isinstance(fields, str):
            # Joining a bare string would split it into single characters.
            fields = [fields]

        params = {
            "fields": ','.join(fields),
            "format": "JSON",
            "size": "10000"
        }
        if conditions:
            # Send filters only when there is something to filter on, rather
            # than an "and" operator with no operands.
            params["filters"] = json.dumps({"op": "and", "content": conditions})

        hits = self._post_for_hits("cases", params, "cases", "fetch cases")
        if hits is None:
            return None
        return pd.DataFrame(hits)

    def fetch_file_links(self, case_ids, fields=None, data_categories=None):
        """
        Fetch metadata of the open-access files associated with the given case
        IDs and return it as a Pandas DataFrame.

        :param case_ids: Required iterable of case IDs.
        :param fields: Optional list of fields to retrieve for each file (a single
            field name may be given as a plain string). Defaults to 'file_id',
            'file_name', 'cases.case_id', 'data_category', 'data_type' and 'file_size'.
        :param data_categories: Optional list of data categories to keep
            (e.g. "Clinical", "Biospecimen"). If not specified, files of all
            categories are returned.

        :return: A Pandas DataFrame containing metadata for the retrieved files.
            Returns None if no files are found or if an error occurs.
        """
        case_ids = list(case_ids) if case_ids is not None else []
        if not case_ids:
            print("No case IDs provided.")
            return None

        filters = {
            "op": "and",
            "content": [
                {
                    "op": "in",
                    "content": {
                        "field": "cases.case_id",
                        "value": case_ids
                    }
                },
                {
                    "op": "=",
                    "content": {
                        "field": "access",
                        "value": "open"
                    }
                }
            ]
        }

        if data_categories:
            filters["content"].append({
                "op": "in",
                "content": {
                    "field": "data_category",
                    "value": list(data_categories)
                }
            })

        if not fields:
            fields = ["file_id", "file_name", "cases.case_id", "data_category", "data_type", "file_size"]
        elif isinstance(fields, str):
            fields = [fields]

        params = {
            "filters": json.dumps(filters),
            "fields": ','.join(fields),
            "format": "JSON",
            "size": "1000000"
        }

        file_metadata = self._post_for_hits("files", params, "file links", "fetch file link")
        if file_metadata is None:
            return None
        if not file_metadata:
            print("No files found for the provided case IDs.")
            return None
        return pd.DataFrame(file_metadata)

    # ------------------------------------------------------------------ #
    # Downloads
    # ------------------------------------------------------------------ #

    def download_files_for_case(self, file_metadata, case_id, output_dir="cases"):
        """
        Download all files for a specific case and organize them into
        data_category folders below ``<output_dir>/<case_id>``.

        Files that already exist are skipped. A failed download is reported and
        leaves no file behind.

        :param file_metadata: Required DataFrame of files associated with the given
            case_id; needs the columns 'file_id', 'file_name' and 'data_category'.
        :param case_id: Required case_id which the files are associated with.
        :param output_dir: Optional output directory, default is "./cases".
        :return: None
        """
        if file_metadata is None:
            print("No file metadata provided, nothing to download.")
            return

        # creates directory for all files of current case_id
        os.makedirs(os.path.join(output_dir, case_id), exist_ok=True)

        for _, file_info in file_metadata.iterrows():
            file_name = file_info['file_name']
            data_category = file_info['data_category']

            file_path = self._prepare_destination(output_dir, case_id, data_category, file_name)

            # check if the file already exists, if so, skip the download
            if os.path.exists(file_path):
                print(f"File {file_name} already exists, skipping download.")
                continue

            if self._download_to(file_info['file_id'], file_path):
                print(f"Downloaded {file_name} for case: {case_id} in {data_category}/{file_name}")
            else:
                print(f"Failed to download {file_name} for case: {case_id}")

    def download_files(self, file_metadata, output_dir="cases"):
        """
        Download the files described by the DataFrame returned by fetch_file_links()
        into the output directory, sorted by case_id and data_category.

        Files are downloaded in the row order of file_metadata, which is not
        sorted by case_id. Files that are not attached to exactly one case go
        into the "GENERAL_METADATA" folder. Existing files are skipped.

        :param file_metadata: DataFrame with the columns 'file_id', 'file_name',
            'cases', 'data_category' and 'file_size'.
        :param output_dir: Optional output directory, default is "./cases".
        :return: None
        """
        if file_metadata is None or file_metadata.empty:
            print("No file metadata provided, nothing to download.")
            return

        # creating base directory for the files
        os.makedirs(output_dir, exist_ok=True)

        total_data = file_metadata["file_size"].sum()

        with tqdm(total=total_data, unit="B", unit_scale=True, desc="Downloading files", leave=False) as total_pb:
            for _, file_info in file_metadata.iterrows():
                file_name = file_info['file_name']
                file_size = file_info['file_size']
                case_id = self._case_folder_name(file_info['cases'])

                file_path = self._prepare_destination(
                    output_dir, case_id, file_info['data_category'], file_name
                )

                if os.path.exists(file_path):
                    # Count the skipped file so the overall bar can still reach 100%.
                    total_pb.update(file_size)
                    continue

                with tqdm(total=file_size, unit="B", unit_scale=True, desc=f"Downloading {file_name}", leave=False) as file_pb:
                    def advance(byte_count, file_pb=file_pb):
                        file_pb.update(byte_count)
                        total_pb.update(byte_count)

                    succeeded = self._download_to(file_info['file_id'], file_path, on_chunk=advance)

                if not succeeded:
                    print(f"Failed to download {file_name} for case: {case_id}")

    async def download_files_async(self, file_metadata, output_dir="cases", max_concurrent=8):
        """
        Asynchronously download files from GDC using file metadata.

        All transfers share one aiohttp session. The session is created without
        a total timeout (only connect and per-read timeouts), because a limit on
        the whole request would abort any file that takes longer than that limit
        to transfer.

        :param file_metadata: DataFrame containing file metadata with columns 'file_id', 'file_name', 'cases', 'data_category', and 'file_size'.
        :param output_dir: Directory where files will be saved, defaults to "cases".
        :param max_concurrent: Maximum number of files downloaded at the same time, defaults to 8.
        :return: None
        """
        if file_metadata is None or file_metadata.empty:
            print("No file metadata provided, nothing to download.")
            return

        os.makedirs(output_dir, exist_ok=True)
        total_size = file_metadata["file_size"].sum()

        # Bounds the number of simultaneous transfers; without it every row of
        # file_metadata would start downloading at once.
        semaphore = asyncio.Semaphore(max(1, int(max_concurrent)))

        async def download_file(session, file_info, progress_bar):
            file_id = file_info["file_id"]
            file_name = file_info["file_name"]
            file_size = file_info["file_size"]
            case_id = self._case_folder_name(file_info["cases"])

            final_file_path = self._prepare_destination(
                output_dir, case_id, file_info["data_category"], file_name
            )
            temp_file_path = f"{final_file_path}.part"

            if os.path.exists(final_file_path):
                progress_bar.update(file_size)
                return
            elif os.path.exists(temp_file_path):
                os.remove(temp_file_path)

            url = f"{self.GDC_BASE_URL}/data/{file_id}"

            async with semaphore:
                try:
                    async with session.get(url) as response:
                        if response.status != 200:
                            print(f"Failed to download {file_name}: {response.status}")
                            return
                        with open(temp_file_path, 'wb') as f:
                            async for chunk in response.content.iter_any():
                                if chunk:
                                    f.write(chunk)
                                    progress_bar.update(len(chunk))
                    os.replace(temp_file_path, final_file_path)
                except Exception as e:
                    self._remove_if_exists(temp_file_path)
                    # repr() because some exceptions (e.g. timeouts) have an empty str().
                    print(f"Error downloading {file_name}: {e!r}")

        connect_timeout, read_timeout = self.REQUEST_TIMEOUT
        timeout = aiohttp.ClientTimeout(total=None, sock_connect=connect_timeout, sock_read=read_timeout)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            with tqdm_async(total=total_size, unit="B", unit_scale=True, desc=f"Downloading files", leave=False) as progress_bar:
                tasks = [
                    download_file(session, file_info, progress_bar)
                    for _, file_info in file_metadata.iterrows()
                ]
                await asyncio.gather(*tasks)


In [153]:
# Project whose files are downloaded, and how many of its cases this run keeps.
PROJECT_ID = "TCGA-COAD"
MAX_CASES = 10

gdc_fetcher = GDCDataFetcher(project_id=PROJECT_ID)

all_cases = gdc_fetcher.fetch_cases()
if all_cases is None or all_cases.empty:
    # fetch_cases() prints the request error and returns None; stop here with a
    # clear message instead of failing later on a None/empty frame.
    raise RuntimeError(f"No cases could be fetched for project {PROJECT_ID}.")

cases = all_cases.head(MAX_CASES)

# Categories to download. Others that can be added:
# "Copy Number Variation", "Simple Nucleotide Variation",
# "Transcriptome Profiling", "Sequencing Reads".
data_categories = ["Clinical", "Biospecimen", "DNA Methylation"]

print("Found", len(cases), "cases.")

Found 10 cases.


In [154]:
# Runs an empty shell command. NOTE(review): presumably the usual trick to make
# a Windows console honour ANSI escape sequences (progress bars) - confirm.
os.system("")

case_metadata = dict()
total_cases = cases.shape[0]

# One query for every selected case instead of one request per case.
case_ids = cases["id"].tolist()
file_metadata = gdc_fetcher.fetch_file_links(case_ids, data_categories=data_categories)

# fetch_file_links() returns None when nothing was found or the request failed.
if file_metadata is not None:
    total_files = len(file_metadata)
    total_size_GB = file_metadata["file_size"].sum() / 1024**3
    print(f"Found {total_files} files with {total_size_GB:.3f} GB.")

Found 137 files with 6.390 GB.


In [157]:
#gdc_fetcher.download_files(file_metadata, "cases_TEST_nonasync")
await gdc_fetcher.download_files_async(file_metadata, "cases_TEST_TRAIN_100")

                                                                        