In [1]:
from core.utils import Tibanna
from core import ff_utils
from core.utils import run_workflow

env = 'fourfront-webprod'
tibanna = Tibanna(env=env)
ff = ff_utils.fdn_connection(key=tibanna.ff_keys)

# NOTE(review): this override runs after `ff` has been built from the same
# keys; if fdn_connection reads the server when it is called, `ff` keeps the
# environment's original server. Confirm which server is intended.
tibanna.ff_keys['default']['server'] = 'https://data.4dnucleome.org'

# buckets used to locate raw input files and pipeline outputs
raw_bucket, out_bucket = tibanna.s3.raw_file_bucket, tibanna.s3.outfile_bucket
# when True, find_pairs skips fastq files sequenced on an Illumina MiSeq
exclude_miseq = True

def _bucket_for(metadata):
    """Return the S3 bucket that holds the file described by *metadata*.

    Items whose ``@type`` contains ``FileProcessed`` live in the pipeline
    output bucket; every other file type (FileFastq, FileReference,
    FileMicroscopy, ...) lives in the raw-file bucket.
    """
    if 'FileProcessed' in metadata['@type']:
        return out_bucket
    return raw_bucket


def extract_file_info(obj_id, arg_name):
    """Create the tibanna input-file entry for one workflow argument.

    Args:
        obj_id: identifier of a file item on the portal, or a list of
            identifiers when the argument takes several files.
        arg_name: value stored under ``workflow_argument_name``.

    Returns:
        A dict with ``workflow_argument_name``, ``object_key`` (the item's
        display_title), ``uuid`` and ``bucket_name``. For a list ``obj_id``,
        ``object_key`` and ``uuid`` are lists in the same order as
        ``obj_id``. Returns None (after printing a message) when the listed
        files do not all live in one bucket, including for an empty list.
    """
    template = {"workflow_argument_name": arg_name}

    if isinstance(obj_id, list):
        all_meta = [ff_utils.get_metadata(obj, connection=ff) for obj in obj_id]
        buckets = set(_bucket_for(meta) for meta in all_meta)
        # an explicit check instead of `assert`, which is stripped under -O
        if len(buckets) != 1:
            print('Files from different buckets %s' % (obj_id,))
            return None
        template['object_key'] = [meta['display_title'] for meta in all_meta]
        template['uuid'] = [meta['uuid'] for meta in all_meta]
        template['bucket_name'] = buckets.pop()
    else:
        metadata = ff_utils.get_metadata(obj_id, connection=ff)
        template['object_key'] = metadata['display_title']
        template['uuid'] = metadata['uuid']
        template['bucket_name'] = _bucket_for(metadata)
    return template
    

def run_json(input_files, env, parameters, wf_uuid, wf_name, run_name):
    """Build the trigger JSON that is handed to tibanna's run_workflow.

    Args:
        input_files: list of input-file entries (as made by extract_file_info).
        env: portal environment name, recorded under ``_tibanna``.
        parameters: workflow parameter dict.
        wf_uuid: uuid of the Workflow item.
        wf_name: workflow name; used as ``app_name`` and as the run type.
        run_name: identifier recorded as the tibanna ``run_id``.

    Returns:
        The input JSON as a dict. Outputs go to the module-level
        ``out_bucket``; the EC2/S3 ``config`` section is fixed.
    """
    # NOTE(review): the instance password and key name are hard-coded here.
    ec2_config = dict(
        ebs_type="io1",
        json_bucket="4dn-aws-pipeline-run-json",
        ebs_iops=500,
        shutdown_min=30,
        s3_access_arn="arn:aws:iam::643366669028:instance-profile/S3_access",
        ami_id="ami-cfb14bb5",
        copy_to_s3=True,
        launch_instance=True,
        password="dragonfly",
        log_bucket="tibanna-output",
        script_url="https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf/",
        key_name="4dn-encode",
    )
    tibanna_info = dict(env=env, run_type=wf_name, run_id=run_name)
    return dict(
        input_files=input_files,
        output_bucket=out_bucket,
        workflow_uuid=wf_uuid,
        app_name=wf_name,
        parameters=parameters,
        config=ec2_config,
        _tibanna=tibanna_info,
        tag='0.2.5',
    )


