In [None]:
#setup dependencies, install metric parser
import sys
!{sys.executable} -m pip install --no-cache-dir --upgrade  qc-metric-aggregator

import os
import os.path
import re
import shutil
import subprocess

import firecloud.api as fapi

In [None]:
#set up constants
# Workspace coordinates are read from environment variables (presumably set by the
# Terra notebook runtime — TODO confirm); a missing variable raises KeyError here.
bucket = os.environ['WORKSPACE_BUCKET']
workspace_namespace = os.environ['WORKSPACE_NAMESPACE']
workspace_name = os.environ['WORKSPACE_NAME']
threshold_file_name = "thresholds.yml"
final_output_file_name = "qc_results.tsv"
error_output_file_name = 'samples_with_errors'

#TODO: pull the thresholds from a central place rather than a workspace-specific bucket
master_thresholds_file = bucket + "/" + threshold_file_name
#copy the thresholds file to the notebook env. check=True stops the run if the copy
#fails, instead of silently continuing with a missing (or stale, from an earlier
#run) local thresholds.yml
subprocess.run(['gsutil', 'cp', master_thresholds_file, '.'], check=True)

In [None]:
#fetch sample ids from the terra workspace table
samples_response = fapi.get_entities(workspace_namespace, workspace_name, "sample")
# fail loudly on an API error: otherwise the response body is not the entity list
# and the comprehension below fails with a confusing TypeError/KeyError
samples_response.raise_for_status()
samples = samples_response.json()
sample_ids = [s['name'] for s in samples]
print(f'Found {len(sample_ids)} samples')

In [None]:
#figure out which files from cromwell runs we will need to localize; this heuristic can definitely be further optimized
#allow for resuming a failed workflow by checking if the file already exists
# Object paths matching this pattern (std streams, scripts, logs, return codes, PDFs)
# are not localized for QC.
excluded_paths = re.compile(r'stderr|stdout|\.sh$|\.log$|/pipelines-logs/output|/rc$|/script$|\.pdf$')

if not os.path.isfile('files_to_localize'):
    # Stream the listing into a temporary file and rename it into place only after
    # gsutil succeeds: a failed or interrupted listing must never leave a truncated
    # 'files_to_localize' behind, because the existence check above would trust it.
    tmp_listing_path = 'files_to_localize.tmp'
    kept_count = 0
    with open(tmp_listing_path, 'w') as out, subprocess.Popen(
            ['gsutil', 'ls', '-r', f'{bucket}/**'],
            stdout=subprocess.PIPE, text=True) as lister:
        for object_path in lister.stdout:
            if not excluded_paths.search(object_path.rstrip('\n')):
                out.write(object_path)
                kept_count += 1
    # leaving the Popen context waits for gsutil, so returncode is set here
    if lister.returncode != 0:
        os.remove(tmp_listing_path)
        raise RuntimeError(f'gsutil ls failed with exit code {lister.returncode}')
    os.replace(tmp_listing_path, 'files_to_localize')
    print(f'Listed {kept_count} files to localize.')
else:
    print("Found existing file list.")

In [None]:
#prepare the working dir and read in already processed samples
# Sample ids already present in the results TSV are skipped when samples are
# processed, which lets an interrupted run resume where it stopped.
os.makedirs('localized_files', exist_ok=True)
completed_samples = set()
if not os.path.isfile(final_output_file_name):
    print("No previous results, processing all samples.")
else:
    with open(final_output_file_name, 'r') as existing_file:
        # the first line is the TSV header, not a sample row
        next(existing_file, None)
        for line in existing_file:
            # the first column is the sample id; drop the line terminator so a
            # single-column row doesn't keep "\n" in the id, and skip blank lines
            # (earlier resumed runs could leave an empty line in the file)
            sample_name = line.rstrip('\r\n').split("\t")[0]
            if sample_name:
                completed_samples.add(sample_name)

    print(f'Loaded {len(completed_samples)} completed samples')

In [None]:
#run the metric aggregator for each sample and write out the results
from process_metrics.threshold_file_parser import ThresholdFileParser
from process_metrics.qc_validator import QcValidator
from process_metrics.metrics import AvailableMetrics
from process_metrics.report_generator import ReportGenerator

pass_fail_thresholds = ThresholdFileParser(threshold_file_name).thresholds()

# Scratch directory for one sample's files; emptied after every sample.
localized_dir = "localized_files"
samples_with_errors = []


def _needs_leading_newline(path):
    """Return True when `path` is non-empty and its last byte is not a newline.

    Every row is written with print(), which already terminates the line, so a
    complete results file ends with a newline character and new rows can be
    appended directly. Only a file cut off mid-row needs a newline first;
    writing one unconditionally left an empty row in the TSV on every resume.
    """
    if not os.path.isfile(path) or os.path.getsize(path) == 0:
        return False
    with open(path, 'rb') as existing:
        existing.seek(-1, os.SEEK_END)
        return existing.read(1) != b'\n'


