In [2]:
from __future__ import print_function
import os.path
import dalmatian as dm
import pandas as pd
import sys
pathtoJK = "../JKBio"
sys.path.insert(0, pathtoJK)
import TerraFunction as terra
import CCLF_processing as cclf
from IPython.core.debugger import set_trace

from Helper import *
import numpy as np
from gsheets import Sheets
# https://github.com/jkobject/JKBIO

# One-time Google API setup for the `gsheets` package:
#   1. Log into the Google Developers Console (https://console.developers.google.com/) with the
#      Google account whose spreadsheets you want to access.
#   2. Create (or select) a project and enable the Drive API and Sheets API (under Google Apps APIs).
#   3. Under Credentials, create New credentials > OAuth client ID > of type Other, then click
#      "Download JSON" for the client ID you just created.
#   4. Save that file where the call below reads it: ~/.client_secret.json (a hidden file in your
#      home directory). After the first successful authorization, OAuth data is cached in
#      ~/.storage.json.
# On your first use of gsheets with these files, your web browser will open and ask you to log in
# with your Google account to authorize this client read access to its Google Drive files and
# Google Sheets.
client_secret_file = '~/.client_secret.json'
oauth_cache_file = '~/.storage.json'
sheets = Sheets.from_files(client_secret_file, oauth_cache_file)

# presumably sample-type abbreviation -> full name (not used in the cells visible here)
replace = {'T': 'Tumor', 'N': 'Normal', 'm': 'Unknown', 'L': 'Unknown'}

# CCLF TWIST Pipeline

*go to the [readme](./README.md) to see more about execution*



This pipeline has the following major steps:
1. Pull in information about the TWIST batch(es) from Google sheet(s).
2. Create a TSV of the new sample information
3. Create a TSV of the new sample set information (e.g. cohorts)
4. Upload the sample information and sample set TSVs to the Terra workspace 
5. Run Terra workflows to get copy number (CNV) and mutation (SNV) information, and to create copy number heat maps by batch and by cohort.


# Initialization
Pull in information about the TWIST batch(es) from Google sheet(s).

**Note:** The following cell contains a lot of information that needs to be changed each time this pipeline is run.

Set `samplesetnames` to the batches you want to process, and list each batch's External Sheet URL in `gsheeturllist`, in the same order.

In [3]:
# create sample set names for each batch
# if you only have one batch to run, still make it a list e.g. ["CCLF_TWIST1"]
# this ensures that the pipeline will run as designed
samplesetnames = ["CCLF_TWIST1","CCLF_TWIST2","CCLF_TWIST3","CCLF_TWIST4", "CCLF_TWIST5"]
# samplesetnames = ["CCLF_TWIST1"]


# generate the sample set names we will use in Terra
samplesetnames_normals = [s + '_normals' for s in samplesetnames]
samplesetnames_tumors = [s + '_tumors' for s in samplesetnames]
samplesetnames_pairs = [s + '_pairs' for s in samplesetnames]
samplesetnames_all = [s + '_all' for s in samplesetnames]

# workspace where we are pulling in the data from
data_workspace="broad-genomics-delivery/Cancer_Cell_Line_Factory_CCLF_PanCancer_PanelSeq"
# workspace where we are running the workflows
proc_workspace="nci-mimoun-bi-org/PANCAN_TWIST copy"

source="CCLF"

picard_aggregation_type_validation="PCR"
# SM-IDs to (re)process even if they already exist in the processing workspace
forcekeep=[]
# mapping abbreviations to full names/descriptions
cohorts2id="https://docs.google.com/spreadsheets/d/1R97pgzoX0YClGDr5nmQYQwimnKXxDBGnGzg7YPlhZJU"

# list of the external sheets produced for each batch you want to run through the pipeline
# (same order as samplesetnames: the i-th sheet is labelled with the i-th batch name)
gsheeturllist = ["https://docs.google.com/spreadsheets/d/1LR8OFylVClxf0kmZpAdlVjrn3RBcfZKpNoDYtKdnHB8", # TWIST1
                 "https://docs.google.com/spreadsheets/d/1S3DqBdVkd9dLP1PDYcdSWuD2Iy2gJpzuYBhvmP37UxU", # TWIST2
                 "https://docs.google.com/spreadsheets/d/1kVIeIw66AxWLhAZlqUnAY17S87Rtfhijf1o3x0hG3Jw", # TWIST3
                 "https://docs.google.com/spreadsheets/d/1tZQpxag7BO46pei3s_KaoHvxwN9EVESk3xYvzW7f7Uo/", # TWIST4
                 "https://docs.google.com/spreadsheets/d/1iqVDNPJcMLbNgfdmoBIHwaDF4yaXlyeVFxEwu_fWsEo"] # TWIST5

# pd.concat(..., keys=samplesetnames) pairs sheets with batch names by position and silently
# truncates to the shorter list, so a mismatch would mislabel or drop batches without any error.
assert len(gsheeturllist) == len(samplesetnames), \
    "gsheeturllist must contain exactly one External Sheet URL per entry in samplesetnames"

In [4]:
# Source (data-delivery) workspace and destination (processing) workspace handles.
wfrom, wto = (dm.WorkspaceManager(data_workspace),
              dm.WorkspaceManager(proc_workspace))

# Getting the samples

- we load the samples from data workspace and load the metadata files
- we remove data that has already been processed
- we create the final ids

In [None]:
# wto.upload_samples(pd.DataFrame({'sample_id':['NA'], 'participant_id':['NA'], 'external_id_validation':['NA'], 'sample_type':['NA']}).set_index('sample_id'))

In [None]:
# Samples already present in the processing (TWIST) workspace; their IDs are used
# later to skip samples that were processed before.
refsamples = wto.get_samples()
refids = refsamples.index

