In [1]:
import re, subprocess, boto3, json, shlex, mysql, os
import pandas as pd
import numpy as np
from s3path import S3Path
from pathlib import Path
from tqdm.notebook import tqdm
from packaging import version
pd.set_option("display.max_colwidth", 40)

# Define Helper Functions

In [2]:
# Numpy encoder for JSON from pandas series
class NpEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return super(NpEncoder, self).default(obj)

In [3]:
# from SCRIdb
def get_s3_objects(bucket, key, pattern, full_uri=False):
    
    s3r = boto3.resource("s3")
    bucket_s3 = s3r.Bucket(bucket)
    objects = []
    for obj in bucket_s3.objects.filter(Prefix=key):
        hit = pattern.search(obj.key)
        if hit:
            objects.append(obj.key)
    if full_uri:
        objects = [f"s3://{bucket}/{o}" for o in objects]
    return objects

In [4]:
def execute_query(query, user, password):
    with connect(
        host="peer-lab-db.cggxmlwgzzpw.us-east-1.rds.amazonaws.com",
        database="peer_lab_db",
        user=user,
        password=password,
    ) as connection:
        with connection.cursor(buffered=True) as cursor:
            cursor.execute(query)
            result = cursor.fetchall()
    return result

In [5]:
# Get species from database for given sample
from mysql.connector import connect, Error

def get_species(sample_id, user, password):
    try:
        table_sample_data = "peer_lab_db.sample_data"
        table_species = "peer_lab_db.species"
        table_genome_idx = "peer_lab_db.genome_index"
        query = f"""
        SELECT {table_species}.Species
        FROM {table_species}
        LEFT JOIN {table_genome_idx}
        ON {table_species}.id = {table_genome_idx}.species_id
        LEFT JOIN {table_sample_data}
        ON {table_genome_idx}.id = {table_sample_data}.genomeIndex_id
        WHERE {table_sample_data}.id = {sample_id}
        """
        result = execute_query(query, user, password)[0][0]
        return result
    except Error as e:
        print(f"Error: {e}")

In [6]:
# Get species from database for given sample
from mysql.connector import connect, Error

def get_sc_tech(sample_id, user, password):
    try:
        table_sample_data = "peer_lab_db.sample_data"
        table_sc_tech = "peer_lab_db.sc_tech"
        table_genome_idx = "peer_lab_db.genome_index"
        query = f"""
        SELECT {table_sc_tech}.sc_Tech
        FROM {table_sc_tech}
        LEFT JOIN {table_genome_idx}
        ON {table_sc_tech}.id = {table_genome_idx}.scTech_id
        LEFT JOIN {table_sample_data}
        ON {table_genome_idx}.id = {table_sample_data}.genomeIndex_id
        WHERE {table_sample_data}.id = {sample_id}
        """
        result = execute_query(query, user, password)[0][0]
        return result
    except Error as e:
        print(f"Error: {e}")

In [7]:
# Get species from database for given sample
from mysql.connector import connect, Error

def get_sample_id(sample_name, user, password):
    try:
        table_sample_data = "peer_lab_db.sample_data"
        query = f"""
        SELECT {table_sample_data}.id
        FROM {table_sample_data}
        WHERE {table_sample_data}.Sample="{sample_name}"
        """
        result = execute_query(query, user, password)[0][0]
        return result
    except Error as e:
        print(f"Error: {e}")

In [8]:
# Get species from database for given sample
from mysql.connector import connect, Error

def get_project_id(sample_id, user, password):
    try:
        table_sample_data = "peer_lab_db.sample_data"
        table_project_data = "peer_lab_db.project_data"
        query = f"""
        SELECT {table_project_data}.projectName
        FROM {table_project_data}
        LEFT JOIN {table_sample_data}
        ON {table_project_data}.id = {table_sample_data}.projectData_id
        WHERE {table_sample_data}.id = {sample_id}
        """
        result = execute_query(query, user, password)[0][0]
        return result
    except Error as e:
        print(f"Error: {e}")

In [9]:
def get_SEQC_version(loc):
    try:
        cmd = f"aws s3 cp {loc}/seqc-results/seqc_log.txt -"
        out = subprocess.run(shlex.split(cmd), universal_newlines=True, capture_output=True).__dict__["stdout"]
        version = re.match(r".*SEQC=v(\d+\.\d+\.\d+).*", out)[1]
        return version
    except:
        return "N/A"

In [10]:
def get_file_prefix(loc):
    try:
        cmd = f"aws s3 ls {loc}/seqc-results/"
        out = subprocess.run(shlex.split(cmd), universal_newlines=True, capture_output=True).__dict__["stdout"]
        
        # Note: I'm expecting the aligned bam file to be in loc
        bam_pattern = re.compile(r"(.*)_Aligned\.out\.bam$")
        filename = list(filter(bam_pattern.match, out.split()))[0]
        file_prefix = re.match(bam_pattern, filename)[1]
        return file_prefix
    except:
        raise ValueError(f"BAM file not found in {loc}")
        return ""

