In [2]:
import firecloud.api as fapi
import json
import math
import os
import re

# Terra (FireCloud) coordinates used by every call in this notebook.
bucket = "fc-secure-d99fbd65-eb27-4989-95b4-4cf559aa7d36"  # workspace bucket, without the gs:// prefix

namespace, workspace = "testmybroad", "Macosko-Pipelines"  # workspace namespace and name
cnamespace = "macosko-pipelines"  # namespace of the method configurations

# Sample-dict keys that are metadata rather than FASTQ sample/index names.
special = {"puck", "ref"}

In [3]:
def validate_run_data(run_data):
    """Check a ``run_data`` batch description before anything is submitted.

    ``run_data`` maps workflow names to non-empty lists of sample dicts. In every
    sample, each key not in ``special`` is a FASTQ sample/index name whose value is
    a lane list (checked by ``validate_lanes``). Extra per-workflow rules:

    * ``cellranger-count``: exactly one index plus ``"ref"`` (a reference-name str).
    * ``spatial-count``: at least one index plus ``"puck"`` (a non-empty list of str).

    Every sample is printed as it is checked so the batch can be eyeballed.

    Raises:
        AssertionError: on the first rule violation; the message names the workflow
        and shows the offending sample.
    """
    workflows = ["cellranger-count", "spatial-count", "positioning", "reconstruction"]
    assert isinstance(run_data, dict), f"run_data must be a dict, got {type(run_data).__name__}"
    for workflow in run_data:
        assert workflow in workflows, f"unknown workflow {workflow!r}; expected one of {workflows}"
        samples = run_data[workflow]
        assert isinstance(samples, list), f"{workflow}: samples must be a list, got {type(samples).__name__}"
        assert len(samples) > 0, f"{workflow}: no samples given"
        print(f"{workflow} samples: ")
        for sample in samples:
            print(f"\t{sample}")
            assert isinstance(sample, dict), f"{workflow}: sample must be a dict: {sample}"
            assert len(sample) > 0, f"{workflow}: empty sample"
            if workflow == "cellranger-count":  # one key is "ref", the other is an index
                assert len(sample) == 2, f"{workflow}: need exactly one index plus 'ref': {sample}"
                assert "ref" in sample and isinstance(sample["ref"], str), \
                    f"{workflow}: 'ref' must be a str: {sample}"
                assert len(set(sample).intersection(special)) == 1, \
                    f"{workflow}: only 'ref' may be a special key: {sample}"
            if workflow == "spatial-count":  # one key is "puck", the rest are indexes
                assert len(sample) >= 2, f"{workflow}: need 'puck' plus at least one index: {sample}"
                pucks = sample.get("puck")
                # An empty puck list would otherwise be submitted with no pucks at all.
                assert isinstance(pucks, list) and len(pucks) > 0 and all(isinstance(p, str) for p in pucks), \
                    f"{workflow}: 'puck' must be a non-empty list of str: {sample}"
                assert len(set(sample).intersection(special)) == 1, \
                    f"{workflow}: only 'puck' may be a special key: {sample}"
            # Plain loop rather than a list comprehension: this runs only for its side effect.
            for index, lanes in sample.items():
                if index not in special:
                    validate_lanes(lanes)

def validate_lanes(lanes):
    """Check the lane selection for one FASTQ sample/index.

    ``lanes`` must be a list of distinct ints in 1..8. An empty list is allowed; in
    ``get_fastqs_and_size_and_id`` it means "use every lane found for the index".

    Raises:
        AssertionError: if ``lanes`` is not a list, holds a non-int (``bool`` is
        rejected even though it subclasses ``int``), a value outside 1..8, or a
        repeated lane.
    """
    assert isinstance(lanes, list), f"lanes must be a list, got {type(lanes).__name__}"
    # bool subclasses int, so True would otherwise pass as lane 1 and later be
    # formatted as "_L00True_", which matches no FASTQ.
    assert all(isinstance(L, int) and not isinstance(L, bool) and 1 <= L <= 8 for L in lanes), \
        f"lanes must be ints in 1..8: {lanes}"
    # A repeated lane makes the later "2 FASTQs per requested lane" count check fail
    # with an unhelpful message, so reject it here.
    assert len(set(lanes)) == len(lanes), f"duplicate lanes: {lanes}"