# Load the first worksheet of each batch's External Sheet as a DataFrame.
gsheets = []
for sheet_url in gsheeturllist:
    first_worksheet = sheets.get(sheet_url).sheets[0]
    gsheets.append(first_worksheet.to_frame())

In [None]:
# add a column with batch information (e.g. CCLF_TWIST1 vs CCLF_TWIST2)
# pd.concat pairs `keys` with the sheets by position and silently truncates to the shorter
# list, so fail loudly if the config lists are out of sync.
assert len(gsheets) == len(samplesetnames), 'expected one External Sheet per entry in samplesetnames'
metadata = pd.concat(gsheets, sort=False, keys=samplesetnames)
metadata = (metadata.reset_index()
            .rename(columns={'level_0': 'batch', "External ID": 'external_id_validation'})
            .drop(['level_1'], axis='columns'))
print(len(metadata))

# we use this gsheet package to get all the sheets into one dataframe
cohorts = sheets.get(cohorts2id).sheets[0].to_frame()

# we look at all the samples we already have in Terra
# we do some corrections just in case
samples1 = wfrom.get_samples().replace(np.nan, '', regex=True)

# creating sample_id (like in processing workspace) for metadata and samples1
# `axis` is passed by keyword: pandas 2.0 made DataFrame.dropna's arguments keyword-only.
# .copy() so that adding 'sample_id' below writes to an independent frame.
newmetadata = metadata.dropna(axis=0, subset=['Collaborator Sample ID', 'Sample Type', 'Exported DNA SM-ID']).copy()
print("dropped indices: "+str(set(metadata.index.tolist())-set(newmetadata.index.tolist())))
print('new length: '+str(len(newmetadata)))
metadata = newmetadata

# sample_id = <Collaborator Sample ID without its last character>-<Sample Type>-<Exported DNA SM-ID>
metadata['sample_id'] = [str(val['Collaborator Sample ID'][:-1]) + '-' + str(val['Sample Type']) + '-' + str(val['Exported DNA SM-ID'])
                         for _, val in metadata.iterrows()]

# data-workspace sample names are '_'-separated; the third field is used as the SM-ID key
# that is matched against metadata['Exported DNA SM-ID'] below
samples1.index = [name.split('_')[2] for name in samples1.index]

samples1['sample_id'] = [str(val["individual_alias"]) + '-' + str(val['sample_type']) + '-' + i for i, val in samples1.iterrows()]
metadata.index = metadata['Exported DNA SM-ID']
# filtering on what already exists in the processing workspace (refids)
newsamples = samples1[(~samples1.index.isin(refids)) | samples1.index.isin(forcekeep)]
tokeep = set(metadata.index) & set(newsamples.index)
len(tokeep)

In [None]:
# The SM-ID is one of the only unique IDs shared by both sources, so it is the join key.
# Report data-workspace samples that have no External Sheet metadata, then keep only the overlap.
without_metadata = newsamples[~newsamples.index.isin(tokeep)]
if len(without_metadata) > 0:
    print('we could not add these samples from the data workspace as we don\'t have metadata for them: ' + '\n'
          + str(without_metadata.index))
newsamples = newsamples[newsamples.index.isin(tokeep)]
matching_metadata = metadata[metadata.index.isin(tokeep)]
newmetadata = matching_metadata.sort_index().drop_duplicates("Exported DNA SM-ID")
newsamples.shape

# Creating the sample information dataframe
Create a dataframe of the new sample information

**Note:** It can be difficult to recreate the sample_info variable below after you have already uploaded TSVs to Terra since this pipeline specifically looks for samples that do not already exist in the workspace. When running the pipeline on a new batch of data, **I recommend writing the final sample_info to a file.**

**Note 2:** We replace all "/" in the External IDs with "_". This prevents errors when filepaths are created using the external IDs in Terra.

In [None]:
# Sanity check: these rows should match entries in the External Sheet.
preview_columns = ['batch', 'external_id_validation']
print(newmetadata[preview_columns].head().to_string())

## I think this cell and the following cell need to be moved prior to the creation of the sample_info dataframe - we didn't remove *any* samples from TWIST1-4 that were missing these metadata columns
We do not include samples that were missing information in any of the following columns in the external sheet:
- Collaborator Participant ID
- Exported DNA SM-ID
- Stock DNA SM-ID
- Patient ID <- not sure about adding this requirement, but it will be used when plotting the CNV heat maps
- Sample Type
- ~~Tumor Type~~ <- this won't be populated for normals.
- Original Material Type
- Material Type
- Primary Disease <- this only works if the normals also have a primary disease associated with them, which they should. Only the technical controls won't have this information.
- ~~Media on Tube~~ <- tissue samples won't have a media but we do want to include them
- Collection
- Tissue Site <- This column should eventually be populated

Without this list of metadata, the samples will not be added to Terra.

In [None]:
# Merge the External Sheet metadata with the data-source records (e.g. Broad genomics delivery).
# Both frames are indexed by SM-ID, so rows are aligned on it and columns placed side by side.
df = pd.concat(objs=[newmetadata, newsamples], axis='columns', sort=True)

In [None]:
# Columns that must be populated in the External Sheet for a sample to be uploaded.
required_cols = ['Collaborator Participant ID', 'Exported DNA SM-ID',
                 'Stock DNA SM-ID', 'Participant ID', 'Sample Type', 'Tissue Site',
                 'Original Material Type', 'Material Type', 'Primary Disease',
                 'Collection']