In [11]:
def get_reference(sample_id):
    # Get species from database to decide reference
    species = get_species(sample_id, creds["user"], creds["password"])
    
    # Map to reference locations
    if "Human" in species:
        return "s3://seqc-public/genomes/hg38_long_polya/annotations.gtf"
    elif "Mouse" in species:
        return "s3://seqc-public/genomes/mm38_long_polya/annotations.gtf"
    else:
        raise ValueError(f"Unknown Species: {species}")

In [12]:
def get_bc_whitelist(sample_id):
    # Get version from database to decide whitelist
    sc_tech = get_sc_tech(sample_id, creds["user"], creds["password"])
    
    # Map to reference locations
    if "V3" in sc_tech:
        return "s3://seqc-public/barcodes/ten_x_v3/flat/3M-february-2018.txt"
    elif "V2" in sc_tech:
        return "s3://seqc-public/barcodes/ten_x_v2/flat/737K-august-2016.txt"
    else:
        raise ValueError(f"Unknown Technology: {sc_tech}")

In [13]:
def run(
    workflow_path: str,
    execp: str,
    secrets: str,
    inputs: str,
    labels: str,
    options: str,
):
    # change working directory to the pipeline package
    oldwd = os.getcwd()
    os.chdir(workflow_path)
    
    # execute the pipeline command
    cmd = f"{workflow_path}/{execp} -k {secrets} -i {inputs} -l {labels} -o {options}"
    var = subprocess.run(shlex.split(cmd), universal_newlines=True, capture_output=True)
    out = var.__dict__
    
    # change working directory back
    os.chdir(oldwd)
    
    return out

# Process Samples

## Setup

In [14]:
# Location of docker files
common_docker_registry = "quay.io/hisplan"

prefix = "Velopipe2" # Workflow to run; also .wdl filename prefix
pipeline_type = prefix # field in *.labels.json
output_dirname = "velopipe"

# If need to add comment, put here
comment = ""

In [15]:
# Velopipe-specific parameters
min_SEQC_version = "0.2.9"
options_prefix="Velopipe"

In [16]:
# Locations of workflow-related directories and files
path_to_cromwell_secrets = f"{Path.home()}/.cromwell/cromwell-secrets.json" # CHANGE THIS
workflow_dir = f"{Path.home()}/scing/bin/velopipe-0.0.9" # CHANGE THIS
path_to_exec = f"{workflow_dir}/submit.sh" # CHANGE THIS FOR SHARP
config_dir = f"{workflow_dir}/config"
path_to_options = f"{workflow_dir}/{options_prefix}.options.aws.json"

# Other file locations
db_credentials_path = f"{Path.home()}/.config.json" # CHANGE THIS

In [17]:
# Set credentials based on SCRIdb CLI config file
with open(db_credentials_path) as f:
    creds = json.load(f)

In [18]:
# Samples on which to run Velopipe
# Note: Assumes data is transferred to AWS S3 (this should be an s3 location)
# Note: Assumes directory name is name of sample
sample_paths = [
    "s3://dp-lab-data/moormana/CRC_Triplets/KG146Li/"
]

## Execution

In [19]:
# Get information for all samples
sample_paths = [s.strip('/') for s in sample_paths] # remove trailing slash if exists
sample_names = [os.path.basename(s) for s in sample_paths]
samples = pd.DataFrame(
    sample_paths,
    index=sample_names,
    columns=["S3_Path"],
    dtype=str,
)
samples["File_Prefix"] = samples["S3_Path"].apply(get_file_prefix)
samples["Sample_ID"] = pd.Series(samples.index).apply(
    lambda x: get_sample_id(x, creds['user'], creds['password'])
).values
samples["File_Prefix"] = samples["S3_Path"].apply(get_file_prefix)

# Determine whether samples must be re-run with updated version of SEQC
samples["SEQC_Version"] = samples["S3_Path"].apply(get_SEQC_version)
is_outdated = samples["SEQC_Version"].apply(lambda x: False if (x=='N/A') else (version.parse(x) < version.parse(min_SEQC_version)))
outdated = samples[is_outdated]
print(f"{len(outdated)} samples must be re-run with SEQC >= v{min_SEQC_version}: {', '.join(outdated.index.values)}")

0 samples must be re-run with SEQC >= v0.2.9: 


In [26]:
# Load minimum inputs and labels fields from templates
with open(f"{workflow_dir}/config/template.inputs.json") as f:
    std_inputs_fields = list(json.load(f).keys())
    
with open(f"{workflow_dir}/config/template.labels.json") as f:
    std_labels_fields = list(json.load(f).keys())
    