def _localize_sample_files(sample_id, candidate_paths, dest_dir):
    """Copy the objects in `candidate_paths` that belong to `sample_id` into `dest_dir`.

    A path belongs to the sample when it contains `sample_id` as a literal
    substring (no regex or shell interpretation of the id); paths containing
    '.cram' are skipped. Nothing is copied when no path matches.

    Raises:
        RuntimeError: if gsutil exits non-zero, so the sample is reported as
            errored rather than scored on an incomplete set of files.
    """
    # NOTE(review): substring matching means sample "S1" also picks up the files of
    # "S10", "S11", ... — confirm sample ids are never substrings of one another.
    sample_paths = [p for p in candidate_paths if sample_id in p and '.cram' not in p]
    if not sample_paths:
        return
    copy = subprocess.run(
        ['gsutil', '-m', 'cp', '-I', dest_dir],
        input='\n'.join(sample_paths) + '\n',
        capture_output=True,
        text=True,
    )
    if copy.returncode != 0:
        raise RuntimeError(f'gsutil cp failed for {sample_id}: {copy.stderr.strip()}')


def _clear_directory(path):
    """Delete everything inside `path` (dotfiles included) and leave it empty."""
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path, exist_ok=True)


# Read the candidate object list once instead of re-reading it for every sample.
with open('files_to_localize') as listing:
    candidate_paths = [line.rstrip('\n') for line in listing if line.strip()]

os.makedirs(localized_dir, exist_ok=True)

# Write the header only when starting a fresh results file. Deciding this from the
# file itself (not from completed_samples) avoids a second header when an earlier
# run wrote the header but no complete row.
write_header = (not os.path.isfile(final_output_file_name)
                or os.path.getsize(final_output_file_name) == 0)
# an earlier run may have been interrupted mid-row; terminate that partial line first
append_newline = _needs_leading_newline(final_output_file_name)

with open(final_output_file_name, 'a') as fout:
    if append_newline:
        fout.write("\n")
    for sample_id in sample_ids:
        #don't reprocess a sample that we've already processed
        if sample_id in completed_samples:
            continue
        try:
            #localize files relevant to this sample
            _localize_sample_files(sample_id, candidate_paths, localized_dir)

            metrics = AvailableMetrics(sample_id)
            validator = QcValidator("localized_files/")
            res = ReportGenerator(sample_id, pass_fail_thresholds, metrics, validator).gather_metrics()
            headers = res[0]
            values = res[1]
            # build the lines before writing anything, so a bad row cannot leave a
            # header with no data behind it
            row_line = "\t".join(values)
            if write_header:
                headers[0] = "entity:qc_result_sample_id"
                header_line = "\t".join(h.lower() for h in headers)
                print(header_line, file=fout)
                write_header = False
            print(f'Writing row for {sample_id}')
            print(row_line, file=fout)
            # flush per row so an interrupted run loses at most the row in progress
            fout.flush()
        except Exception as err:
            # `Exception` rather than a bare except: KeyboardInterrupt still stops the loop
            print(f'ERROR with {sample_id}')
            print(f'    {type(err).__name__}: {err}')
            samples_with_errors.append(sample_id)
        finally:
            #clean up localized files for this sample
            _clear_directory(localized_dir)

In [None]:
#write out samples with errors, one sample id per line (an empty file when there were none)
with open(error_output_file_name, 'w') as fout:
    fout.writelines(f'{errored_id}\n' for errored_id in samples_with_errors)
print(f'{len(samples_with_errors)} samples had errors')

#copy samples with errors to the workspace bucket
uploaded_errors = f'{bucket}/{error_output_file_name}'
subprocess.run(['gsutil', 'cp', error_output_file_name, uploaded_errors], check=True)

#copy the results into terra as a datatable; keep the response, since a rejected
#upload is otherwise silent
# NOTE(review): assumes upload_entities_tsv returns a requests.Response — confirm
upload_response = fapi.upload_entities_tsv(workspace_namespace, workspace_name, final_output_file_name, "flexible")

#copy the TSV to the workspace bucket (before checking the upload, so the results
#are backed up even if the data table upload was rejected)
uploaded_tsv = bucket + '/' + final_output_file_name
subprocess.run(['gsutil', 'cp', final_output_file_name, uploaded_tsv], check=True)

if not upload_response.ok:
    print(f'Data table upload failed with HTTP {upload_response.status_code}: {upload_response.text}')
    upload_response.raise_for_status()