def get_fastqs_and_size_and_id(bcl, sample):
    # Get the sizes of all fastqs in the bucket
    sizes = !gsutil du gs://{bucket}/fastqs/{bcl}
    sizes = [size.split() for size in sizes]
    sizes = [(size[0], size[1]) for size in sizes if size[1][-9:] == ".fastq.gz"]
    sizes = [size for size in sizes if "_I1_" not in size[1] and "_I2_" not in size[1]]
    
    # Delete non-index keys
    sample = {k: v for k, v in sample.items() if k not in special}
    
    # Filter down to the fastqs in our sample
    fastqs = [] ; total_size = 0
    for index in sample:
        index_sizes = [size for size in sizes if f"{index}_S" in size[1]]
        if len(sample[index]) > 0:
            lanes = [f"_L00{L}_" for L in sample[index]]
            index_sizes = [size for size in index_sizes if any(lane in size[1] for lane in lanes)]
            assert len(index_sizes) == len(lanes)*2
        
        assert len(index_sizes) % 2 == 0
        assert len(set([re.sub(r'_L00[1-8]_', '__', size[1]) for size in index_sizes])) == 2
        
        fastqs += [size[1] for size in index_sizes]
        total_size += sum([int(size[0]) for size in index_sizes])
    
    total_size_GiB = math.ceil(total_size/1024/1024/1024)
    print(f"{len(fastqs)} fastqs found")
    print(f"total size: {total_size_GiB} GiB")
    
    # Create a unique id
    ids = [index if len(sample[index]) == 0 else index+'-'+''.join(map(str, sample[index])) for index in sample]
    id = bcl+"/"+'_'.join(ids)
    print(f"id: {id}")
    
    # Return
    return fastqs, total_size_GiB, id

def submit(method, user_comment=""):
    """Validate a workspace method configuration and submit it.

    Args:
        method: name of the method configuration (in ``cnamespace``) to run.
        user_comment: free-text comment attached to the submission.

    Returns:
        The parsed JSON body of the create-submission response (callers may ignore it).

    Raises:
        AssertionError: if either request fails, or the configuration reports extra,
        invalid, or missing inputs/outputs.
    """
    # Validate first so a broken configuration is never submitted.
    resp = fapi.validate_config(namespace, workspace, cnamespace, method)
    assert resp.ok, f"ERROR: could not validate {method} ({resp.status_code}): \n{resp.text}"
    res = resp.json()
    assert res["extraInputs"] == [], f"ERROR: extra input: \n{res['extraInputs']}"
    assert res["invalidInputs"] == {}, f"ERROR: invalid input: \n{res['invalidInputs']}"
    assert res["invalidOutputs"] == {}, f"ERROR: invalid output: \n{res['invalidOutputs']}"
    assert res["missingInputs"] == [], f"ERROR: missing input: \n{res['missingInputs']}"

    # Submit the job; a failed request must not be reported as "Submitted".
    resp = fapi.create_submission(namespace, workspace, cnamespace, method, user_comment=user_comment)
    assert resp.ok, f"ERROR: submission of {method} failed ({resp.status_code}): \n{resp.text}"
    print(f"Submitted {method} {user_comment}")
    return resp.json()

In [4]:
def run_cellranger_count(bcl, sample, technique="cellranger", params="", mm=5):
    # Get the paths and sizes of the fastqs
    fastqs, total_size_GiB, id = get_fastqs_and_size_and_id(bcl, sample)
    mem_GiB = 64
    disk_GiB = max(math.ceil(total_size_GiB * mm + 20), 128)
    assert disk_GiB <= 6000, "Increase disk cap"
    
    # Assert the output doesn't exist already
    files = !gsutil ls gs://{bucket}/cellranger-count/{id} ; assert len(files) == 1

    # Get the reference
    reference = f"gs://{bucket}/references/{sample['ref']}"
    files = !gsutil ls {reference} ; assert len(files) > 1

    # Get the sample
    [index] = set(sample.keys())-{"ref"}

    # Update the configuration
    body = fapi.get_workspace_config(namespace, workspace, cnamespace, "cellranger-count").json()  
    body["inputs"]["cellranger_count.id"] = f'"{id}"'
    body["inputs"]["cellranger_count.fastq_paths"] = f"{fastqs}".replace("'", '"')
    body["inputs"]["cellranger_count.sample"] = f'"{index}"'
    body["inputs"]["cellranger_count.reference"] = f'"{reference}"'
    body["inputs"]["cellranger_count.technique"] = f'"{technique}"'
    body["inputs"]["cellranger_count.mem_GiB"] = f'"{mem_GiB}"'
    body["inputs"]["cellranger_count.disk_GiB"] = f'"{disk_GiB}"'
    body["inputs"]["cellranger_count.count_output_path"] = ''
    body["inputs"]["cellranger_count.log_output_path"] = ''
    body["inputs"]["cellranger_count.bucket"] = ''
    body["inputs"]["cellranger_count.docker"] = ''
    body["inputs"]["cellranger_count.params"] = f'"{params}"'
    fapi.update_workspace_config(namespace, workspace, cnamespace, "cellranger-count", body).json()
    submit("cellranger-count", f"{index} {bcl}")
    return True

