From 155f9eb445a92a02d97811cbd8c3096f677e9e0e Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Mon, 16 Mar 2020 18:58:23 -0700
Subject: [PATCH 01/13] fix: presigning duration type mismatch
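
Without an explicit type=, argparse stores a value passed on the command line
as str, while the omitted flag falls back to the int default, so arithmetic
and comparisons on the duration break only when a user overrides the flag.
A minimal sketch of the failure mode (illustrative, not from this repo;
assumes argparse is imported):

    p = argparse.ArgumentParser()
    p.add_argument('--duration', default=3600)
    args = p.parse_args(['--duration', '100'])
    args.duration + 1  # TypeError: can only concatenate str (not "int") to str

Adding type=int makes both code paths yield int.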
---
 croo/croo_args.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 51d7a39..9f0a218 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -63,11 +63,11 @@ def parse_croo_arguments():
         'Google Cloud Platform (GCP). This key will be used to make '
         'presigned URLs on files on gs://.')
     p.add_argument(
-        '--duration-presigned-url-s3',
+        '--duration-presigned-url-s3', type=int,
         default=MAX_DURATION_SEC_PRESIGNED_URL_S3,
         help='Duration for presigned URLs for files on s3:// in seconds. ')
     p.add_argument(
-        '--duration-presigned-url-gcs',
+        '--duration-presigned-url-gcs', type=int,
         default=MAX_DURATION_SEC_PRESIGNED_URL_GCS,
         help='Duration for presigned URLs for files on gs:// in seconds. ')
     p.add_argument(

From 52acfcb86801cc1a26e58eb403b9ca77c1bcad09 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Mon, 16 Mar 2020 18:58:42 -0700
Subject: [PATCH 02/13] ver: 0.3.4 -> 0.3.5

---
 croo/croo_args.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 9f0a218..ad7c9cd 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -13,7 +13,7 @@
     MAX_DURATION_SEC_PRESIGNED_URL_GCS)


-__version__ = '0.3.4'
+__version__ = '0.3.5'


 def parse_croo_arguments():
     """Argument parser for Cromwell Output Organizer (COO)

From e9ae555380645190a33d7fd4e064b78358aaf2c6 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Wed, 25 Mar 2020 10:39:00 -0700
Subject: [PATCH 03/13] replace CaperURI with AutoURI
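
Croo previously localized and copied files through caper.caper_uri.CaperURI;
that functionality now lives in the standalone autouri package, so switch all
call sites over. Rough mapping between the two APIs as used in this diff:

    CaperURI(uri).is_valid_uri()       -> AutoURI(uri).is_valid
    CaperURI(uri).get_local_file()     -> AutoURI(uri).localize_on(tmp_dir)
    CaperURI(uri).write_str_to_file(s) -> AutoURI(uri).write(s)
    CaperURI(uri).copy(target_uri=t)   -> AutoURI(uri).cp(t)

Public/presigned/mapped URL generation moves onto the storage-specific
classes (GCSURI, S3URI, AbsPath), so Croo now takes those options explicitly.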
---
 croo/cromwell_metadata.py           |   4 +-
 croo/croo.py                        | 119 +++++++++++++++++++---------
 croo/croo_args.py                   |  28 ++++---
 croo/croo_html_report.py            |   4 +-
 croo/croo_html_report_file_table.py |   4 +-
 croo/croo_html_report_task_graph.py |   7 +-
 croo/croo_html_report_tracks.py     |   6 +-
 setup.py                            |   2 +-
 8 files changed, 110 insertions(+), 64 deletions(-)

diff --git a/croo/cromwell_metadata.py b/croo/cromwell_metadata.py
index 2772b9c..173a62b 100755
--- a/croo/cromwell_metadata.py
+++ b/croo/cromwell_metadata.py
@@ -9,7 +9,7 @@
 import re
 import json
 import caper
-from caper.caper_uri import CaperURI
+from autouri import AutoURI
 from collections import OrderedDict, namedtuple

 from .dag import DAG
@@ -61,7 +61,7 @@ def find_files_in_dict(d):
             elif isinstance(v, str):
                 maybe_files.append((v, (-1,)))
         for f, shard_idx in maybe_files:
-            if CaperURI(f).is_valid_uri():
+            if AutoURI(f).is_valid:
                 files.append((k, f, shard_idx))
         return files

diff --git a/croo/croo.py b/croo/croo.py
index e0e4d7a..8e06158 100755
--- a/croo/croo.py
+++ b/croo/croo.py
@@ -11,8 +11,7 @@
 import json
 import re
 import caper
-from caper.caper_uri import init_caper_uri, CaperURI, URI_LOCAL
-
+from autouri import AutoURI, AbsPath, GCSURI, S3URI, logger
 from .croo_args import parse_croo_arguments
 from .croo_html_report import CrooHtmlReport
 from .cromwell_metadata import CromwellMetadata
@@ -29,16 +28,25 @@ class Croo(object):
     KEY_INPUT = 'inputs'

     def __init__(self, metadata_json, out_def_json, out_dir,
+                 tmp_dir,
                  soft_link=True,
                  ucsc_genome_db=None,
                  ucsc_genome_pos=None,
+                 use_presigned_url_s3=False,
+                 use_presigned_url_gcs=False,
+                 duration_presigned_url_s3=0,
+                 duration_presigned_url_gcs=0,
+                 public_gcs=False,
+                 gcp_private_key=None,
+                 map_path_to_url=None,
                  no_graph=False):
         """Initialize croo with output definition JSON
         """
+        self._tmp_dir = tmp_dir
         if isinstance(metadata_json, dict):
             self._metadata = metadata_json
         else:
-            f = CaperURI(metadata_json).get_local_file()
+            f = AutoURI(metadata_json).localize_on(self._tmp_dir)
             with open(f, 'r') as fp:
                 self._metadata = json.loads(fp.read())
         if isinstance(self._metadata, list):
@@ -53,6 +61,15 @@ def __init__(self, metadata_json, out_def_json, out_dir,
         self._cm = CromwellMetadata(self._metadata)
         self._ucsc_genome_db = ucsc_genome_db
         self._ucsc_genome_pos = ucsc_genome_pos
+
+        self._use_presigned_url_s3 = use_presigned_url_s3
+        self._use_presigned_url_gcs = use_presigned_url_gcs
+        self._duration_presigned_url_s3 = duration_presigned_url_s3
+        self._duration_presigned_url_gcs = duration_presigned_url_gcs
+        self._public_gcs = public_gcs
+        self._gcp_private_key = gcp_private_key
+        self._map_path_to_url = map_path_to_url
+
         self._no_graph = no_graph

         if isinstance(out_def_json, dict):
@@ -66,7 +83,7 @@ def __init__(self, metadata_json, out_def_json, out_dir,
                     'add "#CROO out_def [URL_OR_CLOUD_URI]" '
                     'to your WDL')
             out_def_json = out_def_json_file_from_wdl
-            f = CaperURI(out_def_json).get_local_file()
+            f = AutoURI(out_def_json).localize_on(self._tmp_dir)
             with open(f, 'r') as fp:
                 self._out_def_json = json.loads(fp.read())

@@ -141,26 +158,44 @@ def organize_output(self):
                     if k != output_name:
                         continue

+                target_uri = full_path
                 if path is not None:
                     interpreted_path = Croo.__interpret_inline_exp(
                         path, full_path, shard_idx)
-                    # write to output directory
-                    target_uri = os.path.join(self._out_dir,
-                                              interpreted_path)
-                    # if soft_link, target_uri changes to original source
-                    target_uri = CaperURI(full_path).copy(
-                        target_uri=target_uri,
-                        soft_link=self._soft_link)
-                else:
-                    target_uri = full_path
+                    u = AutoURI(full_path)
+                    target_path = os.path.join(self._out_dir, interpreted_path)
+
+                    if self._soft_link:
+                        if isinstance(u, AbsPath):
+                            u.soft_link(target_path, force=True)
+                            target_uri = target_path
+                    else:
+                        target_uri = u.cp(target_path, make_md5_file=True)

                 # get presigned URLs if possible
+                target_url = None
                 if path is not None or table_item is not None \
                         or ucsc_track is not None or node_format is not None:
-                    target_url = CaperURI(target_uri).get_url()
-                else:
-                    target_url = None
+                    print(target_uri)
+                    u = AutoURI(target_uri)
+
+                    if isinstance(u, GCSURI):
+                        if self._public_gcs:
+                            target_url = u.get_public_url()
+
+                        elif self._use_presigned_url_gcs:
+                            target_url = u.get_presigned_url(
+                                duration=self._duration_presigned_url_gcs,
+                                private_key_file=self._gcp_private_key)
+
+                    elif isinstance(u, S3URI) and self._use_presigned_url_s3:
+                        target_url = u.get_presigned_url(
+                            duration=self._duration_presigned_url_s3)
+
+                    elif isinstance(u, AbsPath):
+                        target_url = u.get_mapped_url(
+                            map_path_to_url=self._map_path_to_url)

                 if table_item is not None:
                     interpreted_table_item = Croo.__interpret_inline_exp(
@@ -242,8 +277,17 @@ def __interpret_inline_exp(s, full_path, shard_idx):
 def init_dirs_args(args):
     """More initialization for out/tmp directories since tmp
     directory is important for inter-storage transfer using
-    CaperURI
+    AutoURI
     """
+    if args.get('tmp_dir') is None:
+        pass
+    elif args['tmp_dir'].startswith(('http://', 'https://')):
+        raise ValueError('URL is not allowed for --tmp-dir')
+    elif args['tmp_dir'].startswith(('gs://', 's3://')):
+        raise ValueError('Cloud URI is not allowed for --tmp-dir')
+    if args.get('tmp_dir') is not None:
+        args['tmp_dir'] = os.path.abspath(os.path.expanduser(args['tmp_dir']))
+
     if args['out_dir'].startswith(('http://', 'https://')):
         raise ValueError('URL is not allowed for --out-dir')
     elif args['out_dir'].startswith(('gs://', 's3://')):
@@ -259,7 +303,6 @@ def init_dirs_args(args):
     # make temp dir
     os.makedirs(args['tmp_dir'], exist_ok=True)

-    mapping_path_to_url = None
     if args.get('tsv_mapping_path_to_url') is not None:
         mapping_path_to_url = {}
         f = os.path.expanduser(args.get('tsv_mapping_path_to_url'))
@@ -268,39 +311,39 @@ def init_dirs_args(args):
             for line in lines:
                 k, v = line.split('\t')
                 mapping_path_to_url[k] = v
+        args['mapping_path_to_url'] = mapping_path_to_url
+    else:
+        args['mapping_path_to_url'] = None
+
+    if args['verbose']:
+        logger.setLevel('INFO')
+    elif args['debug']:
+        logger.setLevel('DEBUG')
+
+    GCSURI.init_gcsuri(
+        use_gsutil_for_s3=args['use_gsutil_for_s3'])

-    # init caper uri to transfer files across various storages
-    # e.g. gs:// to s3://, http:// to local, ...
-    init_caper_uri(
-        tmp_dir=args['tmp_dir'],
-        tmp_s3_bucket=None,
-        tmp_gcs_bucket=None,
-        http_user=args.get('http_user'),
-        http_password=args.get('http_password'),
-        use_gsutil_over_aws_s3=args.get('use_gsutil_over_aws_s3'),
-        use_presigned_url_s3=args.get('use_presigned_url_s3'),
-        use_presigned_url_gcs=args.get('use_presigned_url_gcs'),
-        gcp_private_key_file=args.get('gcp_private_key'),
-        public_gcs=args.get('public_gcs'),
-        duration_sec_presigned_url_s3=args.get('duration_presigned_url_s3'),
-        duration_sec_presigned_url_gcs=args.get('duration_presigned_url_gcs'),
-        mapping_path_to_url=mapping_path_to_url,
-        verbose=True)


 def main():
     # parse arguments. note that args is a dict
     args = parse_croo_arguments()
-
-    # init out/tmp dirs and CaperURI for inter-storage transfer
     init_dirs_args(args)

     co = Croo(
         metadata_json=args['metadata_json'],
         out_def_json=args['out_def_json'],
         out_dir=args['out_dir'],
+        tmp_dir=args['tmp_dir'],
+        soft_link=args['method'] == 'link',
         ucsc_genome_db=args['ucsc_genome_db'],
         ucsc_genome_pos=args['ucsc_genome_pos'],
-        soft_link=args['method'] == 'link',
+        use_presigned_url_s3=args['use_presigned_url_s3'],
+        use_presigned_url_gcs=args['use_presigned_url_gcs'],
+        duration_presigned_url_s3=args['duration_presigned_url_s3'],
+        duration_presigned_url_gcs=args['duration_presigned_url_gcs'],
+        public_gcs=args['public_gcs'],
+        gcp_private_key=args['gcp_private_key'],
+        map_path_to_url=args['mapping_path_to_url'],
         no_graph=args['no_graph'])

     co.organize_output()
diff --git a/croo/croo_args.py b/croo/croo_args.py
index ad7c9cd..5db63f7 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -8,9 +8,8 @@
 import argparse
 import sys
-from caper.caper_uri import (
-    MAX_DURATION_SEC_PRESIGNED_URL_S3,
-    MAX_DURATION_SEC_PRESIGNED_URL_GCS)
+import logging
+from autouri import AutoURI, S3URI, GCSURI


 __version__ = '0.3.5'
@@ -64,11 +63,11 @@ def parse_croo_arguments():
         'presigned URLs on files on gs://.')
     p.add_argument(
         '--duration-presigned-url-s3', type=int,
-        default=MAX_DURATION_SEC_PRESIGNED_URL_S3,
+        default=S3URI.DURATION_PRESIGNED_URL,
         help='Duration for presigned URLs for files on s3:// in seconds. ')
     p.add_argument(
         '--duration-presigned-url-gcs', type=int,
-        default=MAX_DURATION_SEC_PRESIGNED_URL_GCS,
+        default=GCSURI.DURATION_PRESIGNED_URL,
         help='Duration for presigned URLs for files on gs:// in seconds. ')
     p.add_argument(
         '--tsv-mapping-path-to-url',
@@ -87,17 +86,20 @@ def parse_croo_arguments():
         'stored here. You can clean it up but will lose all cached files '
         'so that remote files will be re-downloaded.')
     p.add_argument(
-        '--use-gsutil-over-aws-s3', action='store_true',
-        help='Use gsutil instead of aws s3 CLI even for S3 buckets.')
-    p.add_argument(
-        '--http-user',
-        help='Username to download data from private URLs')
-    p.add_argument(
-        '--http-password',
-        help='Password to download data from private URLs')
+        '--use-gsutil-for-s3', action='store_true',
+        help='Use gsutil for direct transfer between GCS and S3 buckets. '
+             'Make sure that you have "gsutil" installed and configured '
+             'to have access to credentials for GCS and S3 '
+             '(e.g. ~/.boto or ~/.aws/credentials)')
     p.add_argument('-v', '--version', action='store_true',
                    help='Show version')

+    group_log_level = p.add_mutually_exclusive_group()
+    group_log_level.add_argument('-V', '--verbose', action='store_true',
+                                 help='Prints all logs >= INFO level')
+    group_log_level.add_argument('-d', '--debug', action='store_true',
+                                 help='Prints all logs >= DEBUG level')
+
     if '-v' in sys.argv or '--version' in sys.argv:
         print(__version__)
         p.exit()
diff --git a/croo/croo_html_report.py b/croo/croo_html_report.py
index 94b946d..f75403a 100755
--- a/croo/croo_html_report.py
+++ b/croo/croo_html_report.py
@@ -9,7 +9,7 @@
 from .croo_html_report_tracks import CrooHtmlReportUCSCTracks
 from .croo_html_report_file_table import CrooHtmlReportFileTable
 from .croo_html_report_task_graph import CrooHtmlReportTaskGraph
-from caper.caper_uri import CaperURI
+from autouri import AutoURI


 class CrooHtmlReport(object):
@@ -79,5 +79,5 @@ def save_to_file(self):
             self._out_dir,
             CrooHtmlReport.REPORT_HTML.format(
                 workflow_id=self._workflow_id))
-        CaperURI(uri_report).write_str_to_file(html)
+        AutoURI(uri_report).write(html)
         return html
\ No newline at end of file
diff --git a/croo/croo_html_report_file_table.py b/croo/croo_html_report_file_table.py
index 8019775..991843b 100755
--- a/croo/croo_html_report_file_table.py
+++ b/croo/croo_html_report_file_table.py
@@ -5,7 +5,7 @@
 """

 import os
-from caper.caper_uri import CaperURI, URI_LOCAL, URI_URL
+from autouri import AutoURI


 class CrooHtmlReportFileTable(object):
@@ -134,5 +134,5 @@ def dir_first(s):
             self._out_dir,
             CrooHtmlReportFileTable.FILETABLE_TSV.format(
                 workflow_id=self._workflow_id))
-        CaperURI(uri_filetable).write_str_to_file(contents)
+        AutoURI(uri_filetable).write(contents)
         return table_contents
diff --git a/croo/croo_html_report_task_graph.py b/croo/croo_html_report_task_graph.py
index 2f18b4b..a83ffa9 100755
--- a/croo/croo_html_report_task_graph.py
+++ b/croo/croo_html_report_task_graph.py
@@ -8,7 +8,7 @@
 import os
 from copy import deepcopy
 from base64 import b64encode
-from caper.caper_uri import CaperURI, URI_LOCAL, URI_URL
+from autouri import AutoURI


 class CrooHtmlReportTaskGraph(object):
@@ -26,6 +26,7 @@ def __init__(self, out_dir, workflow_id, dag, template_d):
             A template dict that will be converted to a template dot file for graphviz
             This dot file will be converted into SVG and finally be embedded in HTML
             Refer to the function caper.dict_tool.dict_to_dot_str() for details
+            https://github.com/ENCODE-DCC/caper/blob/master/caper/dict_tool.py#L190
         """
         self._out_dir = out_dir
         self._workflow_id = workflow_id
@@ -97,7 +98,7 @@ def fnc_subgraph(n):
             self._out_dir,
             CrooHtmlReportTaskGraph.TASK_GRAPH_DOT.format(
                 workflow_id=self._workflow_id))
-        CaperURI(uri_dot).write_str_to_file(dot_str)
+        AutoURI(uri_dot).write(dot_str)

         # save to SVG
         with open (svg, 'r') as fp:
             svg_contents = fp.read()
         uri_svg = os.path.join(
self._out_dir, CrooHtmlReportTaskGraph.TASK_GRAPH_SVG.format( workflow_id=self._workflow_id)) - CaperURI(uri_svg).write_str_to_file(svg_contents) + AutoURI(uri_svg).write(svg_contents) os.remove(tmp_dot) os.remove(svg) diff --git a/croo/croo_html_report_tracks.py b/croo/croo_html_report_tracks.py index cd58a07..604e79c 100755 --- a/croo/croo_html_report_tracks.py +++ b/croo/croo_html_report_tracks.py @@ -7,7 +7,7 @@ import os import urllib.parse -from caper.caper_uri import CaperURI +from autouri import AutoURI class CrooHtmlReportUCSCTracks(object): @@ -82,14 +82,14 @@ def get_html_body_str(self): self._out_dir, CrooHtmlReportUCSCTracks.UCSC_TRACKS_TXT.format( workflow_id=self._workflow_id)) - CaperURI(uri_txt).write_str_to_file(txt) + AutoURI(uri_txt).write(txt) # save to URL uri_url = os.path.join( self._out_dir, CrooHtmlReportUCSCTracks.UCSC_TRACKS_URL.format( workflow_id=self._workflow_id)) - CaperURI(uri_url).write_str_to_file(url) + AutoURI(uri_url).write(url) return html diff --git a/setup.py b/setup.py index 3dad79c..4d5b49e 100644 --- a/setup.py +++ b/setup.py @@ -21,5 +21,5 @@ 'License :: OSI Approved :: MIT License', 'Operating System :: POSIX :: Linux', ], - install_requires=['caper>=0.6.1', 'graphviz'] + install_requires=['autouri>=0.1.1', 'graphviz'] ) From 0f835a493b3dfa82ede0b8e3801cb8d18fbf8a0f Mon Sep 17 00:00:00 2001 From: Jin Lee Date: Thu, 26 Mar 2020 15:06:13 -0700 Subject: [PATCH 04/13] replace old loc module (CaperURI) with AutoURI --- croo/croo.py | 87 +++++++++++++------------------------------ croo/croo_args.py | 94 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 67 deletions(-) diff --git a/croo/croo.py b/croo/croo.py index 8e06158..27b521a 100755 --- a/croo/croo.py +++ b/croo/croo.py @@ -39,8 +39,15 @@ def __init__(self, metadata_json, out_def_json, out_dir, public_gcs=False, gcp_private_key=None, map_path_to_url=None, + no_checksum=False, no_graph=False): """Initialize croo with output definition JSON + Args: + soft_link: + DO NOT MAKE A COPY of original cromwell output + (source) on out_dir (destination). + Try to soft-link it if both src and dest are on local storage. + Otherwise, original cromwell outputs will be just referenced. 
""" self._tmp_dir = tmp_dir if isinstance(metadata_json, dict): @@ -69,6 +76,7 @@ def __init__(self, metadata_json, out_def_json, out_dir, self._public_gcs = public_gcs self._gcp_private_key = gcp_private_key self._map_path_to_url = map_path_to_url + self._no_checksum = no_checksum self._no_graph = no_graph @@ -163,21 +171,26 @@ def organize_output(self): interpreted_path = Croo.__interpret_inline_exp( path, full_path, shard_idx) - u = AutoURI(full_path) + au = AutoURI(full_path) target_path = os.path.join(self._out_dir, interpreted_path) if self._soft_link: - if isinstance(u, AbsPath): - u.soft_link(target_path, force=True) + au_target = AutoURI(target_path) + if isinstance(au, AbsPath) and isinstance(au_target, AbsPath): + au.soft_link(target_path, force=True) target_uri = target_path + else: + target_uri = full_path else: - target_uri = u.cp(target_path, make_md5_file=True) + target_uri = au.cp( + target_path, + no_checksum=self._no_checksum, + make_md5_file=True) # get presigned URLs if possible target_url = None if path is not None or table_item is not None \ or ucsc_track is not None or node_format is not None: - print(target_uri) u = AutoURI(target_uri) if isinstance(u, GCSURI): @@ -189,13 +202,15 @@ def organize_output(self): duration=self._duration_presigned_url_gcs, private_key_file=self._gcp_private_key) - elif isinstance(u, S3URI) and self._use_presigned_url_s3: - target_url = u.get_presigned_url( - duration=self._duration_presigned_url_s3) + elif isinstance(u, S3URI): + if self._use_presigned_url_s3: + target_url = u.get_presigned_url( + duration=self._duration_presigned_url_s3) elif isinstance(u, AbsPath): - target_url = u.get_mapped_url( - map_path_to_url=self._map_path_to_url) + if self._map_path_to_url: + target_url = u.get_mapped_url( + map_path_to_url=self._map_path_to_url) if table_item is not None: interpreted_table_item = Croo.__interpret_inline_exp( @@ -274,60 +289,9 @@ def __interpret_inline_exp(s, full_path, shard_idx): return result -def init_dirs_args(args): - """More initialization for out/tmp directories since tmp - directory is important for inter-storage transfer using - AutoURI - """ - if args.get('tmp_dir') is None: - pass - elif args['tmp_dir'].startswith(('http://', 'https://')): - raise ValueError('URL is not allowed for --tmp-dir') - elif args['tmp_dir'].startswith(('gs://', 's3://')): - raise ValueError('Cloud URI is not allowed for --tmp-dir') - if args.get('tmp_dir') is not None: - args['tmp_dir'] = os.path.abspath(os.path.expanduser(args['tmp_dir'])) - - if args['out_dir'].startswith(('http://', 'https://')): - raise ValueError('URL is not allowed for --out-dir') - elif args['out_dir'].startswith(('gs://', 's3://')): - if args.get('tmp_dir') is None: - args['tmp_dir'] = os.path.join(os.getcwd(), '.croo_tmp') - else: - args['out_dir'] = os.path.abspath(os.path.expanduser(args['out_dir'])) - os.makedirs(args['out_dir'], exist_ok=True) - - if args.get('tmp_dir') is None: - args['tmp_dir'] = os.path.join(args['out_dir'], '.croo_tmp') - - # make temp dir - os.makedirs(args['tmp_dir'], exist_ok=True) - - if args.get('tsv_mapping_path_to_url') is not None: - mapping_path_to_url = {} - f = os.path.expanduser(args.get('tsv_mapping_path_to_url')) - with open(f, 'r') as fp: - lines = fp.read().strip('\n').split('\n') - for line in lines: - k, v = line.split('\t') - mapping_path_to_url[k] = v - args['mapping_path_to_url'] = mapping_path_to_url - else: - args['mapping_path_to_url'] = None - - if args['verbose']: - logger.setLevel('INFO') - elif args['debug']: - 
-        logger.setLevel('DEBUG')
-
-    GCSURI.init_gcsuri(
-        use_gsutil_for_s3=args['use_gsutil_for_s3'])


 def main():
-    # parse arguments. note that args is a dict
     args = parse_croo_arguments()
-    init_dirs_args(args)

     co = Croo(
         metadata_json=args['metadata_json'],
         out_def_json=args['out_def_json'],
         out_dir=args['out_dir'],
@@ -344,6 +308,7 @@ def main():
         public_gcs=args['public_gcs'],
         gcp_private_key=args['gcp_private_key'],
         map_path_to_url=args['mapping_path_to_url'],
+        no_checksum=args['no_checksum'],
         no_graph=args['no_graph'])

     co.organize_output()
diff --git a/croo/croo_args.py b/croo/croo_args.py
index 5db63f7..47b8cff 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -7,9 +7,11 @@
 """

 import argparse
+import os
 import sys
-import logging
-from autouri import AutoURI, S3URI, GCSURI
+from autouri import S3URI, GCSURI
+from autouri import logger as autouri_logger


 __version__ = '0.3.5'
@@ -91,6 +93,11 @@ def parse_croo_arguments():
         'Make sure that you have "gsutil" installed and configured '
         'to have access to credentials for GCS and S3 '
         '(e.g. ~/.boto or ~/.aws/credentials)')
+    p.add_argument(
+        '--no-checksum', action='store_true',
+        help='Always overwrite on output directory/bucket (--out-dir) '
+             'even if md5-identical files (or soft links) already exist there. '
+             'Md5 hash/filename/filesize checking will be skipped.')
     p.add_argument('-v', '--version', action='store_true',
                    help='Show version')
@@ -113,20 +120,95 @@ def parse_croo_arguments():
     if args.version is not None and args.version:
         print(__version__)
         p.exit()
-    check_args(args)

     # convert to dict
-    return vars(args)
+    d_args = vars(args)
+
+    check_args(d_args)
+    init_dirs(d_args)
+    init_autouri(d_args)
+
+    return d_args


 def check_args(args):
+    """Check cmd line arguments are valid
+
+    Args:
+        args:
+            dict of cmd line arguments
+    """
-    if args.use_presigned_url_gcs and args.gcp_private_key is None:
+    if args['use_presigned_url_gcs'] and args['gcp_private_key'] is None:
         raise ValueError(
             'Define --gcp-private-key to use presigned URLs on GCS'
             ' (--use-presigned-url-gcs).')
-    if args.public_gcs and args.use_presigned_url_gcs:
+
+    if args['public_gcs'] and args['use_presigned_url_gcs']:
         raise ValueError(
             'Public GCS bucket (--public-gcs) cannot be presigned '
             '(--use-presigned-url-gcs and --gcp-private-key). '
             'Choose one of them.')
+
+    if args['tmp_dir'] is None:
+        pass
+    elif args['tmp_dir'].startswith(('http://', 'https://')):
+        raise ValueError('URL is not allowed for --tmp-dir')
+    elif args['tmp_dir'].startswith(('gs://', 's3://')):
+        raise ValueError('Cloud URI is not allowed for --tmp-dir')
+
+    if args['out_dir'] is None:
+        raise ValueError('--out-dir is not valid.')
+    elif args['out_dir'].startswith(('http://', 'https://')):
+        raise ValueError('URL is not allowed for --out-dir')
+
+
+def init_dirs(args):
+    """More initialization for out/tmp directories since tmp
+    directory is important for inter-storage transfer using
+    Autouri
+
+    Args:
+        args:
+            dict of cmd line arguments
+    """
+    if args['out_dir'].startswith(('gs://', 's3://')):
+        if args['tmp_dir'] is None:
+            args['tmp_dir'] = os.path.join(os.getcwd(), '.croo_tmp')
+    else:
+        args['out_dir'] = os.path.abspath(os.path.expanduser(args['out_dir']))
+        os.makedirs(args['out_dir'], exist_ok=True)
+        if args['tmp_dir'] is None:
+            args['tmp_dir'] = os.path.join(args['out_dir'], '.croo_tmp')
+
+    if args['tmp_dir'] is not None:
+        args['tmp_dir'] = os.path.abspath(os.path.expanduser(args['tmp_dir']))
+
+
+def init_autouri(args):
+    """Initialize Autouri and its logger
+
+    Args:
+        args:
+            dict of cmd line arguments
+    """
+    GCSURI.init_gcsuri(
+        use_gsutil_for_s3=args['use_gsutil_for_s3'])
+
+    # autouri's path to url mapping
+    if args['tsv_mapping_path_to_url'] is not None:
+        mapping_path_to_url = {}
+        f = os.path.expanduser(args['tsv_mapping_path_to_url'])
+        with open(f, 'r') as fp:
+            lines = fp.read().strip('\n').split('\n')
+            for line in lines:
+                k, v = line.split('\t')
+                mapping_path_to_url[k] = v
+        args['mapping_path_to_url'] = mapping_path_to_url
+    else:
+        args['mapping_path_to_url'] = None
+
+    # autouri's logger
+    if args['verbose']:
+        autouri_logger.setLevel('INFO')
+    elif args['debug']:
+        autouri_logger.setLevel('DEBUG')

From 1e0accc6a7ab7eb8c2f902665b3284d28a2931cd Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 08:40:47 -0700
Subject: [PATCH 05/13] arg: param -d -> -D (--debug)

---
 croo/croo_args.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 47b8cff..145d61e 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -104,7 +104,7 @@ def parse_croo_arguments():
     group_log_level = p.add_mutually_exclusive_group()
     group_log_level.add_argument('-V', '--verbose', action='store_true',
                                  help='Prints all logs >= INFO level')
-    group_log_level.add_argument('-d', '--debug', action='store_true',
+    group_log_level.add_argument('-D', '--debug', action='store_true',
                                  help='Prints all logs >= DEBUG level')

     if '-v' in sys.argv or '--version' in sys.argv:

From 1625793d84068c8cc5c289a426ac37f67f140968 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 09:09:39 -0700
Subject: [PATCH 06/13] added logger, removed --no-graph (croo skips task graph if graphviz dot is not installed)
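
Rendering the task graph shells out to graphviz's `dot` executable; the pip
package "graphviz" only generates DOT source and raises
graphviz.backend.ExecutableNotFound when `dot` is missing. So instead of a
manual --no-graph flag, catch that error and skip the graph. A one-line
availability check, if ever needed (sketch, standard library only):

    import shutil
    has_dot = shutil.which('dot') is not None  # True if `dot` is on PATH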
---
 croo/croo.py                        | 29 ++++++++++++++++++-----------
 croo/croo_args.py                   |  4 ----
 croo/croo_html_report_task_graph.py | 19 ++++++++++++++++---
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/croo/croo.py b/croo/croo.py
index 27b521a..19255d0 100755
--- a/croo/croo.py
+++ b/croo/croo.py
@@ -8,6 +8,7 @@
 import os
 import sys
+import logging
 import json
 import re
 import caper
@@ -17,6 +18,10 @@
 from .cromwell_metadata import CromwellMetadata


+logging.basicConfig(level=logging.INFO, format='%(asctime)s|%(name)s|%(levelname)s| %(message)s')
+logger = logging.getLogger('croo')
+
+
 class Croo(object):
     """Cromwell output organizer (croo)

@@ -39,8 +44,7 @@ def __init__(self, metadata_json, out_def_json, out_dir,
                  public_gcs=False,
                  gcp_private_key=None,
                  map_path_to_url=None,
-                 no_checksum=False,
-                 no_graph=False):
+                 no_checksum=False):
         """Initialize croo with output definition JSON
         Args:
             soft_link:
@@ -58,9 +62,10 @@ def __init__(self, metadata_json, out_def_json, out_dir,
             self._metadata = json.loads(fp.read())
         if isinstance(self._metadata, list):
             if len(self._metadata) > 1:
-                print('[Croo] Warning: multiple metadata JSON objects '
-                      'found in metadata JSON file. Taking the first '
-                      'one...')
+                logger.warning(
+                    'Multiple metadata JSON objects '
+                    'found in metadata JSON file. Taking the first '
+                    'one...')
             elif len(self._metadata) == 0:
                 raise Exception('metadata JSON file is empty')
             self._metadata = self._metadata[0]
@@ -78,8 +83,6 @@ def __init__(self, metadata_json, out_def_json, out_dir,
         self._map_path_to_url = map_path_to_url
         self._no_checksum = no_checksum

-        self._no_graph = no_graph
-
         if isinstance(out_def_json, dict):
             self._out_def_json = out_def_json
         else:
@@ -130,7 +133,7 @@ def organize_output(self):
             full_path = node.output_path
             shard_idx = node.shard_idx

-            if node_format is not None and not self._no_graph:
+            if node_format is not None:
                 interpreted_node_format = Croo.__interpret_inline_exp(
                     node_format, full_path, shard_idx)
                 if subgraph is not None:
@@ -224,7 +227,7 @@ def organize_output(self):
                         ucsc_track, full_path, shard_idx)
                     report.add_to_ucsc_track(target_url, interpreted_ucsc_track)

-                if node_format is not None and not self._no_graph:
+                if node_format is not None:
                     interpreted_node_format = Croo.__interpret_inline_exp(
                         node_format, full_path, shard_idx)
                     if subgraph is not None:
@@ -293,6 +296,11 @@ def __interpret_inline_exp(s, full_path, shard_idx):
 def main():
     args = parse_croo_arguments()

+    if args['verbose']:
+        logger.setLevel('INFO')
+    elif args['debug']:
+        logger.setLevel('DEBUG')
+
     co = Croo(
         metadata_json=args['metadata_json'],
         out_def_json=args['out_def_json'],
@@ -308,8 +316,7 @@ def main():
         public_gcs=args['public_gcs'],
         gcp_private_key=args['gcp_private_key'],
         map_path_to_url=args['mapping_path_to_url'],
-        no_checksum=args['no_checksum'],
-        no_graph=args['no_graph'])
+        no_checksum=args['no_checksum'])

     co.organize_output()
diff --git a/croo/croo_args.py b/croo/croo_args.py
index 145d61e..17327fa 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -13,7 +13,6 @@
 from autouri import logger as autouri_logger

-
 __version__ = '0.3.5'

 def parse_croo_arguments():
@@ -37,9 +36,6 @@ def parse_croo_arguments():
         'Original output files will be kept in Cromwell\'s output '
         'directory. '
         '"copy" makes copies of Cromwell\'s original outputs')
-    p.add_argument(
-        '--no-graph', action='store_true',
-        help='No task graph.')
     p.add_argument(
         '--ucsc-genome-db',
         help='UCSC genome browser\'s "db=" parameter. '
diff --git a/croo/croo_html_report_task_graph.py b/croo/croo_html_report_task_graph.py
index a83ffa9..5172b56 100755
--- a/croo/croo_html_report_task_graph.py
+++ b/croo/croo_html_report_task_graph.py
@@ -8,7 +8,10 @@
 import os
 from copy import deepcopy
 from base64 import b64encode
+from graphviz import Source
+from graphviz.backend import ExecutableNotFound
 from autouri import AutoURI
+from .croo import logger


 class CrooHtmlReportTaskGraph(object):
@@ -62,7 +65,6 @@ def __make_svg(self):
         """
         if not self._items:
             return None
-        from graphviz import Source

         # define call back functions for node format, href, subgraph
         def fnc_node_format(n):
@@ -90,8 +92,19 @@ def fnc_subgraph(n):
             template=self._template_d)
         # temporary dot, svg from graphviz.Source.render
         tmp_dot = '_tmp_.dot'
-        svg = Source(dot_str, format='svg').render(
-            filename=tmp_dot)
+
+        try:
+            svg = Source(dot_str, format='svg').render(
+                filename=tmp_dot)
+        except (ExecutableNotFound, FileNotFoundError):
+            logger.info(
+                'Rendering with graphviz failed. Task graph will not be available. '
+                'Check if you have installed graphviz correctly so that '
+                '"dot" executable exists on your PATH. '
+                '"pip install graphviz" does not install such "dot". '
+                'Use apt or system-level installer instead. '
+                'e.g. sudo apt-get install graphviz.')
+            return None

         # save to DOT
         uri_dot = os.path.join(

From 008f48bad9846ead9521a06cc5cafa16359ca6ba Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 09:20:57 -0700
Subject: [PATCH 07/13] fix: task graph label order

---
 croo/croo_html_report_task_graph.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/croo/croo_html_report_task_graph.py b/croo/croo_html_report_task_graph.py
index 5172b56..f19855c 100755
--- a/croo/croo_html_report_task_graph.py
+++ b/croo/croo_html_report_task_graph.py
@@ -53,9 +53,9 @@ def get_html_body_str(self):
         if svg_contents is None:
             return ''
         else:
-            head = '<b>Task graph</b>\n'
-            tail = '<br>\n<br>\n'
+            head = '<b>Task graph</b>\n'
             img = svg_contents
+            tail = '<br>\n<br>\n'
             return head + img + tail

     def __make_svg(self):

From 826ac71fca54920447f9b2b747327b08d4842d97 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 14:15:39 -0700
Subject: [PATCH 08/13] fix: UCSC browser no longer takes long URL, make an alternative link
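
The UCSC browser no longer accepts very long hgct_customText values, and the
old link URL-encoded the entire track-hub text into the query string. Keep
that link, but also write the track-hub text to a .txt file in the output
directory, convert it to a public/presigned/mapped URL with the same logic
used for output files, and add a second link that passes only that short URL.
Sketch of the two query forms (variable names are illustrative):

    # old, can exceed the browser's URL limit:
    'hgct_customText=' + urllib.parse.quote(track_hub_txt)
    # new fallback, always short:
    'hgct_customText=' + urllib.parse.quote(url_of_track_hub_txt_file)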
---
 croo/croo.py                    |  7 +++
 croo/croo_html_report.py        | 19 +++++++
 croo/croo_html_report_tracks.py | 88 +++++++++++++++++++++++----------
 3 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/croo/croo.py b/croo/croo.py
index 19255d0..cfd75b1 100755
--- a/croo/croo.py
+++ b/croo/croo.py
@@ -117,6 +117,13 @@ def organize_output(self):
             workflow_id=self._cm.get_workflow_id(),
             dag=self._task_graph,
             task_graph_template=self._task_graph_template,
+            public_gcs=self._public_gcs,
+            gcp_private_key=self._gcp_private_key,
+            use_presigned_url_gcs=self._use_presigned_url_gcs,
+            use_presigned_url_s3=self._use_presigned_url_s3,
+            duration_presigned_url_s3=self._duration_presigned_url_s3,
+            duration_presigned_url_gcs=self._duration_presigned_url_gcs,
+            map_path_to_url=self._map_path_to_url,
             ucsc_genome_db=self._ucsc_genome_db,
             ucsc_genome_pos=self._ucsc_genome_pos)
diff --git a/croo/croo_html_report.py b/croo/croo_html_report.py
index f75403a..66dd781 100755
--- a/croo/croo_html_report.py
+++ b/croo/croo_html_report.py
@@ -31,13 +31,32 @@ class CrooHtmlReport(object):
     def __init__(self, out_dir, workflow_id, dag,
                  task_graph_template=None,
+                 public_gcs=None,
+                 gcp_private_key=None,
+                 use_presigned_url_gcs=False,
+                 use_presigned_url_s3=False,
+                 duration_presigned_url_s3=None,
+                 duration_presigned_url_gcs=None,
+                 map_path_to_url=None,
                  ucsc_genome_db=None,
                  ucsc_genome_pos=None):
         self._out_dir = out_dir
         self._workflow_id = workflow_id
+        self._public_gcs = public_gcs
+        self._gcp_private_key = gcp_private_key
+        self._use_presigned_url_gcs = use_presigned_url_gcs
+        self._use_presigned_url_s3 = use_presigned_url_s3
+        self._duration_presigned_url_s3 = duration_presigned_url_s3
+        self._duration_presigned_url_gcs = duration_presigned_url_gcs
+        self._map_path_to_url = map_path_to_url
         self._ucsc_tracks = CrooHtmlReportUCSCTracks(
             out_dir=out_dir,
             workflow_id=workflow_id,
+            public_gcs=public_gcs,
+            gcp_private_key=gcp_private_key,
+            use_presigned_url_gcs=use_presigned_url_gcs,
+            use_presigned_url_s3=use_presigned_url_s3,
+            map_path_to_url=map_path_to_url,
             ucsc_genome_db=ucsc_genome_db,
             ucsc_genome_pos=ucsc_genome_pos)
         self._file_table = CrooHtmlReportFileTable(
diff --git a/croo/croo_html_report_tracks.py b/croo/croo_html_report_tracks.py
index cd58a07..e7cc80e 100755
--- a/croo/croo_html_report_tracks.py
+++ b/croo/croo_html_report_tracks.py
@@ -7,7 +7,7 @@
 import os
 import urllib.parse
-from autouri import AutoURI
+from autouri import AutoURI, GCSURI, S3URI, AbsPath


 class CrooHtmlReportUCSCTracks(object):
@@ -18,7 +18,7 @@ class CrooHtmlReportUCSCTracks(object):
     UCSC_BROWSER_QUERY_URL = 'http://genome.ucsc.edu/cgi-bin/hgTracks?db={db}&ignoreCookie=1{extra_param}&hgct_customText={encoded}'
     UCSC_BROWSER_TEXT_FORMAT = '{track_line} bigDataUrl="{url}"\n'
     HTML_TRACK_HUB_LINK = """
-<a href="{url}">UCSC browser tracks</a><br>
+<a href="{url}">{title}</a><br>
 """
     HTML_TRACK_HUB_TEXT = """
 <b>{title}</b><br>
 <pre>{txt}</pre><br>
""" def __init__(self, out_dir, workflow_id, + public_gcs=None, + gcp_private_key=None, + use_presigned_url_gcs=False, + use_presigned_url_s3=False, + duration_presigned_url_s3=None, + duration_presigned_url_gcs=None, + map_path_to_url=None, ucsc_genome_db=None, ucsc_genome_pos=None): self._out_dir = out_dir self._workflow_id = workflow_id + self._public_gcs = public_gcs + self._gcp_private_key = gcp_private_key + self._use_presigned_url_gcs = use_presigned_url_gcs + self._use_presigned_url_s3 = use_presigned_url_s3 + self._duration_presigned_url_s3 = duration_presigned_url_s3 + self._duration_presigned_url_gcs = duration_presigned_url_gcs + self._map_path_to_url = map_path_to_url self._ucsc_genome_db = ucsc_genome_db self._ucsc_genome_pos = ucsc_genome_pos self._items = [] @@ -60,36 +74,60 @@ def get_html_body_str(self): else: extra_param = '' - encoded = urllib.parse.quote(txt) + # save to TXT + uri_txt = os.path.join( + self._out_dir, + CrooHtmlReportUCSCTracks.UCSC_TRACKS_TXT.format( + workflow_id=self._workflow_id)) + + # localize TXT + # long URL doesn't work + u = AutoURI(uri_txt) + u.write(txt) + + url_trackhub_txt_file = None + if isinstance(u, GCSURI): + if self._public_gcs: + url_trackhub_txt_file = u.get_public_url() + + elif self._use_presigned_url_gcs: + url_trackhub_txt_file = u.get_presigned_url( + duration=self._duration_presigned_url_gcs, + private_key_file=self._gcp_private_key) + + elif isinstance(u, S3URI): + if self._use_presigned_url_s3: + url_trackhub_txt_file = u.get_presigned_url( + duration=self._duration_presigned_url_s3) + + elif isinstance(u, AbsPath): + if self._map_path_to_url: + url_trackhub_txt_file = u.get_mapped_url( + map_path_to_url=self._map_path_to_url) + html = '' + url = CrooHtmlReportUCSCTracks.UCSC_BROWSER_QUERY_URL.format( db=self._ucsc_genome_db, extra_param=extra_param, - encoded=encoded) - - html = '' + encoded=urllib.parse.quote(txt)) html += CrooHtmlReportUCSCTracks.HTML_TRACK_HUB_LINK.format( + title='UCSC browser tracks', url=url) - html += CrooHtmlReportUCSCTracks.HTML_TRACK_HUB_TEXT.format( - title='UCSC track hub plain text', - txt=txt) - html += CrooHtmlReportUCSCTracks.HTML_TRACK_HUB_TEXT.format( - title='UCSC track hub encoded URL ' - '(Use this for browser\'s parameter &hgct_customText=)', - txt=encoded) - # save to TXT - uri_txt = os.path.join( - self._out_dir, - CrooHtmlReportUCSCTracks.UCSC_TRACKS_TXT.format( - workflow_id=self._workflow_id)) - AutoURI(uri_txt).write(txt) + if url_trackhub_txt_file is not None: + url = CrooHtmlReportUCSCTracks.UCSC_BROWSER_QUERY_URL.format( + db=self._ucsc_genome_db, + extra_param=extra_param, + encoded=urllib.parse.quote(url_trackhub_txt_file)) - # save to URL - uri_url = os.path.join( - self._out_dir, - CrooHtmlReportUCSCTracks.UCSC_TRACKS_URL.format( - workflow_id=self._workflow_id)) - AutoURI(uri_url).write(url) + html += CrooHtmlReportUCSCTracks.HTML_TRACK_HUB_LINK.format( + title='UCSC browser tracks (if the above link does not work)', + url=url) + + html += CrooHtmlReportUCSCTracks.HTML_TRACK_HUB_TEXT.format( + title='UCSC track hub plain text. 

Co-Authored-By: Paul Sud <41386393+p-sud@users.noreply.github.com>
---
 croo/croo.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/croo/croo.py b/croo/croo.py
index cfd75b1..551d103 100755
--- a/croo/croo.py
+++ b/croo/croo.py
@@ -19,7 +19,7 @@

 logging.basicConfig(level=logging.INFO, format='%(asctime)s|%(name)s|%(levelname)s| %(message)s')
-logger = logging.getLogger('croo')
+logger = logging.getLogger(__name__)


 class Croo(object):

From 6685ba283c7a9fcf9c1d52b33946ef9418ba2d84 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 15:55:39 -0700
Subject: [PATCH 10/13] Use python module csv to parse TSV files
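
csv.reader copes with quoting and escaping that a bare line.split('\t') gets
wrong, and the csv docs require the file to be opened with newline=''.
Intended shape of the parsing loop (sketch; f and mapping_path_to_url as in
init_autouri):

    import csv
    with open(f, newline='') as fp:
        for row in csv.reader(fp, delimiter='\t'):
            mapping_path_to_url[row[0]] = row[1]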

Co-Authored-By: Paul Sud <41386393+p-sud@users.noreply.github.com>
---
 croo/croo_args.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 17327fa..67a05c2 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -194,11 +194,12 @@ def init_autouri(args):
     if args['tsv_mapping_path_to_url'] is not None:
         mapping_path_to_url = {}
         f = os.path.expanduser(args['tsv_mapping_path_to_url'])
-        with open(f, 'r') as fp:
-            lines = fp.read().strip('\n').split('\n')
-            for line in lines:
-                k, v = line.split('\t')
-                mapping_path_to_url[k] = v
+import csv
+...
+        with open(f, newline="") as fp:
+            reader = csv.reader(fp, delimiter="\t")
+            for line in reader:
+                mapping_path_to_url[line[0]] = line[1]
         args['mapping_path_to_url'] = mapping_path_to_url
     else:
         args['mapping_path_to_url'] = None

From 6213f5293d97a95fc659710de20334c4aefc5335 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 15:59:24 -0700
Subject: [PATCH 11/13] fix: parsing TSV using csv

---
 croo/croo_args.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 67a05c2..9f9b7d5 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -7,6 +7,7 @@
 """

 import argparse
+import csv
 import os
 import sys
 from autouri import S3URI, GCSURI
@@ -194,10 +195,8 @@ def init_autouri(args):
     if args['tsv_mapping_path_to_url'] is not None:
         mapping_path_to_url = {}
         f = os.path.expanduser(args['tsv_mapping_path_to_url'])
-import csv
-...
-        with open(f, newline="") as fp:
-            reader = csv.reader(fp, delimiter="\t")
+        with open(f, newline='') as fp:
+            reader = csv.reader(fp, delimiter='\t')
         for line in reader:
             mapping_path_to_url[line[0]] = line[1]
         args['mapping_path_to_url'] = mapping_path_to_url

From 54a4a015ed1352fafc47c95f0e28e7bbb9b50713 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 16:00:23 -0700
Subject: [PATCH 12/13] remove unnecessary error checking for --out-dir

---
 croo/croo_args.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index 9f9b7d5..c645a80 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -153,9 +153,7 @@ def check_args(args):
     elif args['tmp_dir'].startswith(('gs://', 's3://')):
         raise ValueError('Cloud URI is not allowed for --tmp-dir')

-    if args['out_dir'] is None:
-        raise ValueError('--out-dir is not valid.')
-    elif args['out_dir'].startswith(('http://', 'https://')):
+    if args['out_dir'].startswith(('http://', 'https://')):
         raise ValueError('URL is not allowed for --out-dir')

From a15e054bfaceb734259961d3d5c4427e8c3f0db8 Mon Sep 17 00:00:00 2001
From: Jin Lee
Date: Fri, 27 Mar 2020 16:06:55 -0700
Subject: [PATCH 13/13] ver: 0.3.5 -> 0.4.0

---
 croo/croo_args.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/croo/croo_args.py b/croo/croo_args.py
index c645a80..fd56f23 100755
--- a/croo/croo_args.py
+++ b/croo/croo_args.py
@@ -14,7 +14,7 @@
 from autouri import logger as autouri_logger


-__version__ = '0.3.5'
+__version__ = '0.4.0'


 def parse_croo_arguments():
     """Argument parser for Cromwell Output Organizer (COO)