def find_pairs(my_rep_set):
    """Find pairs and make sure they are fine by qc.

    For every experiment in ``my_rep_set['experiments_in_set']``, each fastq
    file that is not pair 2 is checked:
    - MiSeq runs are skipped when ``exclude_miseq`` is set.
    - The file must not be deleted.
    - It must have an uploaded file that exists in the raw S3 bucket.
    - It must have a 'paired with' related file.

    Args:
        my_rep_set: ExperimentSetReplicate metadata dict.

    Returns:
        (report, organism, enz):
        report -- dict mapping experiment accession to a list of
            (file @id, paired file @id) tuples;
        organism -- display_title of the single organism found on the first
            experiment's biosample biosources, or None if not unique;
        enz -- third '/'-separated component of the digestion_enzyme value
            shared by all experiments (presumably an @id such as
            '/enzymes/MboI/'), or None when they disagree or none is set.
    """
    report = {}
    enzymes = []
    organisms = []
    for exp in my_rep_set['experiments_in_set']:
        exp_resp = ff_utils.get_metadata(exp, connection=ff)
        report[exp_resp['accession']] = []
        # the organism is looked up once, from the first experiment's biosample
        if not organisms:
            biosample = ff_utils.get_metadata(exp_resp['biosample'], connection=ff, frame='embedded')
            organisms = list(set(bs['individual']['organism']['display_title']
                                 for bs in biosample['biosource']))
            if len(organisms) != 1:
                print('multiple organisms in set %s' % my_rep_set['accession'])
                break
        exp_files = exp_resp['files']
        enzymes.append(exp_resp.get('digestion_enzyme'))
        for fastq_file in exp_files:
            file_resp = ff_utils.get_metadata(fastq_file, connection=ff)
            # pair-2 files are skipped; they are picked up below as the
            # 'paired with' partner of their pair-1 file
            if file_resp.get('paired_end') == '2':
                continue
            if exclude_miseq and file_resp.get('instrument') == 'Illumina MiSeq':
                print('skipping miseq files %s' % exp)
                continue
            if file_resp['status'] == 'deleted':
                print('deleted file %s in %s' % (file_resp['accession'], my_rep_set['accession']))
                continue
            # no uploaded file on the item
            if not file_resp.get('filename'):
                print('%s does not have a file' % file_resp['accession'])
                continue
            # the uploaded file must actually be present in the raw bucket
            if not tibanna.s3.does_key_exist(file_resp['upload_key'], tibanna.s3.raw_file_bucket):
                print('%s does not have a file in S3' % file_resp['accession'])
                continue
            f1 = file_resp['@id']
            f2 = ''
            # related_files may be absent on an item; treat that as "no pair"
            for relation in file_resp.get('related_files') or []:
                if relation['relationship_type'] == 'paired with':
                    f2 = relation['file']
            if not f2:
                print('%s does not have a pair' % f1)
                continue
            report[exp_resp['accession']].append((f1, f2))

    # organisms is already de-duplicated above
    if len(organisms) == 1:
        organism = organisms[0]
    else:
        organism = None
        print('problematic organism %s' % (set(organisms),))

    # a missing digestion_enzyme (None) cannot be split, so it is not usable
    distinct_enzymes = set(enzymes)
    if len(distinct_enzymes) == 1 and enzymes[0]:
        enz = enzymes[0].split('/')[2]
    else:
        enz = None
        print('problematic enzyme %s' % (distinct_enzymes,))
    return report, organism, enz


def get_wfr_out(file_id, wfr_name, file_format):
    """Look up an output of the workflow run that used *file_id* as input.

    Among the runs listed in the file's ``workflow_run_inputs``, the last one
    whose display title starts with *wfr_name* is used.

    Args:
        file_id: identifier of the input file item.
        wfr_name: display-title prefix of the wanted run (e.g. 'bwa-mem 0.2.5').
        file_format: ``format`` of the wanted entry in the run's ``output_files``.

    Returns:
        The ``value`` of the first output with that format, or one of the
        status strings "no workflow in file", "no completed run" or
        "no file found" (all start with "no").
    """
    emb_file = ff_utils.get_metadata(file_id, connection=ff, frame='embedded')
    workflows = emb_file.get('workflow_run_inputs')
    if not workflows:
        return "no workflow in file"

    wfr = {}
    run_status = 'did not run'
    for a_wfr in workflows:
        wfr_resp = ff_utils.get_metadata(a_wfr['uuid'], connection=ff)
        if wfr_resp['display_title'].startswith(wfr_name):
            # later matches overwrite earlier ones: the last matching run wins
            wfr = wfr_resp
            run_status = wfr_resp['run_status']

    if run_status != 'complete':
        return "no completed run"

    # a completed run may lack the requested format; don't index into an empty list
    outputs = wfr.get('output_files') or []
    found = next((out['value'] for out in outputs if out.get('format') == file_format), None)
    return found if found else "no file found"

