Read sample table from Terra workspace

In [1]:
import dalmatian
from dataclasses import dataclass
import pandas as pd


# Namespace of the Terra workspaces; combined below as "<namespace>/<workspace>".
namespace = "broad-firecloud-ccle"
# Workspaces whose "sample" entity table is read.
workspaces = ["DepMap_WES_CN_hg38", "DepMap_WGS_CN"]
# BigQuery dataset that holds the per-sample staging tables and the merged table.
dest_dataset = "depmap-omics.maf_staging_0916"


In [2]:

@dataclass
class Transfer:
    """One sample's source files and the staging table that exposes them.

    Attributes:
        srcs: list of source URIs taken from the sample's ``full_file`` value
            (only list-valued rows are turned into transfers).
        dest_table: fully qualified id of the per-sample staging table.
        cds_id: the ``sample_id`` the files belong to.
    """
    # Annotated as a list: the value is iterated element by element downstream.
    srcs: list
    dest_table: str
    cds_id: str

def get_transfers(workspace):
    """Build the list of Transfer records for one workspace.

    Reads the "sample" entities of ``{namespace}/{workspace}`` and returns one
    Transfer for every row whose ``full_file`` value is a list. The staging
    table name is ``stage_maf_<sample_id>`` inside ``dest_dataset``, with "-"
    in the sample id replaced by "_".
    """
    manager = dalmatian.WorkspaceManager(f"{namespace}/{workspace}")
    # reset_index() is applied before reading rows; presumably this turns
    # sample_id into a regular column -- TODO confirm.
    rows = manager.get_entities("sample").reset_index().to_dict("records")

    # Rows whose full_file is not a list are skipped.
    return [
        Transfer(
            row['full_file'],
            f"{dest_dataset}.stage_maf_{row['sample_id'].replace('-', '_')}",
            row["sample_id"],
        )
        for row in rows
        if isinstance(row['full_file'], list)
    ]

# Gather the transfers of every configured workspace into one flat list.
transfers = []
for workspace in workspaces:
    transfers += get_transfers(workspace)

In [3]:
len(transfers)

2395

Create "external" tables, one per cds_id, backed by the associated URIs

In [4]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()
import re

from google.api_core.exceptions import Conflict

def create_ext_table(srcs, dest_table, job_prefix):
    """Create a BigQuery external table over a set of parquet files.

    Args:
        srcs: source URIs assigned to the external configuration.
        dest_table: id of the table to create.
        job_prefix: accepted but not used by this function.
    """
    ext_table = bigquery.Table(dest_table)

    parquet_config = bigquery.ExternalConfig(bigquery.external_config.ExternalSourceFormat.PARQUET)
    parquet_config.source_uris = srcs
    ext_table.external_data_configuration = parquet_config

    # exists_ok=True lets the call be repeated for a table that is already there.
    client.create_table(ext_table, exists_ok=True)


In [5]:
from tqdm.notebook import tqdm

def cleanup_uris(uris):
    """Flatten, trim and de-duplicate a list of URI strings.

    Each element may itself hold several URIs separated by commas. Every piece
    is stripped of surrounding whitespace; empty pieces (from a trailing or
    doubled comma) are dropped so they are never passed on as a source URI.

    Returns:
        The unique URIs in first-seen order, so repeated runs produce the
        same list.
    """
    # A dict keeps insertion order, giving a deterministic de-duplication.
    unique = {}
    for entry in uris:
        for piece in entry.split(","):
            piece = piece.strip()
            if piece:
                unique[piece] = None
    return list(unique)

#transfers = transfers[:10]

# Create one external table per CDS ID, because we want to add CDS_ID as a column
# when the rows are copied into the merged table.
for transfer in tqdm(transfers):
    # At least one row has an entry that looks like a string containing a
    # comma-separated list instead of a real list, so normalize the URIs first.
    uris = cleanup_uris(transfer.srcs)
    create_ext_table(uris, transfer.dest_table, "t2")


  0%|          | 0/2395 [00:00<?, ?it/s]

In [None]:
# NOTE(review): `table_id` is not assigned in any cell visible in this notebook and this
# cell has no execution count -- define `table_id` before running it, or remove the cell.
table = client.get_table(table_id)

In [7]:
transfer

