Commit 533c24a
Merge pull request #16 from ENCODE-DCC/dev
v0.4.0
leepc12 committed Mar 30, 2020
2 parents 04ab396 + b3eef39
Showing 8 changed files with 308 additions and 137 deletions.
croo/cromwell_metadata.py: 2 additions & 2 deletions
@@ -9,7 +9,7 @@
 import re
 import json
 import caper
-from caper.caper_uri import CaperURI
+from autouri import AutoURI
 from collections import OrderedDict, namedtuple
 from .dag import DAG

@@ -61,7 +61,7 @@ def find_files_in_dict(d):
         elif isinstance(v, str):
             maybe_files.append((v, (-1,)))
         for f, shard_idx in maybe_files:
-            if CaperURI(f).is_valid_uri():
+            if AutoURI(f).is_valid:
                 files.append((k, f, shard_idx))
     return files

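For orientation, a minimal sketch (not part of the commit) of the new validity check, assuming AutoURI(...).is_valid is False for strings that are neither well-formed URIs nor absolute paths, which is how find_files_in_dict() above uses it to filter metadata values; the example URIs are hypothetical:

from autouri import AutoURI

# AutoURI() inspects the string and returns a storage-specific instance.
print(AutoURI('gs://my-bucket/qc/qc.json').is_valid)  # True: well-formed GCS URI
print(AutoURI('/data/out/qc.json').is_valid)          # True: absolute local path
print(AutoURI('just_a_plain_value').is_valid)         # False: skipped by the filter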
croo/croo.py: 97 additions & 75 deletions
@@ -8,16 +8,20 @@
 import os
 import sys
+import logging
 import json
 import re
 import caper
-from caper.caper_uri import init_caper_uri, CaperURI, URI_LOCAL
 
+from autouri import AutoURI, AbsPath, GCSURI, S3URI, logger
 from .croo_args import parse_croo_arguments
 from .croo_html_report import CrooHtmlReport
 from .cromwell_metadata import CromwellMetadata
 
 
+logging.basicConfig(level=logging.INFO, format='%(asctime)s|%(name)s|%(levelname)s| %(message)s')
+logger = logging.getLogger(__name__)
+
 
 class Croo(object):
     """Cromwell output organizer (croo)
@@ -29,31 +33,55 @@ class Croo(object):
     KEY_INPUT = 'inputs'
 
     def __init__(self, metadata_json, out_def_json, out_dir,
+                 tmp_dir,
                  soft_link=True,
                  ucsc_genome_db=None,
                  ucsc_genome_pos=None,
-                 no_graph=False):
+                 use_presigned_url_s3=False,
+                 use_presigned_url_gcs=False,
+                 duration_presigned_url_s3=0,
+                 duration_presigned_url_gcs=0,
+                 public_gcs=False,
+                 gcp_private_key=None,
+                 map_path_to_url=None,
+                 no_checksum=False):
         """Initialize croo with output definition JSON
         Args:
            soft_link:
                DO NOT MAKE A COPY of original cromwell output
                (source) on out_dir (destination).
                Try to soft-link it if both src and dest are on local storage.
                Otherwise, original cromwell outputs will be just referenced.
        """
+        self._tmp_dir = tmp_dir
         if isinstance(metadata_json, dict):
             self._metadata = metadata_json
         else:
-            f = CaperURI(metadata_json).get_local_file()
+            f = AutoURI(metadata_json).localize_on(self._tmp_dir)
             with open(f, 'r') as fp:
                 self._metadata = json.loads(fp.read())
         if isinstance(self._metadata, list):
             if len(self._metadata) > 1:
-                print('[Croo] Warning: multiple metadata JSON objects '
-                      'found in metadata JSON file. Taking the first '
-                      'one...')
+                logger.warning(
+                    'Multiple metadata JSON objects '
+                    'found in metadata JSON file. Taking the first '
+                    'one...')
             elif len(self._metadata) == 0:
                 raise Exception('metadata JSON file is empty')
             self._metadata = self._metadata[0]
         self._out_dir = out_dir
         self._cm = CromwellMetadata(self._metadata)
         self._ucsc_genome_db = ucsc_genome_db
         self._ucsc_genome_pos = ucsc_genome_pos
-        self._no_graph = no_graph
 
+        self._use_presigned_url_s3 = use_presigned_url_s3
+        self._use_presigned_url_gcs = use_presigned_url_gcs
+        self._duration_presigned_url_s3 = duration_presigned_url_s3
+        self._duration_presigned_url_gcs = duration_presigned_url_gcs
+        self._public_gcs = public_gcs
+        self._gcp_private_key = gcp_private_key
+        self._map_path_to_url = map_path_to_url
+        self._no_checksum = no_checksum
+
         if isinstance(out_def_json, dict):
             self._out_def_json = out_def_json
@@ -66,7 +94,7 @@ def __init__(self, metadata_json, out_def_json, out_dir,
                     'add "#CROO out_def [URL_OR_CLOUD_URI]" '
                     'to your WDL')
             out_def_json = out_def_json_file_from_wdl
-            f = CaperURI(out_def_json).get_local_file()
+            f = AutoURI(out_def_json).localize_on(self._tmp_dir)
             with open(f, 'r') as fp:
                 self._out_def_json = json.loads(fp.read())
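For orientation, a minimal sketch (not part of the commit) of localize_on() as the constructor above uses it: the metadata or output-definition JSON may live on remote storage, so it is first copied into tmp_dir and the returned local path is opened directly. The bucket and directories are hypothetical:

import json
from autouri import AutoURI

# Copy a (possibly remote) JSON file into the temp dir, then parse it locally.
local_json = AutoURI('gs://my-bucket/atac/metadata.json').localize_on('/tmp/.croo_tmp')
with open(local_json, 'r') as fp:
    metadata = json.load(fp)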

@@ -89,6 +117,13 @@ def organize_output(self):
             workflow_id=self._cm.get_workflow_id(),
             dag=self._task_graph,
             task_graph_template=self._task_graph_template,
+            public_gcs=self._public_gcs,
+            gcp_private_key=self._gcp_private_key,
+            use_presigned_url_gcs=self._use_presigned_url_gcs,
+            use_presigned_url_s3=self._use_presigned_url_s3,
+            duration_presigned_url_s3=self._duration_presigned_url_s3,
+            duration_presigned_url_gcs=self._duration_presigned_url_gcs,
+            map_path_to_url=self._map_path_to_url,
             ucsc_genome_db=self._ucsc_genome_db,
             ucsc_genome_pos=self._ucsc_genome_pos)

@@ -105,7 +140,7 @@ def organize_output(self):
             full_path = node.output_path
             shard_idx = node.shard_idx
 
-            if node_format is not None and not self._no_graph:
+            if node_format is not None:
                 interpreted_node_format = Croo.__interpret_inline_exp(
                     node_format, full_path, shard_idx)
                 if subgraph is not None:
@@ -141,26 +176,51 @@ def organize_output(self):
                 if k != output_name:
                     continue
 
+                target_uri = full_path
                 if path is not None:
                     interpreted_path = Croo.__interpret_inline_exp(
                         path, full_path, shard_idx)
-
-                    # write to output directory
-                    target_uri = os.path.join(self._out_dir,
-                                              interpreted_path)
-                    # if soft_link, target_uri changes to original source
-                    target_uri = CaperURI(full_path).copy(
-                        target_uri=target_uri,
-                        soft_link=self._soft_link)
-                else:
-                    target_uri = full_path
+                    au = AutoURI(full_path)
+                    target_path = os.path.join(self._out_dir, interpreted_path)
+
+                    if self._soft_link:
+                        au_target = AutoURI(target_path)
+                        if isinstance(au, AbsPath) and isinstance(au_target, AbsPath):
+                            au.soft_link(target_path, force=True)
+                            target_uri = target_path
+                        else:
+                            target_uri = full_path
+                    else:
+                        target_uri = au.cp(
+                            target_path,
+                            no_checksum=self._no_checksum,
+                            make_md5_file=True)
 
                 # get presigned URLs if possible
+                target_url = None
                 if path is not None or table_item is not None \
                         or ucsc_track is not None or node_format is not None:
-                    target_url = CaperURI(target_uri).get_url()
-                else:
-                    target_url = None
+                    u = AutoURI(target_uri)
+
+                    if isinstance(u, GCSURI):
+                        if self._public_gcs:
+                            target_url = u.get_public_url()
+
+                        elif self._use_presigned_url_gcs:
+                            target_url = u.get_presigned_url(
+                                duration=self._duration_presigned_url_gcs,
+                                private_key_file=self._gcp_private_key)
+
+                    elif isinstance(u, S3URI):
+                        if self._use_presigned_url_s3:
+                            target_url = u.get_presigned_url(
+                                duration=self._duration_presigned_url_s3)
+
+                    elif isinstance(u, AbsPath):
+                        if self._map_path_to_url:
+                            target_url = u.get_mapped_url(
+                                map_path_to_url=self._map_path_to_url)
 
                 if table_item is not None:
                     interpreted_table_item = Croo.__interpret_inline_exp(
@@ -174,7 +234,7 @@ def organize_output(self):
                         ucsc_track, full_path, shard_idx)
                     report.add_to_ucsc_track(target_url,
                                              interpreted_ucsc_track)
-                if node_format is not None and not self._no_graph:
+                if node_format is not None:
                     interpreted_node_format = Croo.__interpret_inline_exp(
                         node_format, full_path, shard_idx)
                     if subgraph is not None:
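For orientation, a minimal sketch (not part of the commit) of the URL resolution introduced above, using the same autouri calls the diff adds; the bucket names, duration, and path-to-URL mapping are hypothetical, and the network calls are commented out since they need real buckets and credentials:

from autouri import AutoURI, AbsPath, GCSURI, S3URI

# AutoURI() returns a scheme-specific subclass, so isinstance() selects a branch.
u = AutoURI('s3://my-bucket/organized/qc.html')
assert isinstance(u, S3URI)
# url = u.get_presigned_url(duration=3600)  # time-limited HTTPS URL (S3 branch)

g = AutoURI('gs://my-public-bucket/organized/qc.html')
assert isinstance(g, GCSURI)
# url = g.get_public_url()  # public_gcs=True skips signing entirely

# Local outputs served by a web server go through a path-to-URL prefix mapping:
p = AutoURI('/srv/www/data/organized/qc.html')
assert isinstance(p, AbsPath)
# url = p.get_mapped_url(map_path_to_url={'/srv/www/data': 'https://my-server.org/data'})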
@@ -239,69 +299,31 @@ def __interpret_inline_exp(s, full_path, shard_idx):
         return result
 
 
-def init_dirs_args(args):
-    """More initialization for out/tmp directories since tmp
-    directory is important for inter-storage transfer using
-    CaperURI
-    """
-    if args['out_dir'].startswith(('http://', 'https://')):
-        raise ValueError('URL is not allowed for --out-dir')
-    elif args['out_dir'].startswith(('gs://', 's3://')):
-        if args.get('tmp_dir') is None:
-            args['tmp_dir'] = os.path.join(os.getcwd(), '.croo_tmp')
-    else:
-        args['out_dir'] = os.path.abspath(os.path.expanduser(args['out_dir']))
-        os.makedirs(args['out_dir'], exist_ok=True)
-
-        if args.get('tmp_dir') is None:
-            args['tmp_dir'] = os.path.join(args['out_dir'], '.croo_tmp')
-
-    # make temp dir
-    os.makedirs(args['tmp_dir'], exist_ok=True)
-
-    mapping_path_to_url = None
-    if args.get('tsv_mapping_path_to_url') is not None:
-        mapping_path_to_url = {}
-        f = os.path.expanduser(args.get('tsv_mapping_path_to_url'))
-        with open(f, 'r') as fp:
-            lines = fp.read().strip('\n').split('\n')
-            for line in lines:
-                k, v = line.split('\t')
-                mapping_path_to_url[k] = v
-
-    # init caper uri to transfer files across various storages
-    # e.g. gs:// to s3://, http:// to local, ...
-    init_caper_uri(
-        tmp_dir=args['tmp_dir'],
-        tmp_s3_bucket=None,
-        tmp_gcs_bucket=None,
-        http_user=args.get('http_user'),
-        http_password=args.get('http_password'),
-        use_gsutil_over_aws_s3=args.get('use_gsutil_over_aws_s3'),
-        use_presigned_url_s3=args.get('use_presigned_url_s3'),
-        use_presigned_url_gcs=args.get('use_presigned_url_gcs'),
-        gcp_private_key_file=args.get('gcp_private_key'),
-        public_gcs=args.get('public_gcs'),
-        duration_sec_presigned_url_s3=args.get('duration_presigned_url_s3'),
-        duration_sec_presigned_url_gcs=args.get('duration_presigned_url_gcs'),
-        mapping_path_to_url=mapping_path_to_url,
-        verbose=True)
-
 def main():
     # parse arguments. note that args is a dict
     args = parse_croo_arguments()
 
-    # init out/tmp dirs and CaperURI for inter-storage transfer
-    init_dirs_args(args)
+    if args['verbose']:
+        logger.setLevel('INFO')
+    elif args['debug']:
+        logger.setLevel('DEBUG')
 
     co = Croo(
         metadata_json=args['metadata_json'],
         out_def_json=args['out_def_json'],
         out_dir=args['out_dir'],
+        tmp_dir=args['tmp_dir'],
+        soft_link=args['method'] == 'link',
         ucsc_genome_db=args['ucsc_genome_db'],
         ucsc_genome_pos=args['ucsc_genome_pos'],
-        soft_link=args['method'] == 'link',
-        no_graph=args['no_graph'])
+        use_presigned_url_s3=args['use_presigned_url_s3'],
+        use_presigned_url_gcs=args['use_presigned_url_gcs'],
+        duration_presigned_url_s3=args['duration_presigned_url_s3'],
+        duration_presigned_url_gcs=args['duration_presigned_url_gcs'],
+        public_gcs=args['public_gcs'],
+        gcp_private_key=args['gcp_private_key'],
+        map_path_to_url=args['mapping_path_to_url'],
+        no_checksum=args['no_checksum'])
 
     co.organize_output()
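For reference, a minimal sketch (not part of the commit) of driving the reworked constructor directly instead of through the CLI, mirroring main() above; the import path, bucket, and local paths are assumptions for illustration:

from croo.croo import Croo

co = Croo(
    metadata_json='gs://my-bucket/atac/metadata.json',
    out_def_json='gs://my-bucket/atac/out_def.json',
    out_dir='/data/organized',
    tmp_dir='/data/.croo_tmp',  # now required: localization target for remote JSONs
    soft_link=False,            # copy outputs instead of soft-linking
    no_checksum=False)          # keep checksum comparison before copying
co.organize_output()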