missing_required = df[required_cols].isna()
incomplete_ids = df.index[missing_required.any(axis=1)].tolist()
print('Since they don\'t have full data, we will be dropping ' + str(len(incomplete_ids)) + ' samples: \n' +
      str(incomplete_ids))



# examine which columns in the External Sheet had missing information
print('\nNumber of NAs for each required column:')
print(missing_required.sum())

In [None]:
print('We have now dropped the samples for which we didn\'t have full data')
# keep only the samples where every required column is populated
required_cols = ['Exported DNA SM-ID', 'Collaborator Participant ID',
                 'Stock DNA SM-ID', 'Participant ID', 'Sample Type', 'Tissue Site',
                 'Original Material Type', 'Material Type', 'Primary Disease',
                 'Collection']
complete_rows = df[required_cols].notna().all(axis=1)
df = df[complete_rows]

In [None]:
print('creating new sample information df')
# From the filtered samples (df) build the dataframe that gets uploaded to Terra.
# .copy() makes sample_info an independent frame, so the column assignments below neither
# raise SettingWithCopyWarning nor risk writing into a slice of df.
sample_info = df[['crai_or_bai_path', 'cram_or_bam_path']].copy()
sample_info['batch'] = df['batch'].astype(str)
sample_info['individual_id'] = df['Collaborator Participant ID'].astype(str)
sample_info['reference_id'] = df['Exported DNA SM-ID'].astype(str)
sample_info['patient_id'] = df['Participant ID'].astype(str) ## check: this is a new column in the external sheet. The name may change.
sample_info['participant'] = df['Collaborator Participant ID'].astype(str)
sample_info['aggregation_product_name_validation'] = df['bait_set'].astype(str)

# external_id_validation = external ID with "/" replaced by "_" (a "/" looks like a new
# directory when file paths are built from it), plus a "_<n>" suffix, because the reference id
# may already be present several times in the processing workspace for different samples.
# n = 1 + (matches already in the workspace) + (matches already assigned in this run).
if 'external_id_validation' in refsamples.columns:
    workspace_ext_ids = refsamples['external_id_validation']
else:  # e.g. a workspace that only holds the 'NA' placeholder sample
    workspace_ext_ids = pd.Series([], dtype=object)
new_ext_ids = []
for sm_id in sample_info['reference_id']:
    # get the external id for the sample
    ext_id_for_sample = df.loc[df.index == sm_id, 'external_id_validation'].values[0]
    ext_id_for_sample = ext_id_for_sample.replace('/', '_')
    # Substring match so the suffixes added earlier (e.g. _1, _2) are ignored. regex=False because
    # external IDs can contain regex metacharacters (".", "(", "+"). na=False so workspace rows
    # without an external ID (such as the 'NA' placeholder) count as non-matches instead of raising.
    num_in_workspace = int(workspace_ext_ids.str.contains(ext_id_for_sample, regex=False, na=False).sum())
    # only count IDs actually assigned in this run (the old 'nan' placeholders could match too)
    num_already_seen_here = sum(ext_id_for_sample in seen for seen in new_ext_ids)
    num_to_add = num_in_workspace + num_already_seen_here + 1
    new_ext_ids.append(ext_id_for_sample + '_' + str(num_to_add))
sample_info['external_id_validation'] = new_ext_ids

sample_info['bsp_sample_id_validation'] = df.index.astype(str)
sample_info['stock_sample_id_validation'] = df['Stock DNA SM-ID'].astype(str)
sample_info['sample_type'] = df['Sample Type'].astype(str)
sample_info['picard_aggregation_type_validation'] = [picard_aggregation_type_validation] * sample_info.shape[0]
sample_info['tumor_subtype'] = df['Tumor Type'].astype(str)
sample_info['squid_sample_id_validation'] = sample_info['external_id_validation']
sample_info['source_subtype_validation'] = df['Original Material Type'].astype(str)
sample_info['processed_subtype_validation'] = df['Material Type'].astype(str)
sample_info['primary_disease'] = df['Primary Disease'].astype(str)
sample_info['media'] = df['Media on Tube'].astype(str)
sample_info['Collection'] = df['Collection'].astype(str)
# map each collection name to its cohort ID via the cohorts sheet; unmatched collections get 'nan'
cohortlist = []
for k, val in sample_info['Collection'].items():  # Series.iteritems() was removed in pandas 2.0
    res = cohorts[cohorts['Name'] == val]
    if len(res) == 0:
        print("we do not have a corresponding cohort for this collection for sample: " + str(k))
        cohortlist.append('nan')
    else:
        cohortlist.append(res['ID'].values[0])
sample_info['cohorts'] = cohortlist

sample_info['tissue_site'] = df['Tissue Site'].astype(str)
sample_info['source'] = [source] * sample_info.shape[0]
sample_info['sample_id'] = df.index.astype(str)

sample_info = sample_info.set_index('sample_id')

In [None]:
# Sanity check: this is the sample TSV that will be uploaded to Terra.
first_rows = sample_info.head()
print(sample_info.shape)
display(first_rows)

In [None]:
# Run this chunk to save the sample_info TSV to a file. I highly recommend this when running a pipeline on a new batch.
# This way, if anything goes wrong in the workspace, you can fall back to this.
import os

filepath = 'temp/sample_infos/%s_sample_info.tsv' % '_'.join(samplesetnames)
# to_csv does not create missing parent directories, so create temp/sample_infos/ on first use
os.makedirs(os.path.dirname(filepath), exist_ok=True)
sample_info.to_csv(filepath, sep='\t')