# Annotate all samples with workflow inputs and labels
inputs = pd.DataFrame(index=samples.index, columns=std_inputs_fields,)
labels = pd.DataFrame(index=samples.index, columns=std_labels_fields,)

# Annotate inputs
inputs[f"{prefix}.sampleName"] = inputs.index # may need to change
inputs[f"{prefix}.numOfChunks"] = int(20) # may need to change
inputs[f"{prefix}.gtf"] = samples["Sample_ID"].apply(get_reference)
inputs[f"{prefix}.fullBarcodeWhitelist"] = samples["Sample_ID"].apply(get_bc_whitelist)
inputs[f"{prefix}.dockerRegistry"] = common_docker_registry
samples["Full_S3_Path"] = samples["S3_Path"] + "/seqc-results/" +  samples["File_Prefix"]
inputs[f"{prefix}.bam"] = samples["Full_S3_Path"] + "_Aligned.out.bam"
inputs[f"{prefix}.cbCorrection"] = samples["Full_S3_Path"] + "_cb-correction.csv.gz"
inputs[f"{prefix}.umiCorrection"] = samples["Full_S3_Path"] + "_umi-correction.csv.gz"

# ********************
# Note: Whitelist may need to be changed on a per-sample basis

inputs[f"{prefix}.filteredBarcodes"] = "s3://dp-lab-data/moormana/CRC_Triplets/KG146Li/seqc-results/KG146Li_Epi_dense.csv"

# ********************

# Annotate labels
labels["pipelineType"] = pipeline_type
labels["project"] = samples["Sample_ID"].apply(lambda x: get_project_id(x, creds["user"], creds["password"]))
labels["sample"] = labels.index
labels["owner"] = creds["user"]
labels["destination"] = samples['S3_Path'] + "/" + output_dirname
labels["transfer"] = "-"
labels["comment"] = creds["user"]

assert (std_inputs_fields == list(inputs.columns)) & (inputs.notna().values.all())
assert (std_labels_fields == list(labels.columns)) & (labels.notna().values.all())

In [27]:
inputs

Unnamed: 0,Velopipe2.sampleName,Velopipe2.filteredBarcodes,Velopipe2.bam,Velopipe2.numOfChunks,Velopipe2.gtf,Velopipe2.fullBarcodeWhitelist,Velopipe2.cbCorrection,Velopipe2.umiCorrection,Velopipe2.dockerRegistry
KG146Li,KG146Li,s3://dp-lab-data/moormana/CRC_Triple...,s3://dp-lab-data/moormana/CRC_Triple...,20,s3://seqc-public/genomes/hg38_long_p...,s3://seqc-public/barcodes/ten_x_v3/f...,s3://dp-lab-data/moormana/CRC_Triple...,s3://dp-lab-data/moormana/CRC_Triple...,quay.io/hisplan


In [28]:
labels

Unnamed: 0,pipelineType,project,sample,owner,destination,transfer,comment
KG146Li,Velopipe2,CRC Primary Met Organoid,KG146Li,moormana,s3://dp-lab-data/moormana/CRC_Triple...,-,moormana


In [35]:
stdouts = [] # to store all outputs
process = True

with tqdm(samples.index) as t:

    for sample_name in t:

        # Write inputs and labels to file
        path_to_inputs = f"{config_dir}/{sample_name}.inputs.json"
        with open(path_to_inputs, "w") as f_inputs:
            json.dump(inputs.loc[sample_name].to_dict(), f_inputs, indent=4, cls=NpEncoder)

        path_to_labels = f"{config_dir}/{sample_name}.labels.json"
        with open(path_to_labels, "w") as f_labels:
            json.dump(labels.loc[sample_name].to_dict(), f_labels, indent=4, cls=NpEncoder)

        if process:
            stdouts.append(run(
                workflow_path = workflow_dir,
                execp = "submit.sh",
                secrets = path_to_cromwell_secrets,
                inputs = path_to_inputs,
                labels = path_to_labels,
                options = path_to_options,
            ))

  0%|          | 0/1 [00:00<?, ?it/s]

In [36]:
stdouts

[{'args': ['/Users/moormana/scing/bin/velopipe-0.0.9/submit.sh',
   '-k',
   '/Users/moormana/.cromwell/cromwell-secrets.json',
   '-i',
   '/Users/moormana/scing/bin/velopipe-0.0.9/config/KG146Li.inputs.json',
   '-l',
   '/Users/moormana/scing/bin/velopipe-0.0.9/config/KG146Li.labels.json',
   '-o',
   '/Users/moormana/scing/bin/velopipe-0.0.9/Velopipe.options.aws.json'],
  'returncode': 0,
  'stdout': '{"id":"61f746cb-437c-4a88-ab84-2100521ef9e3","status":"Submitted"}\n',
  'stderr': ''}]