In [None]:
# Run parameters; all default to empty. Presumably injected at launch time -- TODO confirm.
output_root_dir = ""            # local directory under which a dated report folder is created
gcs_record_keeping_bucket = ""  # bucket that receives a copy of the report folder
gcs_record_keeping_dir = ""     # object-name prefix inside that bucket

This is the under-development notebook for checking the identity metadata of PacBio flowcells.

The order of execution is the following:

0. Load all flowcells that are workable
1. Based on the flowcells' marked identity metadata, see if its Mercury FP VCF has been uploaded to the cloud database (and upload using on-prem scripts if missing)
2. Check which flowcells have negative LOD, report
3. Check which flowcells have indecisive LOD, report
4. Check which flowcells don't have LOD yet, then
    * if it's ready, i.e. has alignments and an FP VCF, launch `VerifyFingerprint`
    * if there's no BAM yet, i.e. `PBFlowcell` hasn't been run on it, launch that workflow
    * if the BAM is too shallow (aligned coverage of 1x or less), report it

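Several dark-knowledge dependencies are needed for this to run:
    * "known_samples_without_mercury.txt": samples that are known to have no GT'ed fingerprinting VCF
    * "known_flowcells_with_issues.txt": flowcells that are known to be inappropriate (for various reasons) to be fingerprinted
    * "flowcells_identity_manually_confirmed.txt": flowcells whose borderline LOD has been manually cleared (excluded from the indecisive-LOD report)
    * "notifiers.tsv" and "developers.tsv": tab-separated name/e-mail pairs of the people who receive the notifications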


In [None]:
# Standard library
import datetime
import logging  # logging.INFO is used below; import it explicitly instead of relying on a star import to leak it
import os
import pprint

# Third party
import pandas as pd
from dateutil import parser
from google.cloud import storage
from termcolor import colored

# One GCS client, shared by every cell that reads or lists bucket objects.
storage_client = storage.Client()


In [None]:
from lrmaCU.gcs_utils import *
from lrmaCU.utils import *

from lrmaCU.terra.table_utils import *
from lrmaCU.terra.submission.submission_utils import *

# Date & time, for record keeping

In [None]:
# Date stamp (YYYYMMDD) used in the Mercury query "Date" column and in e-mail subjects.
today = datetime.datetime.today().strftime("%Y%m%d")
# Flowcells created before this UTC instant are not studied.
cutoff_date = pd.to_datetime(
    datetime.datetime(year=2021, month=1, day=1, hour=0, minute=0, tzinfo=datetime.timezone.utc)
)

In [None]:
print(f"Current Time = {datetime.datetime.now():%D %H:%M:%S}")  # wall-clock stamp for the run log

In [None]:
# Drop trailing slashes so later f-strings can join path components with exactly one '/'.
output_root_dir, gcs_record_keeping_dir = (path.rstrip('/') for path in (output_root_dir, gcs_record_keeping_dir))

In [None]:
# Per-run report directory: <output_root_dir>/<YYYY-MM-DD>.
# os.path.join keeps the path relative when output_root_dir is left empty, instead of
# producing "/<YYYY-MM-DD>" and trying to create a directory at the filesystem root.
output_dir = os.path.join(output_root_dir, datetime.datetime.today().strftime("%Y-%m-%d"))
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Logger handed to send_notification in the notification cells below.
logger = get_configured_logger(
    log_level=logging.INFO,
)

# Filters to apply (still under development, so they change over time)

In [None]:
# Flowcells deemed inappropriate for the current PBFlowcell workflow; blacklisted in the filter below.
known_flowcells_inappropriate_for_current_pbflowcell = [
    'DA143934',
    'DA073901',
]

In [None]:
# Samples known to have no genotyped fingerprinting VCF in Mercury; they are blacklisted below.
known_samples_without_mercury_path = 'gs://broad-gp-pacbio/metrics/fingerprinting/mercury/known_samples_without_mercury.txt'
ff = GcsPath(known_samples_without_mercury_path)
if not ff.exists(storage_client):
    raise RuntimeError(f"Dependency file {known_samples_without_mercury_path} doesn't exist any more.")
# Strip whitespace (tolerates CRLF files) and skip blank lines, so a trailing newline does not
# add an empty "sample" to the list and inflate the count.
known_samples_without_mercury = [line.strip()
                                 for line in ff.get_blob(storage_client).download_as_text().splitlines()
                                 if line.strip()]
print(f"{len(known_samples_without_mercury)} samples are known to have no Mercury entries.")

In [None]:
# Flowcells known to be unsuitable (for various reasons) for VerifyFingerprint; blacklisted below.
known_problematic_flowcells_path = 'gs://broad-gp-pacbio/metrics/fingerprinting/mercury/known_flowcells_with_issues.txt'
ff = GcsPath(known_problematic_flowcells_path)
if not ff.exists(storage_client):
    raise RuntimeError(f"Dependency file {known_problematic_flowcells_path} doesn't exist any more.")
# Strip whitespace (tolerates CRLF files) and skip blank lines, so a trailing newline does not
# add an empty "flowcell" to the list and inflate the count.
known_problematic_flowcells = [line.strip()
                               for line in ff.get_blob(storage_client).download_as_text().splitlines()
                               if line.strip()]
print(f"{len(known_problematic_flowcells)} flowcells known to have issues preventing them from being VerifyFingerprint'ed.")

In [None]:
# Flowcells whose identity was manually confirmed; they are left out of the indecisive-LOD report.
borderline_lod_flowcells_cleared_path = 'gs://broad-gp-pacbio/metrics/fingerprinting/mercury/flowcells_identity_manually_confirmed.txt'
ff = GcsPath(borderline_lod_flowcells_cleared_path)
if not ff.exists(storage_client):
    raise RuntimeError(f"Dependency file {borderline_lod_flowcells_cleared_path} doesn't exist any more.")
# Strip whitespace (tolerates CRLF files) and skip blank lines so no empty ID sneaks in.
borderline_lod_flowcells_cleared = [line.strip()
                                    for line in ff.get_blob(storage_client).download_as_text().splitlines()
                                    if line.strip()]

In [None]:
# Samples whose fingerprint VCF already lives in the cloud copy of Mercury.
# The sample name is the part of the file name before the first '__'.
samples_in_cloud_mercury = [
    blob.name.rsplit('/', 1)[-1].split('__')[0]
    for blob in storage_client.list_blobs('broad-gp-pacbio', prefix='metrics/fingerprinting/mercury/vcfs')
    if blob.name.endswith('.vcf.gz')
]
print(f"{len(samples_in_cloud_mercury)} samples already living in the lrma-cloud-mercury.")

In [None]:
# Tab-separated name/e-mail pairs (one per line) of the people notified about fingerprint results.
notifiers_path = 'gs://broad-gp-pacbio/metrics/fingerprinting/notifiers.tsv'
ff = GcsPath(notifiers_path)
if not ff.exists(storage_client):
    # The message names the file actually required (.tsv); it previously said notifiers.txt.
    raise RuntimeError(f"Dependency file {notifiers_path} doesn't exist any more.")
# splitlines() + strip() tolerate CRLF files, which would otherwise leave '\r' glued to each e-mail address.
ll = [tuple(field.strip() for field in line.split('\t'))
      for line in ff.get_blob(storage_client).download_as_text().splitlines()
      if line.strip()]
if not ll:
    # Without this, zip(*[]) fails with an opaque "not enough values to unpack" error.
    raise RuntimeError(f"Dependency file {notifiers_path} lists no notification receivers.")
notification_receiver_names, notification_receiver_emails = zip(*ll)

In [None]:
# Tab-separated name/e-mail pairs of the developers notified about repeatedly failing workflows.
developers_path = 'gs://broad-dsde-methods-long-reads/resources/developers.tsv'
ff = GcsPath(developers_path)
if not ff.exists(storage_client):
    raise RuntimeError(f"Dependency file {developers_path} doesn't exist any more.")
# splitlines() + strip() tolerate CRLF files, which would otherwise leave '\r' glued to each e-mail address.
ll = [tuple(field.strip() for field in line.split('\t'))
      for line in ff.get_blob(storage_client).download_as_text().splitlines()
      if line.strip()]
if not ll:
    # Without this, zip(*[]) fails with an opaque "not enough values to unpack" error.
    raise RuntimeError(f"Dependency file {developers_path} lists no developers.")
developer_names, developer_emails = zip(*ll)

# LOAD & FORMAT FLOWCELLS

In [None]:
# Terra workspace whose `sample` table holds one row per flowcell (hence the name flowcell_table).
primary_namespace, primary_workspace = 'production-long-reads', 'broad-gp-pacbio'
root_data_type = 'sample'
flowcell_table = fetch_existing_root_table(ns=primary_namespace, ws=primary_workspace, etype=root_data_type)

In [None]:
# NOTE(review): not referenced by any visible cell; the submissions below pass explicit batch_type_name values.
batch_submission_type = f'{root_data_type}_submission_batch'

In [None]:
# Columns holding storage locations of per-flowcell files; cast to str further down.
gcs_locations = [
    'aligned_bai', 'aligned_bam', 'aligned_pbi',
    'ccs_bam', 'ccs_pbi', 'ccs_report',
    'fingerprint_details', 'fingerprint_metrics',
    'fq',
    'gcs_input_dir',
    'input_bam', 'input_pbi',
    'subreads_bam', 'subreads_pbi',
]

In [None]:
# Column groups that identify a flowcell's sample, by naming source.
lab_identity = ['bio_sample', 'description', 'well_sample']       # lab-side sample identifiers
sequencer_identity = ['flowcell_id', 'movie_name', 'well_name']  # instrument-side identifiers
terra_identity = ['sample']                                      # Terra entity key

In [None]:
# Target dtype for each group of Terra columns, consumed by the conversion cells below.
categorical_columns = dict(
    type='category',
    columns=['application', 'experiment_type', 'instrument', 'workspace'],
)

date_time_columns = dict(
    type='datetime64',
    timezone=datetime.timezone.utc,
    columns=['created_at'],
)

boolean_columns = dict(
    type='bool',
    columns=['is_ccs', 'is_corrected', 'is_isoseq'],
)

# Nullable integer dtype, so missing counts stay <NA> instead of forcing the column to float.
int_type_columns = dict(
    type='Int64',
    columns=[
        'aligned_num_bases', 'aligned_num_reads', 'aligned_read_length_N50',
        'ccs_zmws_fail_filters', 'ccs_zmws_input', 'ccs_zmws_pass_filters', 'ccs_zmws_shortcut_filters',
        'insert_size',
        'num_bases', 'num_reads', 'num_reads_Q10', 'num_reads_Q12', 'num_reads_Q15', 'num_reads_Q5',
        'num_reads_Q7', 'num_records',
        'total_length',
    ],
)

float_type_columns = dict(
    type='float64',
    columns=[
        'lod_expected_sample',
        'aligned_est_fold_cov', 'raw_est_fold_cov',
        'aligned_frac_bases', 'aligned_read_length_mean', 'aligned_read_length_median', 'aligned_read_length_stdev',
        'average_identity', 'median_identity',
        'ccs_zmws_fail_filters_pct', 'ccs_zmws_pass_filters_pct', 'ccs_zmws_shortcut_filters_pct',
        'polymerase_read_length_N50', 'polymerase_read_length_mean',
        'read_length_N50', 'read_length_mean', 'read_length_median', 'read_length_stdev',
        'read_qual_mean', 'read_qual_median',
        'subread_read_length_N50', 'subread_read_length_mean',
    ],
)

string_type_columns = dict(
    type='str',
    columns=[*gcs_locations, *terra_identity, *lab_identity, *sequencer_identity],
)

In [None]:
# Flags may come back as strings ('TRUE'/'True'/'true') or presumably as real booleans.
# Normalising via str() keeps genuine True values from being silently turned into False.
for n in boolean_columns['columns']:
    flowcell_table[n] = flowcell_table[n]\
        .apply(lambda s: str(s).strip().lower() == 'true')\
        .astype(boolean_columns['type'])

In [None]:
# Low-cardinality descriptors become pandas categoricals.
category_dtype = categorical_columns['type']
for column_name in categorical_columns['columns']:
    flowcell_table[column_name] = flowcell_table[column_name].astype(category_dtype)

In [None]:
# Identity and location columns are forced to str (missing values become the text 'None'/'nan').
str_dtype = string_type_columns['type']
for column_name in string_type_columns['columns']:
    flowcell_table[column_name] = flowcell_table[column_name].astype(str_dtype)

In [None]:
import math
from typing import Optional


def convert_to_float(e) -> Optional[float]:
    """
    Convert a raw Terra attribute value to a float.

    :param e: raw cell value, usually a string; numbers are accepted too
    :return: the float value, or None for missing values
             (None, '', 'nan', 'none' in any case, or a float NaN)
    :raises ValueError, TypeError: when the value cannot be read as a number;
             the offending value is printed first to ease debugging
    """
    if e is None or (isinstance(e, str) and e.strip().lower() in ('', 'nan', 'none')):
        return None
    try:
        f = float(e)
    except (TypeError, ValueError):
        print(e)
        raise
    # A NaN is a missing value, not a number; this also keeps round() in convert_to_int from failing.
    return None if math.isnan(f) else f


def convert_to_int(e) -> Optional[int]:
    """
    Convert a raw Terra attribute value to an int (rounded to the nearest integer).

    :param e: raw cell value, see convert_to_float
    :return: the rounded int, or None for missing values; 0 stays 0
    """
    f = convert_to_float(e)
    # Compare against None explicitly: a plain truthiness test would turn 0.0 into a missing value.
    return None if f is None else round(f)

In [None]:
# Count-like columns -> nullable Int64; the failing column name is printed before re-raising.
for column_name in int_type_columns['columns']:
    try:
        as_ints = flowcell_table[column_name].apply(convert_to_int)
        flowcell_table[column_name] = as_ints.astype(int_type_columns['type'])
    except ValueError:
        print(column_name)
        raise

In [None]:
# Metric columns -> float64; the failing column name is printed before re-raising.
for column_name in float_type_columns['columns']:
    try:
        as_floats = flowcell_table[column_name].apply(convert_to_float)
        flowcell_table[column_name] = as_floats.astype(float_type_columns['type'])
    except ValueError:
        print(column_name)
        raise

In [None]:
def convert_date_time(s):
    """
    Parse an ISO-8601 timestamp and convert it to the configured timezone.

    :param s: raw value from the Terra table
    :return: a timezone-aware pandas Timestamp, or the timezone-naive sentinel
             ``pd.Timestamp.min`` when the value is missing or unparseable
             (filter_pacbio_flowcells drops rows whose timestamp has no tzinfo)
    """
    try:
        t = parser.isoparse(s).astimezone(tz=date_time_columns['timezone'])
        return pd.to_datetime(t)
    except (TypeError, ValueError, pd.errors.OutOfBoundsDatetime):
        # TypeError covers non-string values such as NaN for a missing attribute,
        # which previously crashed the cell instead of getting the sentinel.
        return pd.Timestamp.min


for n in date_time_columns['columns']:
    # convert_date_time already returns Timestamps; no second pd.to_datetime pass is needed.
    flowcell_table[n] = flowcell_table[n].apply(convert_date_time)

# FILTER DATA

In [None]:
def filter_pacbio_flowcells(terra_table_row, cutoff_date_to_study,
                            columns_and_blacklist: dict) -> bool:
    """
    Decide whether a flowcell row is kept for fingerprint checking.

    A row is rejected when any blacklisted column value matches, when its `created_at`
    has no timezone, when its `description` contains a space, equals 'unknown' or starts
    with 'SIRV_', when its `application` mentions 'amplicon', or when it was created
    before the cutoff.
    :param terra_table_row: one row of the flowcell table (anything indexable by column name)
    :param cutoff_date_to_study: earliest `created_at` to keep
    :param columns_and_blacklist: column name -> values that disqualify the row
    :return: true if the row should be kept
    """
    # Evaluate every blacklist check up front (same evaluation order as the original).
    not_blacklisted = [terra_table_row[column] not in banned
                       for column, banned in columns_and_blacklist.items()]

    created_at = terra_table_row['created_at']
    if created_at.tzinfo is None:
        # Timezone-less timestamp, e.g. the unparseable-date sentinel.
        return False

    description = terra_table_row['description']
    checks = (
        *not_blacklisted,
        ' ' not in description,                                # unknown / free-text samples
        description != 'unknown',
        not description.startswith('SIRV_'),                   # non-genomic spike-ins
        'amplicon' not in terra_table_row['application'],      # non-genomic application
        created_at >= cutoff_date_to_study,                    # too early
    )
    return all(checks)

In [None]:
# Column -> values that disqualify a flowcell outright (consumed by filter_pacbio_flowcells).
my_blacklists = {
    'flowcell_id': known_problematic_flowcells + known_flowcells_inappropriate_for_current_pbflowcell,
    'well_sample': known_samples_without_mercury,
    'experiment_type': ['ISOSEQ', 'MASSEQ'],
    'application': ['isoSeq', 'unknown'],
}

In [None]:
# Keep only the flowcells that pass every filter; renumber rows from 0.
keep_mask = flowcell_table.apply(
    lambda row: filter_pacbio_flowcells(row, cutoff_date, my_blacklists), axis=1)
usable_flowcell_table = flowcell_table.loc[keep_mask, :].reset_index(drop=True)
usable_flowcell_table.shape

In [None]:
# Sanity check: the blacklist filter above must have removed every known-problematic flowcell.
leaked_problematic = usable_flowcell_table.loc[usable_flowcell_table['flowcell_id'].isin(known_problematic_flowcells), :]
assert leaked_problematic.empty, f"Known problematic flowcells survived filtering:\n{leaked_problematic}"

In [None]:
# dropna=False matches counting the entries of .unique(), NaN included.
print(f"{usable_flowcell_table['well_sample'].nunique(dropna=False)} unique SM-[A-Z0-9]+ samples")

In [None]:
# Distinct lab-side sample identities across all usable flowcells, ordered by well_sample.
sample_identity_columns = ['bio_sample', 'description', 'well_sample']
samples_upto_date = (
    usable_flowcell_table[sample_identity_columns]
    .sort_values(by=['well_sample'])
    .drop_duplicates(ignore_index=True)
)
samples_upto_date.shape

In [None]:
# Column subset (and order) used in every report table and e-mail below.
desired_columns_in_order = [
    # identity
    'flowcell_id', 'bio_sample', 'description', 'well_sample',
    # coverage & fingerprint result
    'aligned_est_fold_cov', 'lod_expected_sample', 'aligned_bam',
    # run type
    'application', 'experiment_type', 'is_ccs', 'is_corrected', 'is_isoseq',
    'ccs_zmws_pass_filters_pct',
    # provenance
    'instrument', 'movie_name', 'well_name', 'insert_size', 'created_at',
    'sample', 'workspace',
]

# Negative LOD, i.e. swapped

In [None]:
# Flowcells with LOD < -3, treated as swapped (see the section header).
# Sort before resetting the index, so the displayed/e-mailed table is numbered 0..n-1 in date order.
swapped_flowcells = usable_flowcell_table[usable_flowcell_table.lod_expected_sample < -3.0]\
    .sort_values(by=['created_at'])\
    .reset_index(drop=True)
swapped_flowcells[desired_columns_in_order]

In [None]:
# Written unconditionally (possibly header-only) so every run leaves a negative-LOD report.
negative_lod_report = f'{output_dir}/negative.LOD.flowcells.tsv'
swapped_flowcells.loc[:, desired_columns_in_order].to_csv(negative_lod_report, sep='\t', header=True, index=False)

# Indecisive LOD

In [None]:
# Indecisive: -3 <= LOD < 6, i.e. neither the swap threshold used above nor (presumably) a confident match.
# Flowcells whose identity was already manually confirmed are left out.
indecisive_idx = ~usable_flowcell_table['flowcell_id'].isin(borderline_lod_flowcells_cleared)
indecisive_idx &= (usable_flowcell_table.lod_expected_sample >= -3) & (usable_flowcell_table.lod_expected_sample < 6)
# Sort before resetting the index, so the displayed/e-mailed table is numbered 0..n-1 in date order.
indecisive_flowcells = usable_flowcell_table[indecisive_idx]\
    .sort_values(by=['created_at'])\
    .reset_index(drop=True)
indecisive_flowcells[desired_columns_in_order]

In [None]:
# Only written when there is something to report.
if len(indecisive_flowcells) > 0:
    indecisive_report = f'{output_dir}/indecisive.LOD.flowcells.tsv'
    indecisive_flowcells.loc[:, desired_columns_in_order].to_csv(indecisive_report, sep='\t', header=True, index=False)

# No LOD

In [None]:
# Usable flowcells that have no fingerprint LOD yet.
lod_missing = usable_flowcell_table['lod_expected_sample'].isna()
no_lod = usable_flowcell_table[lod_missing].reset_index(drop=True)
print(f'{len(no_lod)} flowcell have no LOD.')

In [None]:
# Same column subset as the other report displays, instead of every Terra column.
no_lod[desired_columns_in_order]

In [None]:
# Readiness flags for the no-LOD flowcells, used to split them into the buckets below.
is_with_bam = no_lod['aligned_bam'].apply(lambda path: path.startswith('gs://'))          # aligned BAM is in GCS
is_enough_coverage = no_lod['aligned_est_fold_cov'].apply(lambda cov: float(cov) > 1.0)   # more than 1x aligned coverage
is_with_mercury = no_lod['well_sample'].isin(samples_in_cloud_mercury)                    # FP VCF already in the cloud

## No LOD&mdash;meaningless coverage

In [None]:
# Aligned, but with coverage too low (<= 1x) for a meaningful fingerprint.
shallow_mask = is_with_bam & ~is_enough_coverage
shallow_bam = no_lod[shallow_mask].sort_values(by=['well_sample']).reset_index(drop=True)
shallow_bam[desired_columns_in_order]

In [None]:
# Write the same column subset as the negative/indecisive reports, for a consistent set of TSVs.
if 0 < len(shallow_bam):
    shallow_bam[desired_columns_in_order]\
        .to_csv(f'{output_dir}/shallow.BAM.flowcells.tsv', sep='\t', header=True, index=False)

## No LOD&mdash;just need to run `VerifyFingerprint`. !!! WARN: CHECK THAT IT IS NOT ALREADY RUNNING BEFORE SUBMITTING !!!

In [None]:
# Aligned, deep enough, and with a cloud FP VCF: only VerifyFingerprint is missing.
ready_mask = is_with_bam & is_enough_coverage & is_with_mercury
ready_to_fp = no_lod[ready_mask].sort_values(by=['well_sample']).reset_index(drop=True)
ready_to_fp[desired_columns_in_order]

In [None]:
# Call caching is explicitly disabled because the fingerprint database is updated almost daily.
if len(ready_to_fp) > 0:
    verify_before_submit(
        primary_namespace, primary_workspace,
        workflow_name='VerifyFingerprint',
        etype=root_data_type,
        enames=ready_to_fp['sample'].tolist(),
        use_callcache=False,
        batch_type_name='dailynotebookrun_flowcell_batch',
        expression='this.samples',
    )

## No LOD&mdash;no BAM yet. !!! WARN: CHECK THAT `PBFlowcell` IS NOT ALREADY RUNNING BEFORE SUBMITTING !!!

In [None]:
# No aligned BAM yet: PBFlowcell still has to run on these.
need_bam = no_lod[~is_with_bam].sort_values(by=['well_sample']).reset_index(drop=True)
need_bam[desired_columns_in_order]

In [None]:
# Unlike VerifyFingerprint above, call caching is left on here.
if len(need_bam) > 0:
    verify_before_submit(
        primary_namespace, primary_workspace,
        workflow_name='PBFlowcell',
        etype=root_data_type,
        enames=need_bam['sample'].tolist(),
        use_callcache=True,
        batch_type_name='dailynotebookrun_sample_batch',
        expression='this.samples',
    )

# Query on-prem Mercury, prep for next round

In [None]:
# Samples whose FP VCF is not yet in the cloud copy of Mercury, renamed to the column names
# presumably expected by the on-prem Mercury query -- TODO confirm.
need_mercury_sample_ids = (
    samples_upto_date
    .loc[~samples_upto_date['well_sample'].isin(samples_in_cloud_mercury), :]
    .rename({'bio_sample': 'Collab_Part_ID',
             'description': 'Collab_SM_ID',
             'well_sample': 'Broad_LSID'}, axis=1)
    .sort_values(by=['Broad_LSID'], axis=0)
    .reset_index(drop=True)
)

# Drop the leading "SM-" from the LSID. The vectorized .str.replace avoids depending on the
# `re` module, which no visible cell imports explicitly.
need_mercury_sample_ids['Broad_LSID'] = need_mercury_sample_ids['Broad_LSID'].str.replace(r'^SM-', '', regex=True)

need_mercury_sample_ids['Date'] = today

print(f"{len(need_mercury_sample_ids)} newly found samples need to have their FP VCFs queried.")
need_mercury_sample_ids

In [None]:
# Headerless CSV, presumably fed to the on-prem Mercury query (see the section header).
if len(need_mercury_sample_ids) > 0:
    csv_location = f'{output_dir}/need_mercury_sample_ids_headerless.csv'
    need_mercury_sample_ids.to_csv(csv_location, sep=',', header=False, index=False)

### !!! Now query on-prem Mercury for the samples in the CSV above and upload their FP VCFs (using the on-prem scripts) !!!

#### Upload to designated record-keeping bucket

In [None]:
# Mirror today's local report directory to <bucket>/<gcs_record_keeping_dir>/<YYYY-MM-DD>/.
base = os.path.basename(output_dir)
# Skip empty components (and stray slashes), so an unset gcs_record_keeping_dir does not
# produce object names with a leading '/' such as "/2021-06-01/foo.tsv".
gcs_prefix = '/'.join(part for part in (gcs_record_keeping_dir.strip('/'), base) if part)
for f in absolute_file_paths(output_dir):
    bf = os.path.basename(f)
    upload_blob(gcs_record_keeping_bucket, f, f"{gcs_prefix}/{bf}")

# Notification

In [None]:
def _notify_flowcell_report(headline: str, flowcells: pd.DataFrame) -> None:
    """
    E-mail a table of flowcells to the fingerprinting notification receivers.

    :param headline: e-mail subject
    :param flowcells: flowcells to report; only `desired_columns_in_order` are rendered
    """
    html = """\
    <html>
      <head></head>
      <body>
        {0}
      </body>
    </html>
    """.format(flowcells[desired_columns_in_order].to_html())
    # Short text body; the table itself goes in html_body.
    send_notification(logger=logger, notification_sender_name='LRMA-AUTO-NOTIF',
                      notification_receiver_names=notification_receiver_names,
                      notification_receiver_emails=notification_receiver_emails,
                      email_subject=headline, email_body='Please check', html_body=html)


if 0 < len(swapped_flowcells):
    _notify_flowcell_report(f'PacBio flowcells failed fingerprint check as of {today}', swapped_flowcells)

if 0 < len(indecisive_flowcells):
    _notify_flowcell_report(f'PacBio flowcells with indecisive fingerprint LOD as of {today}', indecisive_flowcells)

In [None]:
trace_back = 7
failure_cnt_threshold = 3

# Workflow -> Terra entity type it runs on; the insertion order fixes the order of the report.
monitored_workflows = {
    'PBFlowcell': 'sample',
    'PBCCSIsoSeq': 'sample_set',
    'PBCCSWholeGenome': 'sample_set',
    'PBCLRWholeGenome': 'sample_set',
}
repeated_failures = {
    workflow: get_repeatedly_failed_entities(primary_namespace, primary_workspace,
                                             workflow, etype,
                                             trace_back, failure_cnt_threshold)
    for workflow, etype in monitored_workflows.items()
}
# Keep the per-workflow names available for interactive inspection.
repeated_failures_PBFlowcell = repeated_failures['PBFlowcell']
repeated_failures_PBCCSIsoSeq = repeated_failures['PBCCSIsoSeq']
repeated_failures_PBCCSWholeGenome = repeated_failures['PBCCSWholeGenome']
repeated_failures_PBCLRWholeGenome = repeated_failures['PBCLRWholeGenome']

msg = ''.join(
    colored(f"\n\nRepeated failures with {workflow}:\n", color='red', attrs=['bold']) + pprint.pformat(failures)
    for workflow, failures in repeated_failures.items()
    if 0 < len(failures)
)

if msg:
    headline = f'Repeated failed workflows in workspace {primary_namespace}/{primary_workspace} as of {today}'
    send_notification(logger=logger, notification_sender_name='LRMA-AUTO-NOTIF',
                      notification_receiver_names=developer_names, notification_receiver_emails=developer_emails,
                      email_subject=headline, email_body=msg)