def add_processed_files(item_id, list_pc, ff):
    """Patch the ``processed_files`` field of an experiment or experiment set.

    Args:
        item_id: identifier of the item to patch.
        list_pc: identifiers of the processed files; sent as the full field
            value (presumably replacing any existing list -- confirm).
        ff: portal connection used for the patch.
    """
    ff_utils.patch_metadata({'processed_files': list_pc}, obj_id=item_id, connection=ff)

def release_files(set_id, list_items, ff):
    """Give files, and the runs that produced them, the status of a set.

    Nothing happens unless the set at *set_id* is 'released' or
    'released to project'. Otherwise each item in *list_items* is patched to
    that status. So is the first entry of the item's
    ``workflow_run_outputs``, when present.
    """
    set_status = ff_utils.get_metadata(set_id, connection=ff)['status']
    if set_status not in ['released', 'released to project']:
        return
    for item_id in list_items:
        producing_runs = ff_utils.get_metadata(item_id, connection=ff).get('workflow_run_outputs')
        if producing_runs:
            ff_utils.patch_metadata({"status": set_status}, obj_id=producing_runs[0], connection=ff)
        ff_utils.patch_metadata({"status": set_status}, obj_id=item_id, connection=ff)

            
def run_missing_wfr(wf_info, input_files, run_name, ff):
    """Start a tibanna run for a workflow whose output is missing.

    Args:
        wf_info: one entry of ``wf_dict`` (keys ``wf_name``, ``wf_uuid``,
            ``parameters``).
        input_files: dict mapping workflow argument name to a file
            identifier or a list of identifiers.
        run_name: identifier recorded as the tibanna run id.
        ff: portal connection; not used here. Input metadata is fetched
            through the module-level ``ff`` inside extract_file_info.

    Nothing is launched when an input cannot be resolved (extract_file_info
    returned None); a message is printed instead.
    """
    all_inputs = []
    # .items() works on Python 2 and 3; .iteritems() is Python 2 only
    for arg, files in input_files.items():
        inp = extract_file_info(files, arg)
        if inp is None:
            print('input %s could not be resolved, not starting %s' % (arg, run_name))
            return
        all_inputs.append(inp)
    input_json = run_json(all_inputs, env, wf_info['parameters'],
                          wf_info['wf_uuid'], wf_info['wf_name'], run_name)
    run_workflow(input_json)
        

In [2]:
import time
from datetime import datetime

# For every Hi-C experiment set, check the three parts of the Hi-C pipeline
# (bwa-mem -> hi-c-processing-bam -> hi-c-processing-pairs) and report what is
# missing. Missing part-2 / part-3 runs are optionally launched (add_wfr).
# Fully processed sets get their processed files attached (add_pc), released
# (add_rel), and the set flagged as completed.

PIPELINE_TAG = "HiC_Pipeline_0.2.5"

wf_dict = [
    {'wf_name': 'bwa-mem',
     'wf_uuid': '3feedadc-50f9-4bb4-919b-09a8b731d0cc',
     'parameters': {"nThreads": 16},
     },
    {'wf_name': 'hi-c-processing-bam',
     'wf_uuid': '023bfb3e-9a8b-42b9-a9d4-216079526f68',
     'parameters': {"nthreads_merge": 16, "nthreads_parse_sort": 16},
     },
    {'wf_name': 'hi-c-processing-pairs',
     'wf_uuid': 'c9e0e6f7-b0ed-4a42-9466-cadc2dd84df0',
     'parameters': {"nthreads": 1, "maxmem": "32g"},
     },
]


def _is_missing(wfr_out):
    """True when a get_wfr_out() result is a status message, not an output id."""
    return not wfr_out or wfr_out.startswith('no')


# url for hic exps
exp_types = ['in%20situ%20Hi-C', 'dilution%20Hi-C']
set_url = ('/search/?'
           + '&'.join(['experiments_in_set.experiment_type=' + i for i in exp_types])
           + '&type=ExperimentSetReplicate')
run_sets = ff_utils.get_metadata(set_url, connection=ff)['@graph']

add_pc = True    # attach processed files to experiments and sets
add_rel = True   # release files/wfrs and flag finished sets as completed
add_wfr = False  # launch runs for missing part-2 / part-3 outputs

