In [1]:
import tempfile
import tarfile
import pandas as pd
import concurrent.futures

from tqdm import tqdm
from pathlib import Path
from concurrent.futures import as_completed

from api_helpers import get_tcga_projects, get_uuid_from_manifest, download_files, map_file_name_to_file_uuid, map_file_uuid_to_sample_id

# List all TCGA projects

In [2]:
# Project identifiers (e.g. 'TCGA-BRCA') that drive the per-project downloads below.
tcga_projects = get_tcga_projects()

# Generate expression data files for each TCGA project based on the manifest files

**Note:** this step can take more than 30 minutes to complete. Projects whose output file already exists under `dask/raw/` are skipped.

In [9]:
def retrieve_expression_data(project: str) -> str:
    """Download one project's expression files and store them as a single CSV.

    The files listed in ``manifest/expression_data/{project}.tsv`` are
    downloaded as a tar archive, the ``tpm_unstranded`` column of every
    ``.tsv`` member is extracted, and the result is written to
    ``dask/raw/{project}.csv`` with one row per sample ID and one column per
    ``gene_id``.

    Args:
        project: Project identifier; used to build both the manifest path
            and the output path.

    Returns:
        ``"{project} already exists."`` when the output CSV is already
        present (nothing is downloaded), otherwise
        ``"Done processing {project}"``.

    Raises:
        ValueError: If the downloaded archive contains no ``.tsv`` files.
        KeyError: If an extracted file name or file UUID is missing from the
            manifest / sample-id mappings.
    """
    manifest_path = f"manifest/expression_data/{project}.tsv"
    file_path = f"dask/raw/{project}.csv"
    path = Path(file_path)

    # Check the cache first so a re-run does not repeat the manifest and
    # sample-id lookups for projects that are already on disk.
    if path.exists():
        return f"{project} already exists."

    # Create the output directory up front: failing here is much cheaper
    # than failing after the whole archive has been downloaded and parsed.
    path.parent.mkdir(parents=True, exist_ok=True)

    file_uuids = get_uuid_from_manifest(manifest_path)
    fname_to_file_uuid = map_file_name_to_file_uuid(manifest_path)
    file_uuid_to_sample_id = map_file_uuid_to_sample_id(file_uuids)

    # The archive comes from the network, so use the "data" extraction filter
    # where this Python version provides it (rejects members that would land
    # outside temp_dir). Older interpreters fall back to the plain call.
    extract_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}

    with tempfile.TemporaryDirectory() as temp_dir:

        with tarfile.open(fileobj=download_files(file_uuids)) as tar:
            tar.extractall(temp_dir, **extract_kwargs)

        dfs = []
        for tsv_file in Path(temp_dir).glob('**/*.tsv'):
            file_uuid = fname_to_file_uuid[tsv_file.name]
            sample_id = file_uuid_to_sample_id[file_uuid]

            # Line 1 becomes the header; lines 0 and 2-5 are dropped.
            # NOTE(review): presumably a leading comment line plus four
            # summary rows of the upstream file format - confirm.
            df = pd.read_csv(tsv_file, sep='\t', skiprows=[0, 2, 3, 4, 5])
            # gene_id keeps its version suffix on purpose: stripping it
            # produces duplicate gene_ids.
            df = (
                df[['gene_id', 'tpm_unstranded']]
                .set_index('gene_id')
                .rename(columns={'tpm_unstranded': sample_id})
            )
            dfs.append(df)

        if not dfs:
            # pd.concat would raise a generic "No objects to concatenate".
            raise ValueError(
                f"No expression .tsv files found in the download for {project}"
            )

        # One column per sample after concat; transpose to samples x genes.
        final_df = pd.concat(dfs, axis=1).T
        final_df.index.name = 'samples'
        final_df.columns.name = None

        # Write to a sibling temp file and rename it into place so that an
        # interrupted run never leaves a truncated CSV that a later run
        # would mistake for a finished one.
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            final_df.to_csv(tmp_path, sep=',')
            tmp_path.replace(path)
        except BaseException:
            if tmp_path.exists():
                tmp_path.unlink()
            raise

    return f"Done processing {project}"


# Submit one retrieve_expression_data task per project to a thread pool and
# track completion with a progress bar.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = set()
    for project in tcga_projects:
        futures.add(executor.submit(retrieve_expression_data, project))

    completed = tqdm(as_completed(futures), total=len(futures))
    for future in completed:
        # Calling result() re-raises here any exception raised in the worker.
        future.result()



  0%|          | 0/3 [00:00<?, ?it/s]

100%|██████████| 3/3 [15:06<00:00, 302.14s/it]


In [4]:
# First five project identifiers.
tcga_projects[:5]

['TCGA-BRCA', 'TCGA-THCA', 'TCGA-UCEC', 'TCGA-DLBC', 'TCGA-COAD']

In [5]:
# Project identifiers at positions 5 through 9.
tcga_projects[5:10]

['TCGA-CESC', 'TCGA-BLCA', 'TCGA-CHOL', 'TCGA-ESCA', 'TCGA-ACC']

In [6]:
# First ten project identifiers (the two slices above combined).
tcga_projects[:10]


['TCGA-BRCA',
 'TCGA-THCA',
 'TCGA-UCEC',
 'TCGA-DLBC',
 'TCGA-COAD',
 'TCGA-CESC',
 'TCGA-BLCA',
 'TCGA-CHOL',
 'TCGA-ESCA',
 'TCGA-ACC']