In [None]:
# read in the file you just saved
filepath = 'temp/sample_infos/%s_sample_info.tsv' % '_'.join(samplesetnames)
# sample_info holds string values (IDs, paths, 'nan' placeholders). dtype=str reads them back as
# strings. Otherwise pandas infers numeric dtypes for all-digit columns (e.g. participant IDs),
# which then no longer compare equal to the string IDs used when pairing against Terra samples.
sample_info = pd.read_csv(filepath, sep='\t', na_filter=False, dtype=str)
sample_info = sample_info.set_index('sample_id')
print(sample_info.shape)
sample_info.head()

# Creating the pairs
Create a TSV of the new pairs information.

In [None]:
# Participant IDs and sample IDs of the new normals / tumors, plus the participants that
# already have tumors / normals in the processing workspace.
is_normal = sample_info['sample_type'] == "Normal"
is_tumor = sample_info['sample_type'] == "Tumor"
normals = sample_info.loc[is_normal, "participant"].tolist()
normalsid = sample_info.index[is_normal].tolist()
tumors = sample_info.loc[is_tumor, "participant"].tolist()
tumorsid = sample_info.index[is_tumor].tolist()
# iterrows keeps these working when the workspace has no samples (and hence no columns)
prevtumors = [val["participant"] for k, val in refsamples.iterrows() if val.sample_type == "Tumor"]
prevnormals = [val["participant"] for k, val in refsamples.iterrows() if val.sample_type == "Normal"]

print("creating new pairs...")
# do we have new tumors/normals for our previous ones
newpairs = {'pair_id': [], 'case_sample': [], 'control_sample': [], 'participant': [], 'match_type':[]}


def _add_pair(tumor_id, normal_id, participant, match_type):
    """Append one tumor/normal pair record to `newpairs`."""
    newpairs['pair_id'].append(tumor_id + '_' + normal_id)
    newpairs['case_sample'].append(tumor_id)
    newpairs['control_sample'].append(normal_id)
    newpairs['participant'].append(participant)
    newpairs['match_type'].append(match_type)


# New tumors whose participant already has a normal in the workspace: pair with that
# (first) existing normal. Masks are combined with & rather than chained df[m1][m2]
# indexing, which reindexes the second mask and emits a UserWarning.
toreprocess_normals = set(tumors) & set(prevnormals)
for val in toreprocess_normals:
    if val == 'nan':
        continue
    normal_id = refsamples[(refsamples['participant'] == val)
                           & (refsamples['sample_type'] == 'Normal')].index.tolist()[0]
    for tumor_id in sample_info[(sample_info['participant'] == val) & is_tumor].index.tolist():
        _add_pair(tumor_id, normal_id, val, "Tumor_Normal")

# Remaining new tumors: pair with a normal from this upload if one exists, else with the
# 'NA' placeholder sample.
paired = set(tumors) & set(normals)
for val in set(tumors) - toreprocess_normals:
    if val == 'nan':
        continue
    if val in paired:
        normal_id = sample_info[(sample_info['participant'] == val) & is_normal].index.tolist()[0]
        match_type = "Tumor_Normal"
    else:
        normal_id = 'NA'
        match_type = 'Tumor_NA'
    for tumor_id in sample_info[(sample_info['participant'] == val) & is_tumor].index.tolist():
        _add_pair(tumor_id, normal_id, val, match_type)

newpairs = pd.DataFrame(newpairs).set_index('pair_id')
print("Done")

# Create pair sets and sample sets

In the following cell, we create:
- a pair set for each batch
- sample sets for each batch 
- sample sets for each cohort

And then we upload these entities to the Terra workspace.

**Note:** all the entity types (e.g. sample, sample set, participant TSVs) need to exist! Otherwise Terra will raise an error and block further uploads. You can create them by uploading TSVs with NA. The code below does this automatically for the sample TSV.

In [None]:
print("uploading new samples...")
wto.upload_samples(sample_info)
# Make sure a placeholder sample called "NA" exists; unmatched pairs use 'NA' as control_sample.
existing_sample_ids = wto.get_samples().index.tolist()
if "NA" not in existing_sample_ids:
    placeholder = pd.DataFrame({'sample_id': ['NA'], 'participant_id': ['NA']}).set_index('sample_id')
    wto.upload_samples(placeholder)

In [None]:
# Upload the tumor/normal pairs built above as Terra 'pair' entities.
pair_entity_type = 'pair'
print("creating pairs and pairsets...")
wto.upload_entities(pair_entity_type, newpairs)

In [None]:
# Group pair IDs by batch: a pair belongs to the batch of its case (tumor) sample.
dict_pairs_per_batch = {batch: [] for batch in samplesetnames}

for pair_id, case_sample in newpairs['case_sample'].items():
    case_batch = sample_info.loc[case_sample]['batch']
    dict_pairs_per_batch[case_batch].append(pair_id)