# test_set = '4DNES2R6PUEK'
# test_set = '4DNESZ2PVZWR'
# run_sets = [ff_utils.get_metadata(test_set, connection=ff)]
counter = 0
completed = 0
completed_acc = []
print(len(run_sets))
for a_set in run_sets:
    counter += 1
    print('')
    set_acc = a_set['accession']

    if PIPELINE_TAG in a_set.get('completed_processes', []):
        print('%s %s complete' % (counter, set_acc))
        continue

    fastqpairs, organism, enzyme = find_pairs(a_set)

    if organism not in ['human']:
        print('%s %s skipping non human' % (counter, set_acc))
        continue

    if enzyme not in ['MboI', 'DpnII', 'HindIII']:
        print('%s %s skipping not ready NZ %s' % (counter, set_acc, enzyme))
        continue

    print('%s %s' % (counter, set_acc))
    print('%s %s' % (enzyme, organism))
    part3 = 'done'
    list_release = []
    set_pairs = []
    # cycle through the experiments
    for exp, exp_fastq_pairs in fastqpairs.items():
        if not exp_fastq_pairs:
            print('%s does not have any fastq pairs' % exp)
            continue

        # Part 1: both files of every fastq pair must map to one bwa-mem bam
        exp_bams = []
        part1 = 'done'
        for pair in exp_fastq_pairs:
            bam1 = get_wfr_out(pair[0], 'bwa-mem 0.2.5', 'bam')
            bam2 = get_wfr_out(pair[1], 'bwa-mem 0.2.5', 'bam')
            if _is_missing(bam1) or bam1 != bam2:
                part1 = 'not ready'
                if add_wfr:
                    # find the correct index
                    if organism == 'human':
                        bwa = '4DNFIZQZ39L9'
                    elif organism == 'mouse':
                        bwa = '4DNFI823LSI8'
                    else:
                        print('not yet usable %s' % organism)
                        continue
                    inp_f = {'fastq1': pair[0], 'fastq2': pair[1], 'bwa_index': bwa}
                    # NOTE(review): launching part-1 runs is disabled (no run
                    # name chosen yet); inp_f is prepared for when it is enabled.
                    # run_missing_wfr(wf_dict[0], inp_f, pickaname, ff)
            else:
                exp_bams.append(bam1)
                list_release.append(bam1)
        # stop progress to part2 (string equality, not identity)
        if part1 != 'done':
            print('%s has missing Part1 runs' % exp)
            part3 = 'not ready'
            continue
        print('%s part1 complete' % exp)

        # Part 2: all bams of the experiment must feed one hi-c-processing-bam run
        part2 = 'done'
        exp_com_bam = []
        exp_pairs = []
        for bam in exp_bams:
            com_bam = get_wfr_out(bam, 'hi-c-processing-bam 0.2.5', 'bam')
            pairs = get_wfr_out(bam, 'hi-c-processing-bam 0.2.5', 'pairs')
            # both outputs are attached below, so both must be real files
            if _is_missing(pairs) or _is_missing(com_bam):
                part2 = 'not ready'
            else:
                exp_com_bam.append(com_bam)
                exp_pairs.append(pairs)

        # make sure all bams went through the same wfr and produced the same files
        if part2 != 'done' or len(set(exp_com_bam)) != 1 or len(set(exp_pairs)) != 1:
            print('%s Part2 did not complete' % exp)
            part3 = 'not ready'
            if add_wfr:
                # find the correct chrsize
                if organism == 'human':
                    chrsize = '4DNFI823LSII'
                else:
                    print('not yet usable %s' % organism)
                    continue
                inp_f = {'input_bams': exp_bams, 'chromsize': chrsize}
                run_missing_wfr(wf_dict[1], inp_f, exp, ff)
            continue

        # add bam and pairs to exp proc file
        exp_files = [exp_com_bam[0], exp_pairs[0]]
        list_release.extend(exp_files)
        if add_pc:
            add_processed_files(exp, exp_files, ff)
        print('%s part2 complete' % exp)
        set_pairs.append(exp_pairs[0])

    if part3 != 'done':
        print('Part3 not ready')
        continue

    if not set_pairs:
        print('no pairs can be produced from this set')
        continue

    # Part 3: every experiment's pairs file must feed one hi-c-processing-pairs run
    merged_pairs = []
    for set_pair in set_pairs:
        merged_pair = get_wfr_out(set_pair, 'hi-c-processing-pairs 0.2.5', 'pairs')
        if _is_missing(merged_pair):
            part3 = 'part3 did not complete'
        else:
            merged_pairs.append(merged_pair)

    if part3 != 'done' or len(set(merged_pairs)) != 1:
        print('%s is missing Part3' % set_acc)
        if add_wfr:
            # find the correct chrsize
            if organism == 'human':
                chrsize = '/files-reference/4DNFI823LSII/'
            else:
                print('not yet usable %s' % organism)
                continue
            # find enzyme
            res_file = ''
            if organism == 'human':
                res_enzymes = {'MboI': '/files-reference/4DNFI823L812/',
                               'DpnII': '/files-reference/4DNFIBNAPW30/',
                               'HindIII': '/files-reference/4DNFI823MBKE/'}
                res_file = res_enzymes.get(enzyme)
            if not res_file:
                print('restriction enzyme not ready %s' % enzyme)
                continue
            inp_f = {'input_pairs': set_pairs, 'chromsizes': chrsize, 'restriction_file': res_file}
            run_missing_wfr(wf_dict[2], inp_f, set_acc, ff)
        continue

    # The other part-3 outputs come from the run that produced the merged
    # pairs. Look them up once, on the last pairs file as before.
    last_pair = set_pairs[-1]
    hic = get_wfr_out(last_pair, 'hi-c-processing-pairs 0.2.5', 'hic')
    mcool = get_wfr_out(last_pair, 'hi-c-processing-pairs 0.2.5', 'mcool')
    normvec = get_wfr_out(last_pair, 'hi-c-processing-pairs 0.2.5', 'normvector_juicerformat')
    # never attach a status message ("no file found", ...) as a processed file
    missing = [fmt for fmt, out in [('hic', hic), ('mcool', mcool),
                                    ('normvector_juicerformat', normvec)]
               if _is_missing(out)]
    if missing:
        print('%s part3 run lacks outputs: %s' % (set_acc, ', '.join(missing)))
        continue

    # add processed files to set
    set_files = [merged_pairs[0], hic, mcool, normvec]
    list_release.extend(set_files)
    if add_pc:
        add_processed_files(set_acc, set_files, ff)

    # release files and wfrs
    if add_rel:
        release_files(set_acc, list(set(list_release)), ff)

    # Flag the set as completed only once its files are attached and
    # released. Keep any other completed processes already recorded.
    if add_pc and add_rel:
        done_processes = list(a_set.get('completed_processes') or []) + [PIPELINE_TAG]
        ff_utils.patch_metadata({"completed_processes": done_processes}, obj_id=set_acc, connection=ff)

    completed += 1
    completed_acc.append(set_acc)
    print('%s part3 complete' % set_acc)