def run_spatial_count(bcl, sample, mm=2):
    # Get the paths and sizes of the fastqs
    fastqs, total_size_GiB, id = get_fastqs_and_size_and_id(bcl, sample)
    mem_GiB = max(math.ceil(total_size_GiB * mm), 64)
    disk_GiB = max(math.ceil(total_size_GiB * mm), 64)
    assert disk_GiB <= 640, "Increase memory cap"
    assert disk_GiB <= 6000, "Increase disk cap"
    
    # Get the pucks
    pucks = sample["puck"]
    for puck in pucks:
        res = !gsutil stat {puck}
        assert len(res) > 1

    # Update the configuration
    body = fapi.get_workspace_config(namespace, workspace, cnamespace, "spatial-count").json()  
    body["inputs"]["spatial_count.id"] = f'"{id}"'
    body["inputs"]["spatial_count.fastq_paths"] = f"{fastqs}".replace("'", '"')
    body["inputs"]["spatial_count.pucks"] = f"{pucks}".replace("'", '"')
    body["inputs"]["spatial_count.mem_GiB"] = f'"{mem_GiB}"'
    body["inputs"]["spatial_count.disk_GiB"] = f'"{disk_GiB}"'
    body["inputs"]["spatial_count.count_output_path"] = ''
    body["inputs"]["spatial_count.log_output_path"] = ''
    body["inputs"]["spatial_count.bucket"] = ''
    body["inputs"]["spatial_count.docker"] = ''
    fapi.update_workspace_config(namespace, workspace, cnamespace, "spatial-count", body).json()
    submit("spatial-count", f"{id}")
    return True

def run_positioning(bcl, sample, mm=2):
    """Placeholder for submitting the ``positioning`` workflow.

    TODO: not implemented yet. It takes the same arguments as the other ``run_*``
    helpers, submits nothing, and returns ``None``.
    """
    return None

def run_reconstruction(bcl, sample, params="", mm=1.5):
    """Configure and submit the ``reconstruction`` workflow for one sample.

    Args:
        bcl: sequencing-run folder under ``gs://{bucket}/fastqs/``.
        sample: dict of FASTQ sample/index names mapped to lane lists.
        params: extra parameters passed through as ``reconstruction.params``.
        mm: resource multiplier; memory and disk are both
            ``max(ceil(fastq_GiB * mm), 64)`` GiB.

    Returns:
        True once the submission has been made.

    Raises:
        AssertionError: if memory exceeds 640 GiB, disk exceeds 6000 GiB, the config
        update fails, or ``submit`` rejects the configuration.
    """
    # Get the paths and sizes of the fastqs
    fastqs, total_size_GiB, run_id = get_fastqs_and_size_and_id(bcl, sample)
    mem_GiB = max(math.ceil(total_size_GiB * mm), 64)
    disk_GiB = max(math.ceil(total_size_GiB * mm), 64)
    assert mem_GiB <= 640, "Increase memory cap"
    assert disk_GiB <= 6000, "Increase disk cap"

    body = fapi.get_workspace_config(namespace, workspace, cnamespace, "reconstruction").json()
    inputs = body["inputs"]
    # Input values are expression text: strings are quoted and lists are array literals.
    # json.dumps does both and also escapes any quotes inside `params`.
    inputs["reconstruction.id"] = json.dumps(run_id)
    inputs["reconstruction.fastq_paths"] = json.dumps(fastqs)
    inputs["reconstruction.params"] = json.dumps(params)
    inputs["reconstruction.mem_GiB"] = json.dumps(str(mem_GiB))
    inputs["reconstruction.disk_GiB"] = json.dumps(str(disk_GiB))
    # Empty values presumably leave these inputs to the workflow's defaults -- TODO confirm.
    inputs["reconstruction.recon_output_path"] = ''
    inputs["reconstruction.log_output_path"] = ''
    inputs["reconstruction.bucket"] = ''
    inputs["reconstruction.docker"] = ''
    resp = fapi.update_workspace_config(namespace, workspace, cnamespace, "reconstruction", body)
    assert resp.ok, f"ERROR: could not update reconstruction config ({resp.status_code}): \n{resp.text}"
    submit("reconstruction", f"{run_id} {params}")
    return True

In [7]:
# NOTE(review): `bcl` and `run_data` come from cells not shown here (execution counts 5-6
# are missing); define them above this cell so Restart & Run All works.
# Validate the whole batch first: validate_run_data raises AssertionError on the first
# bad sample, before any job has been submitted.
validate_run_data(run_data)
[run_spatial_count(bcl, sample, mm=2) for sample in run_data["spatial-count"]]

14 fastqs found
total size: 103 GiB
id: 240911_SL-EXD_0362_A22FLV2LT4/SI-TT-D1
Submitted spatial-count 240911_SL-EXD_0362_A22FLV2LT4/SI-TT-D1


[True]

In [8]:
# NOTE(review): this re-submits the first spatial-count sample, which the previous cell
# already submitted (both outputs report the same id) -- likely a duplicate run.
first_spatial_sample = run_data["spatial-count"][0]
run_spatial_count(bcl, first_spatial_sample, mm=2)

14 fastqs found
total size: 103 GiB
id: 240911_SL-EXD_0362_A22FLV2LT4/SI-TT-D1
Submitted spatial-count 240911_SL-EXD_0362_A22FLV2LT4/SI-TT-D1


True

In [20]:
# List all submissions
subs = fapi.list_submissions("testmybroad", "Macosko-Pipelines").json()
subs = [sub for sub in subs if sub["status"] not in ["Done","Aborted"]]
print(len(subs))

# Abort all submissions
# ids = [sub["submissionId"] for sub in subs]
# [fapi.abort_submission("testmybroad", "Macosko-Pipelines", submission_id) for submission_id in ids]

26