In [None]:
# Create a pair set for each batch, plus per-cohort sample sets and pair sets.
# NOTE(review): the batch pair set below passes the WorkspaceManager `wto` to terra.addToPairSet,
# while the other terra.* helper calls pass the workspace name `proc_workspace`. Confirm which
# form each JKBio helper expects.
cohorts_per_batch = {} # batch name -> cohorts in that batch; we do not actually use this for anything. Could be used for QC.
for i, current_batch in enumerate(samplesetnames):
    terra.addToPairSet(wto, samplesetnames_pairs[i], dict_pairs_per_batch[current_batch])

    # get appropriate subset of the samples for each batch
    batch_sample_info = sample_info[sample_info['batch'] == current_batch]
    batch_is_tumor = batch_sample_info['sample_type'] == "Tumor"
    cohorts_in_batch = []
    cohorts_with_pairs = [] # check: currently not used.
    # for each batch, make sample sets and pair sets by cohort
    for val in cohorts['ID'].values:
        in_cohort = batch_sample_info["cohorts"] == val
        cohortsamples = batch_sample_info[in_cohort].index.tolist()
        # combine masks with & : chained df[m1][m2] indexing reindexes m2 and emits a UserWarning
        tumorsamplesincohort = batch_sample_info[in_cohort & batch_is_tumor].index.tolist()
        pairsamples = newpairs[newpairs['case_sample'].isin(tumorsamplesincohort)].index.tolist()
        if len(cohortsamples)>0:
            cohorts_in_batch.append(val)
            try:
                terra.addToSampleSet(proc_workspace, val, cohortsamples)
            except KeyError: # we may not have this set yet
                print("KeyError for sampleset: " + str(val))
                wto.update_sample_set(val, cohortsamples)
        if len(pairsamples)>0:
            cohorts_with_pairs.append(val)
            try:
                terra.addToPairSet(proc_workspace,val, pairsamples)
            except KeyError: # we may not have this set yet
                print("KeyError for pairset: " + str(val))
                wto.update_pair_set(val, pairsamples)
    batch_name = current_batch
    # .update(batch_name=...) would store every batch under the literal key 'batch_name'
    cohorts_per_batch[batch_name] = cohorts_in_batch

print("creating sample sets...")
# want to create a sample set for each batch: all samples, tumors only, normals only
for i, current_batch in enumerate(samplesetnames):
    # get appropriate subset of the samples
    batch_sample_info = sample_info[sample_info['batch'] == current_batch]
    batch_types = batch_sample_info['sample_type']
    batch_normalsid = batch_sample_info.index[batch_types == "Normal"].tolist()
    batch_tumorsid = batch_sample_info.index[batch_types == "Tumor"].tolist()
    # create batch-level sample sets
    ## check: worried that I'll just overwrite whatever samples sets I've made previously.
    terra.addToSampleSet(proc_workspace, samplesetid=samplesetnames_all[i], samples=batch_sample_info.index.tolist())
    terra.addToSampleSet(proc_workspace, samplesetid=samplesetnames_tumors[i], samples=batch_tumorsid)
    terra.addToSampleSet(proc_workspace, samplesetid=samplesetnames_normals[i], samples=batch_normalsid)

# create sample sets for all samples in workspace, and all normals in workspace
# Same as cum pon but better
# New normals plus pre-existing normals. This is built as a new list instead of extending
# `normalsid` in place, so re-running this cell does not append the old normals again.
all_normalsid = normalsid + [k for k, row in refsamples.iterrows() if row.sample_type == "Normal"]

try:
    terra.addToSampleSet(proc_workspace, samplesetid="All_normals_TWIST", samples=all_normalsid)
except KeyError:
    wto.update_sample_set(sample_set_id="All_normals_TWIST", sample_ids=all_normalsid)
# every workspace sample except the 'NA' placeholder (list.remove would raise if it were missing)
all_samples = [s for s in wto.get_samples().index.tolist() if s != 'NA']
try:
    terra.addToSampleSet(proc_workspace, samplesetid="All_samples_TWIST", samples=all_samples)
except KeyError:
    wto.update_sample_set(sample_set_id="All_samples_TWIST", sample_ids=all_samples)

# Running Terra Workflows
Run Terra workflows to get copy number (CNV) and mutation (SNV) information, and to create copy number heat maps by batch and by cohort.

The order of running the workflows is as follows:
- RenameBAM_TWIST
- CalculateTargetCoverage_PANCAN, 
    + DepthOfCov_PANCAN
- CreatePanelOfNormalsGATK_PANCAN (edit the output config "normals_pon" attribute)
    + DepthOfCovQC_PANCAN
- CallSomaticCNV_PANCAN (edit the input config to match the output from CreatePanelOfNormalsGATK_PANCAN)
- MutationCalling_Normals_TWIST
- FilterGermlineVariants_NormalSample_TWIST
(edit the "PoN_name" config for CreatePoNSNV_Mutect1 and CreatePoNSNV_Mutect2)
- CreatePoNSNV_Mutect1, 
    + CreatePoNSNV_Mutect2
- PlotSomaticCNVMaps_PANCAN: we plot CN heat maps for each batch and also for each cohort
- SNV_PostProcessing_Normals, 
    + MutationCalling_Tumors_TWIST (edit the input config to match pon_mutect1, pon_mutect2)
- FilterGermlineEvents_TumorSample
- SNVPostProcessing_TWIST, 
    + FNG_Compile_Pileup_Cnt
- FNG_Compile_db_slow_download
- FNG_Query_db

More information about the pipeline exists here: https://cclf.gitbook.io/tsca/

**Note 1:** If, for some reason, one of the Terra submission functions gives no output and does not seem to submit anything to Terra, you may have been logged out of Terra. In that case, reload the workspace manager and the package.