Transfer(srcs=['gs://fc-secure-bd7b8bc9-f665-4269-997e-5a402088a369/submissions/745d8e9e-5651-453e-842a-7add2f3c16b5/run_vcf_to_depmap/15616287-c648-4836-8a34-3296d3fe2638/call-vcf_to_depmap/glob-5efb9cd35734f694b162198c2ed1e6ac/238d247a228040148d3a63ab161ad46e-0.parquet', 'gs://fc-secure-bd7b8bc9-f665-4269-997e-5a402088a369/submissions/745d8e9e-5651-453e-842a-7add2f3c16b5/run_vcf_to_depmap/15616287-c648-4836-8a34-3296d3fe2638/call-vcf_to_depmap/glob-5efb9cd35734f694b162198c2ed1e6ac/89cb21285167491e98a7aa0267a2256d-0.parquet', 'gs://fc-secure-bd7b8bc9-f665-4269-997e-5a402088a369/submissions/745d8e9e-5651-453e-842a-7add2f3c16b5/run_vcf_to_depmap/15616287-c648-4836-8a34-3296d3fe2638/call-vcf_to_depmap/glob-5efb9cd35734f694b162198c2ed1e6ac/935cda2204e547d9a00128a5cf086676-0.parquet', 'gs://fc-secure-bd7b8bc9-f665-4269-997e-5a402088a369/submissions/745d8e9e-5651-453e-842a-7add2f3c16b5/run_vcf_to_depmap/15616287-c648-4836-8a34-3296d3fe2638/call-vcf_to_depmap/glob-5efb9cd35734f694b162198c2ed

Copy from each external table into a single merged table, adding the cds_id to each row.

In [6]:
from tqdm.notebook import tqdm
import pandas as pd

def create_batches(data, batch_size):
    """Split `data` into consecutive slices of at most `batch_size` items.

    The last slice may be shorter; an empty `data` yields an empty list.
    """
    batches = []
    for start in range(0, len(data), batch_size):
        batches.append(data[start:start + batch_size])
    return batches

def concatenate_tables(dest_table, transfers, job_prefix, parallelism):
    """Append every staging table to `dest_table`, tagging each row with its CDS_ID.

    The destination table is created if missing, using the columns of the first
    transfer's staging table plus a leading CDS_ID column. CDS IDs already
    present in `dest_table` are skipped, so the call can be repeated after a
    failure. Insert jobs are submitted `parallelism` at a time and each batch
    is awaited before the next one is submitted.

    Args:
        dest_table: fully qualified id of the merged table.
        transfers: records exposing `cds_id` and `dest_table` attributes.
        job_prefix: accepted for backward compatibility; not used.
        parallelism: number of insert jobs submitted per batch.

    Raises:
        Any error raised by the BigQuery client while waiting on a job.
    """
    if not transfers:
        # Nothing to copy, and no staging table to derive the columns from.
        print("0 tables need to be loaded")
        return

    # NOTE(review): the destination columns come from the first staging table only;
    # an insert fails when another staging table has a different type for a column
    # (seen in this notebook as an INT64 vs STRING mismatch).
    create_table_stmt = f"create table if not exists `{dest_table}` as select 'invalid' CDS_ID, * from `{transfers[0].dest_table}` limit 0"
    client.query(create_table_stmt).result()  # wait for completion

    # Figure out which cds_ids have already been loaded into *this* destination table
    # (not a hard-coded one), so the skip logic follows the `dest_table` argument.
    already_loaded = set(pd.read_gbq(f"select distinct cds_id from `{dest_table}`")["cds_id"])

    # Drop transfers already loaded.
    remaining_transfers = [x for x in transfers if x.cds_id not in already_loaded]
    print(f"{len(already_loaded)} CDS IDs already loaded. {len(remaining_transfers)} of {len(transfers)} tables need to be loaded")

    batches = create_batches(remaining_transfers, parallelism)

    for batch in tqdm(batches, desc="batch", position=0):
        jobs = []

        # Submit a batch to run in parallel.
        for transfer in tqdm(batch, desc=" submit", position=1, leave=False):
            # Table ids are backtick-quoted, matching the create statement above.
            append_stmt = f"insert into `{dest_table}` select '{transfer.cds_id}' CDS_ID, * from `{transfer.dest_table}` where hugo_symbol != '' and hugo_symbol is not NULL"
            jobs.append(client.query(append_stmt))

        # Wait for the whole batch to complete before submitting the next one.
        for job in tqdm(jobs, desc=" wait", position=1, leave=False):
            job.result()
        

In [8]:
concatenate_tables(f"{dest_dataset}.merged_maf", transfers, "t5", parallelism=10)

0 CDS IDs already loaded. 2395 of 2395 tables need to be loaded


batch:   0%|          | 0/240 [00:00<?, ?it/s]

 submit:   0%|          | 0/10 [00:00<?, ?it/s]

 wait:   0%|          | 0/10 [00:00<?, ?it/s]

BadRequest: 400 Query column 6 has type INT64 which cannot be inserted into column civic_description, which has type STRING at [1:54]

Location: US
Job ID: 0c018b91-a1cd-49c2-b624-bfd798c57878


In [18]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()
# Count BRAF rows per (chrom, pos, variant_type) in the merged table and keep the
# groups with more than 10 rows; at most 1000 groups are returned.
q = client.query(f"""select * from (
  SELECT chrom, pos, variant_type, count(1) mut_count FROM `{dest_dataset}.merged_maf` 
  where hugo_symbol='BRAF'
  group by chrom, pos, variant_type) where mut_count > 10
  LIMIT 1000""")


In [19]:
result = q.result()

In [20]:
# Convert the query result with the client's own helper so the frame gets one
# named column per selected field (a RowIterator is not a plain table for pandas).
result.to_dataframe()

In [21]:
result

<google.cloud.bigquery.table.RowIterator at 0x17ded4640>

In [27]:
import pandas as pd
# Row counts per (chrom, pos, variant_type) across the whole merged table,
# limited to 1000 groups.
df = pd.read_gbq(f"""select * from (
  SELECT chrom, pos, variant_type, count(1) mut_count FROM `{dest_dataset}.merged_maf` 
  group by chrom, pos, variant_type)
  LIMIT 1000""")

In [28]:
df

Unnamed: 0,chrom,pos,variant_type,mut_count
0,chr1,144412187,SNP,1
1,chr22,22690933,DNP,2
2,chr1,177976318,DNP,1
3,chr3,159842127,DNP,1
4,chr7,5030546,SNP,1
...,...,...,...,...
995,chr3,167868549,SNP,1
996,chr2,239529175,SNP,1
997,chr8,12036530,SNP,4
998,chr8,43247356,SNP,6


In [4]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()
import re

from google.api_core.exceptions import Conflict



In [8]:
# List the tables of the staging dataset; reuse the configured dataset id instead
# of repeating the literal so the check follows any change to `dest_dataset`.
tables = list(client.list_tables(dest_dataset))

In [19]:
# Recover CDS IDs from the staging table names: drop the "stage_maf_" prefix and turn
# "_" back into "-" (the table names were built by replacing "-" with "_" in sample_id).
stage_mafs = [x.table_id[len("stage_maf_"):].replace("_", "-") for x in tables if x.table_id.startswith("stage_maf_CDS")]

In [21]:
import pandas as pd
df = pd.read_gbq(f"""select distinct cds_id from `{dest_dataset}.merged_maf` """)

In [23]:
set(stage_mafs).difference(set(df["cds_id"]))

{'CDS-dhBHhw'}

In [28]:
jobs = list(client.list_jobs())

In [32]:
# Jobs whose id mentions CDS_dhBHhw, the sample that has a staging table but no
# rows in the merged table.
missing = [x for x in jobs if "CDS_dhBHhw" in x.job_id]

In [39]:
missing[0].errors

[{'reason': 'internalError',
  'message': 'An internal error occurred and the request could not be completed. This is usually caused by a transient issue. Retrying the job with back-off as described in the BigQuery SLA should solve the problem: https://cloud.google.com/bigquery/sla. If the error continues to occur please contact support at https://cloud.google.com/support.'}]

In [40]:
missing

[QueryJob<project=broad-achilles, location=US, id=concat_t5_stage_maf_CDS_dhBHhw>,
 QueryJob<project=broad-achilles, location=US, id=concat_t3_stage_maf_CDS_dhBHhw>]

In [44]:
# Select only the transfer(s) whose staging table name contains CDS_dhBHhw so that
# the copy can be retried for that sample alone.
just_one_transfer = [x for x in transfers if "CDS_dhBHhw" in x.dest_table]


In [62]:
concatenate_tables(f"{dest_dataset}.merged_maf", just_one_transfer, "t6", parallelism=10)

2514 CDS IDs already loaded. 0 of 1 tables need to be loaded


In [49]:
job

QueryJob<project=broad-achilles, location=US, id=160ccb99-d142-4fc9-9687-8362f1a286f6>

In [51]:
missing[0].result()

InternalServerError: 500 An internal error occurred and the request could not be completed. This is usually caused by a transient issue. Retrying the job with back-off as described in the BigQuery SLA should solve the problem: https://cloud.google.com/bigquery/sla. If the error continues to occur please contact support at https://cloud.google.com/support.

Location: US
Job ID: concat_t5_stage_maf_CDS_dhBHhw


In [53]:
transfers[0]

Transfer(srcs=['gs://fc-secure-d2a2d895-a7af-4117-bdc7-652d7d268324/submissions/c46ec126-d6b9-4664-b7b7-536ce97d9d0e/run_vcf_to_depmap/ac80bb6b-40aa-491d-9cbe-cccdb9b3e42a/call-vcf_to_depmap/glob-233ae0abbeccb769e7426b0faeb6b946/b953a6d9f73d4464bcb7a4a64a09bd31-0.parquet'], dest_table='depmap-omics.maf_staging_0916.stage_maf_CDS_00rz9N', cds_id='CDS-00rz9N')