print(completed)
print(completed_acc)

89

1 4DNES18BMU79 skipping non human

2 4DNESH4UTRNL skipping non human

3 4DNESNYBDSLY skipping non human

4 4DNES54YB6TQ skipping non human

5 4DNESRE7AK5U skipping non human

6 4DNES425UDGS skipping non human

7 4DNESEPDL6KY skipping non human

8 4DNES2R6PUEK
DpnII human
4DNEXRAEERUF part1 complete
4DNEXRAEERUF part2 complete
4DNEX7POCO84 part1 complete
4DNEX7POCO84 part2 complete
4DNES2R6PUEK part3 complete

9 4DNESRJ8KV4Q complete

10 4DNESY859VLG skipping not ready NZ NcoI

11 4DNES78Y8Y5K complete

12 4DNESB6MNCFE complete

13 4DNES4DGHDMX skipping not ready NZ NcoI

14 4DNES8J78WV2 complete

15 4DNESAPF27TG complete

16 4DNESVUMGLG2 complete

17 4DNES7L8Z2KV skipping non human

18 4DNES49TDMJM skipping non human

19 4DNESWEF2AHT skipping non human

20 4DNESHGZUBL9 skipping non human

21 4DNES5R3O24W skipping non human

22 4DNESU4BQU4G skipping non human

23 4DNES9L4AK6Q complete

24 4DNES2M5JIGV complete

25 4DNES4296C5Q
HindIII human
4DNEX8QF8KNE part1 complete
4DNEX8QF8KNE p



4DNEXITSTLGI part2 complete
4DNESYPKLMAM is missing Part3

74 4DNESECNR4O8 complete