**Note 2:** If you get the preflight error "expression and etype must BOTH be None or a string value", check the workflow configuration using wto.get_config("NAME_OF_WORKFLOW"). This error usually occurs when you pass in expression and etype information, but the etype is already set as the "rootEntity" aka the default for the workflow. You can fix this by either changing the workflow configuration in Terra, or by not passing in the etype or expression. If you want to see why this error occurs, look at the preflight function in lapdog.py (https://github.com/broadinstitute/lapdog/blob/master/lapdog/lapdog.py).

In [None]:
# # create sample set for TWIST1-5
# normal_samples = []
# for samplesetid in samplesetnames_normals:
#     prevsamples = wto.get_sample_sets()['samples'][samplesetid]
#     normal_samples.extend(prevsamples)

In [None]:
# terra.addToSampleSet(proc_workspace, "CCLF_TWIST1-5_normals", normal_samples)

In [None]:
# # set batch ID / batch number
# # e.g. 'TWIST1'
# batch_id = 'TWIST1'

In [None]:
# wto.get_configs()

In [None]:
# Submissions can only be cancelled or inspected from the Terra website's job history page.
job_history_url = "https://app.terra.bio/#workspaces/" + proc_workspace.replace(" ", "%20") + "/job_history"
print("Creating Terra submissions: remember you can only cancel \n or interact with terra submissions from the Terra website. \n " + job_history_url)

# Run RenameBAM_TWIST on every sample of each batch's "_all" sample set, then block until done.
RenameBAM_TWIST = terra.createManySubmissions(
    wto, "RenameBAM_TWIST", samplesetnames_all,
    entity='sample_set', expression='this.samples')

print("waiting for 'Rename'")
terra.waitForSubmission(proc_workspace, RenameBAM_TWIST)

In [None]:
# Target coverage and depth-of-coverage run on every sample of each batch; wait for both.
per_sample_kwargs = dict(entity='sample_set', expression='this.samples')
CalculateTargetCoverage_PANCAN = terra.createManySubmissions(
    wto, "CalculateTargetCoverage_PANCAN", samplesetnames_all, **per_sample_kwargs)
DepthOfCov_PANCAN = terra.createManySubmissions(
    wto, "DepthOfCov_PANCAN", samplesetnames_all, **per_sample_kwargs)

print("waiting for 'CalculateTargetCoverage' & 'DepthOfCov_PANCAN'")
combined_list = CalculateTargetCoverage_PANCAN + DepthOfCov_PANCAN
terra.waitForSubmission(proc_workspace, combined_list)

## NOTE: we edit the "normals_pon" output attribute of the CreatePanelOfNormalsGATK_PANCAN config
We do this directly in the code below.

In [None]:
# batch_id_normal = "TWIST1-5"

In [None]:
# ## Update the config for TWIST1-5 aggregated PON
# # get current config
# createPON_config = wto.get_config('CreatePanelOfNormalsGATK_PANCAN')
# # edit the config
# createPON_config['outputs']['CreatePanelOfNormals.combined_normals'] = 'workspace.combined_normals_' + batch_id_normal
# createPON_config['outputs']['CreatePanelOfNormals.normals_pon'] = 'workspace.pon_normals_' + batch_id_normal
# createPON_config['outputs']

In [None]:
# # update the config in Terra
# wto.update_config(createPON_config)

In [None]:
# # create PON from all existing normals
# CreatePanelOfNormalsGATK_PANCAN = terra.createManySubmissions(wto, "CreatePanelOfNormalsGATK_PANCAN", "CCLF_TWIST1-5_normals")

In [None]:
# Short batch IDs (e.g. "TWIST1") used to name batch-specific workspace attributes (PoNs).
# Derived from samplesetnames instead of being hard-coded: the loops below index
# samplesetnames_normals / samplesetnames_all by position in batch_ids, so the two lists
# must always have the same length and order.
batch_ids = [name.replace("CCLF_", "", 1) for name in samplesetnames]

In [None]:
## For each batch: point the PoN workflow outputs at batch-specific workspace attributes,
## then submit the PoN job (on that batch's normals) and the depth-of-coverage QC job
## (on every sample of that batch).
CreatePanelOfNormalsGATK_PANCAN = []
DepthOfCovQC_PANCAN = []
for ind, batch_id in enumerate(batch_ids):
    createPON_config = wto.get_config('CreatePanelOfNormalsGATK_PANCAN')
    pon_outputs = createPON_config['outputs']
    pon_outputs['CreatePanelOfNormals.combined_normals'] = 'workspace.combined_normals_' + batch_id
    pon_outputs['CreatePanelOfNormals.normals_pon'] = 'workspace.pon_normals_' + batch_id

    # push the edited config to Terra before submitting
    wto.update_config(createPON_config)

    normals_set = [samplesetnames_normals[ind]]
    CreatePanelOfNormalsGATK_PANCAN += terra.createManySubmissions(wto, "CreatePanelOfNormalsGATK_PANCAN", normals_set)
    all_set = [samplesetnames_all[ind]]
    DepthOfCovQC_PANCAN += terra.createManySubmissions(wto, "DepthOfCovQC_PANCAN", all_set,
                                                      entity='sample_set', expression='this.samples')

In [None]:
# Both the QC and the PoN submissions must finish before CallSomaticCNV can use the PoNs.
print("waiting for 'DepthOfCovQC_PANCAN' & 'CNV_CreatePoNForCNV'")
combined_list = list(DepthOfCovQC_PANCAN)
combined_list.extend(CreatePanelOfNormalsGATK_PANCAN)
terra.waitForSubmission(proc_workspace, combined_list)

## NOTE: we edit the inputs config for CallSomaticCNV_PANCAN so that it uses the correct CallSomaticCNV.normals_pon (batch-specific PONs)
We do this directly in the code below.

In [None]:
# For each batch, point CallSomaticCNV_PANCAN at that batch's own PoN
# (workspace.pon_normals_<batch_id>, written by the CreatePanelOfNormalsGATK_PANCAN runs)
# and submit it on every sample of the batch.
CallSomaticCNV_PANCAN = []
for ind, batch_id in enumerate(batch_ids):
    # get current config
    CNV_config = wto.get_config('CallSomaticCNV_PANCAN')

    # The original inspection expressions here only raised KeyError when the input was missing;
    # make that check explicit so a renamed workflow input is not silently added as a new key.
    if 'CallSomaticCNV.normals_pon' not in CNV_config['inputs']:
        raise KeyError("CallSomaticCNV_PANCAN config has no input 'CallSomaticCNV.normals_pon'")

    # edit the config
    CNV_config['inputs']['CallSomaticCNV.normals_pon'] = 'workspace.pon_normals_' + batch_id

    # update the config in Terra
    wto.update_config(CNV_config)

    CallSomaticCNV_PANCAN += terra.createManySubmissions(wto, "CallSomaticCNV_PANCAN", [samplesetnames_all[ind]],
                                                  entity='sample_set', expression='this.samples', use_callcache = True)


In [None]:
# Block until every per-batch CallSomaticCNV_PANCAN submission has finished.
cnv_submissions = CallSomaticCNV_PANCAN
print("waiting for 'CallSomaticCNV_PANCAN'")
terra.waitForSubmission(proc_workspace, cnv_submissions)

In [None]:
# Submit one copy-number heat-map job per batch sample set; nothing below waits on it.
PlotSomaticCNVMaps_PANCAN = terra.createManySubmissions(wto, "PlotSomaticCNVMaps_PANCAN", samplesetnames_all)

for message in ("submitted final jobs for CNV pipeline",
                "you don't need to wait before moving onto the next cell"):
    print(message)

In [None]:
# Call mutations on every normal sample of each batch, then wait for all of those jobs.
normals_kwargs = {'entity': 'sample_set', 'expression': 'this.samples'}
MutationCalling_Normals_TWIST = terra.createManySubmissions(
    wto, "MutationCalling_Normals_TWIST", samplesetnames_normals, **normals_kwargs)
print("waiting for 'MutationCalling_Normals_TWIST'")
terra.waitForSubmission(proc_workspace, MutationCalling_Normals_TWIST)

In [None]:
# Call caching is disabled here: it produced errors on TWIST1-3 (no errors for TWIST4).
germline_kwargs = {'entity': 'sample_set', 'expression': 'this.samples', 'use_callcache': False}
FilterGermlineVariants_NormalSample_TWIST = terra.createManySubmissions(
    wto, "FilterGermlineVariants_NormalSample_TWIST", samplesetnames_normals, **germline_kwargs)
print("waiting for 'SNV_FilterGermline'")
terra.waitForSubmission(proc_workspace, FilterGermlineVariants_NormalSample_TWIST)

## NOTE: we edit the "PoN_name" config (input) and the "normals_pon_vcf" config (output) for both CreatePoNSNV_Mutect1 and CreatePoN_SNV_MuTect2
We do this directly in the code below.

In [None]:
# For each batch: name the MuTect1 / MuTect2 SNV PoNs "Cum_PoN_<batch_id>_all_vcf_mutect{1,2}",
# route their outputs to workspace attributes of the same name (consumed later by
# MutationCalling_Tumors_TWIST), and submit both PoN workflows on the All_normals_TWIST sample set.
# The submission lists must be initialised here; `+=` on an undefined name raised NameError
# on a fresh kernel.
CreatePoNSNV_Mutect1 = []
CreatePoN_SNV_MuTect2 = []
for ind, batch_id in enumerate(batch_ids):
    # get current config
    mutect1_config = wto.get_config('CreatePoNSNV_Mutect1')
    mutect2_config = wto.get_config('CreatePoN_SNV_MuTect2')

    # print current version
    print(mutect1_config['methodRepoMethod']['methodVersion'])
    print(mutect2_config['methodRepoMethod']['methodVersion'])

    # edit the config (PoN_name is a WDL string literal, hence the embedded double quotes)
    mutect1_config['inputs']['CreatePanelOfNormals.PoN_name'] = '"Cum_PoN_' + batch_id + '_all_vcf_mutect1"'
    mutect2_config['inputs']['CreatePanelOfNormals.PoN_name'] = '"Cum_PoN_' + batch_id + '_all_vcf_mutect2"'
    mutect1_config['outputs']['CreatePanelOfNormals.normals_pon_vcf'] = 'workspace.Cum_PoN_' + batch_id + '_all_vcf_mutect1'
    mutect2_config['outputs']['CreatePanelOfNormals.createPanelOfNormals.normals_pon_vcf'] = 'workspace.Cum_PoN_' + batch_id + '_all_vcf_mutect2'

    # update the config in Terra
    wto.update_config(mutect1_config)
    wto.update_config(mutect2_config)

    # Create PON for SNV from all the normals we have in the workspace so far. This is submitted
    # inside the loop (as in the CNV PoN cell), so each batch's config is used before the next
    # iteration overwrites it, and every Cum_PoN_<batch_id> attribute gets produced.
    CreatePoNSNV_Mutect1 += [wto.create_submission('CreatePoNSNV_Mutect1', "All_normals_TWIST")]
    CreatePoN_SNV_MuTect2 += [wto.create_submission('CreatePoN_SNV_MuTect2', "All_normals_TWIST")]

In [None]:
# Both SNV PoNs must exist before tumor mutation calling can use them.
print("waiting for 'CreatePoN_SNV_MuTect2' & 'CreatePoNSNV_Mutect1'")
combined_list = list(CreatePoNSNV_Mutect1)
combined_list.extend(CreatePoN_SNV_MuTect2)
terra.waitForSubmission(proc_workspace, combined_list)

In [None]:
# Refresh the local sample table from the Terra workspace, so the cells below
# work on the workspace's current sample data.
sample_info = wto.get_samples()

In [None]:
## create / re-create cohorts_per_batch dictionary:
##   batch (sample-set) name -> cohort IDs that have at least one sample in that batch,
##   listed in the order they appear in cohorts['ID'].
cohorts_per_batch = {}
all_changed_cohorts = set()  # union of the cohorts touched by any batch
for batch_name in samplesetnames:
    # get appropriate subset of the samples for each batch
    batch_sample_info = sample_info[sample_info['batch'] == batch_name]
    # cohort labels present in this batch. NaN is dropped because an `==`
    # comparison never matches NaN, so such rows never selected a cohort.
    present_cohorts = set(batch_sample_info['cohorts'].dropna())
    cohorts_in_batch = [val for val in cohorts['ID'].values if val in present_cohorts]
    cohorts_per_batch[batch_name] = cohorts_in_batch
    all_changed_cohorts.update(cohorts_in_batch)  # add this batch's cohorts to the full list
all_changed_cohorts

## NOTE: for each batch, we edit the MutationCalling_Tumors_TWIST config so that its pon_mutect1 and pon_mutect2 inputs point to the cumulative PoNs (from the first batch through the current batch).
This is done directly in the code below.

In [None]:
SNV_PostProcessing_Normals = []
MutationCalling_Tumors_TWIST = []

# (config input key, suffix of this batch's cumulative PoN workspace attribute)
pon_inputs = (
    ('MutationCalling_Tumor.pon_mutect1', '_all_vcf_mutect1'),
    ('MutationCalling_Tumor.pon_mutect2', '_all_vcf_mutect2'),
)

for ind, batch_id in enumerate(batch_ids):
    tumor_cfg = wto.get_config('MutationCalling_Tumors_TWIST')

    # Point both PoN inputs at the workspace attributes holding this batch's PoNs.
    for input_key, pon_suffix in pon_inputs:
        tumor_cfg['inputs'][input_key] = 'workspace.Cum_PoN_' + batch_id + pon_suffix

    # Echo the edited inputs for a visual check before pushing.
    for input_key, pon_suffix in pon_inputs:
        print(tumor_cfg['inputs'][input_key])

    wto.update_config(tumor_cfg)

    # Submit this batch's normals post-processing and tumor mutation calling.
    SNV_PostProcessing_Normals += terra.createManySubmissions(
        wto, "SNV_PostProcessing_Normals", [samplesetnames_normals[ind]])
    MutationCalling_Tumors_TWIST += terra.createManySubmissions(
        wto, "MutationCalling_Tumors_TWIST", [samplesetnames_pairs[ind]],
        entity='pair_set', expression='this.pairs')

In [None]:
# Block until both per-batch submission lists from the previous cell have finished.
print("waiting for 'SNV_PostProcessing_Normals' & 'MutationCalling_Tumors_TWIST'")
combined_list = list(SNV_PostProcessing_Normals) + list(MutationCalling_Tumors_TWIST)
terra.waitForSubmission(proc_workspace, combined_list)

In [None]:
## note: the workflow needs cohorts with at least 2 acceptable CL to run (if only 1, then the workflow will fail)
FilterGermlineEvents_TumorSample = terra.createManySubmissions(wto, 'FilterGermlineEvents_TumorSample', samplesetnames_pairs, 'pair_set', expression='this.pairs')
print("waiting for 'FilterGermlineEvents_TumorSample'")
terra.waitForSubmission(proc_workspace, FilterGermlineEvents_TumorSample)

Successfully created submission bca94918-7ce5-4645-92e9-676d605728b0.
Successfully created submission 39ff7c3f-6a65-49c0-a583-09b63c5b1516.
Successfully created submission fe16eccd-122b-4cad-8ff3-b3fffa4ddd9a.
Successfully created submission 2358a407-60fb-46e8-a7dc-b2eb5e23109e.
Successfully created submission 794786ab-5cc3-431c-a7ca-4cc6627b7cba.
waiting for 'FilterGermlineEvents_TumorSample'
status is: Failed for 0 jobs in submission 0. 2 mn elapsed.

In [None]:
# Final SNV step, submitted for each name in samplesetnames_pairs. Unlike the
# earlier pair_set calls, no entity/expression override is passed here, and this
# cell does not wait on these submissions.
SNVPostProcessing_TWIST = terra.createManySubmissions(
    wto, "SNVPostProcessing_TWIST", samplesetnames_pairs)
print("Submitted final jobs for SNV pipeline")

# FNG pileup counts over `this.samples` of every sample set.
# sometimes get space errors when run FNG_Compile_Pileup_Cnt if use 4 GB; changed to 10 GB
FNG_Compile_Pileup_Cnt = terra.createManySubmissions(
    wto,
    "FNG_Compile_Pileup_Cnt",
    samplesetnames_all,
    entity='sample_set',
    expression='this.samples',
)
print("waiting for 'FNG_Compile_Pileup_Cnt'")
terra.waitForSubmission(proc_workspace, FNG_Compile_Pileup_Cnt)

In [None]:
# Run FNG_Compile_db_slow_download on the All_samples_TWIST sample set, then block until it finishes.
FNG_Compile_db_slow_download = wto.create_submission("FNG_Compile_db_slow_download", "All_samples_TWIST")
print("waiting for 'FNG_Compile_db'")
# create_submission yields a single submission (presumably its ID), not a list like
# createManySubmissions. Wrap it so waitForSubmission gets the same list shape as
# every other call in this notebook.
terra.waitForSubmission(proc_workspace, [FNG_Compile_db_slow_download])

In [None]:
# Run FNG_Query_db for every sample set in samplesetnames_all and block until done.
FNG_Query_db = terra.createManySubmissions(
    wto, "FNG_Query_db", samplesetnames_all)
print("Submitted final FNG Job")

terra.waitForSubmission(proc_workspace, FNG_Query_db)
print('Done')

In [None]:
# Wait again on the same FNG_Query_db submissions as the previous cell
# (handy when re-running only this cell to re-check their status).
terra.waitForSubmission(proc_workspace, FNG_Query_db)

# You've finished running through the pipeline!
You should have all the SNV, CNV, and FNG results ready in Terra.