75 4DNESUB35TII complete

76 4DNESIE5R9HS complete

77 4DNESSM1H92K complete

78 4DNES1ZEJNRU complete

79 4DNES4269GKX complete

80 4DNESZMKWAD3
MboI human
4DNEXHGRTX1A part1 complete
4DNEXHGRTX1A part2 complete
4DNESZMKWAD3 is missing Part3

skipping miseq files /experiments-hi-c/4DNEXVL9RXOG/
skipping miseq files /experiments-hi-c/4DNEXNHE6X77/




skipping miseq files /experiments-hi-c/4DNEXOVQXQZY/
skipping miseq files /experiments-hi-c/4DNEXW3JCBAV/
skipping miseq files /experiments-hi-c/4DNEXGQ2MNUZ/
81 4DNESQWI9K2F
DpnII human
4DNEXVL9RXOG part1 complete
4DNEXVL9RXOG part2 complete
4DNEX17IBYLJ part1 complete
4DNEX17IBYLJ part2 complete
4DNEXAW9SYPS part1 complete
4DNEXAW9SYPS part2 complete
(u'4DNEXOVQXQZY', 'does not have any fastq pairs')
4DNEX7T12MC9 part1 complete
4DNEX7T12MC9 part2 complete
4DNEXNHE6X77 part1 complete
4DNEXNHE6X77 part2 complete
(u'4DNEXGQ2MNUZ', 'does not have any fastq pairs')
(u'4DNEXW3JCBAV', 'does not have any fastq pairs')




4DNESQWI9K2F part3 complete

skipping miseq files /experiments-hi-c/4DNEXJ5OZX1O/
skipping miseq files /experiments-hi-c/4DNEX8G6CDI6/
skipping miseq files /experiments-hi-c/4DNEXNZYARGL/
skipping miseq files /experiments-hi-c/4DNEX9G4FVK5/
skipping miseq files /experiments-hi-c/4DNEXYJQL5PX/
skipping miseq files /experiments-hi-c/4DNEXND9HATI/
skipping miseq files /experiments-hi-c/4DNEX7ZLCUVL/
skipping miseq files /experiments-hi-c/4DNEX2XV7VGV/
skipping miseq files /experiments-hi-c/4DNEX41ZSSQR/
skipping miseq files /experiments-hi-c/4DNEX1EBZK51/
skipping miseq files /experiments-hi-c/4DNEXJ5UB1VB/
skipping miseq files /experiments-hi-c/4DNEXH2AVZCZ/
skipping miseq files /experiments-hi-c/4DNEX573LY53/
skipping miseq files /experiments-hi-c/4DNEXSGJIL3F/
skipping miseq files /experiments-hi-c/4DNEXG994B2I/
skipping miseq files /experiments-hi-c/4DNEX7S8LJQJ/
skipping miseq files /experiments-hi-c/4DNEX2UWFFCL/
skipping miseq files /experiments-hi-c/4DNEXDMYMEL2/
skipping miseq fi

4DNEXS8V14UB part1 complete
4DNEXS8V14UB part2 complete
(u'4DNEXZKBU7YD', 'does not have any fastq pairs')
(u'4DNEX9ROG1T9', 'does not have any fastq pairs')
(u'4DNEXMJINPYT', 'does not have any fastq pairs')
4DNEX1I1E292 part1 complete
4DNEX1I1E292 part2 complete
4DNEX5D543VH part1 complete
4DNEX5D543VH part2 complete
(u'4DNEXGICX1EU', 'does not have any fastq pairs')
(u'4DNEX6JBYI4K', 'does not have any fastq pairs')
4DNEXEJM8STO part1 complete
4DNEXEJM8STO part2 complete
4DNES3JX38V5 is missing Part3

skipping miseq files /experiments-hi-c/4DNEXKSMJTC7/
89 4DNESCIHJOXA
MboI human
4DNEX2G1IW8D part1 complete
4DNEX2G1IW8D part2 complete
4DNEXGPAWPI8 part1 complete
4DNEXGPAWPI8 part2 complete
(u'4DNEXKSMJTC7', 'does not have any fastq pairs')
4DNESCIHJOXA part3 complete
8
[u'4DNES2R6PUEK', u'4DNES4296C5Q', u'4DNESJIYRA44', u'4DNESVKLYDOH', u'4DNESQWI9K2F', u'4DNESJFTAURO', u'4DNESLQG7ZKJ', u'4DNESCIHJOXA']
