From 23bb4f0c58105adfa4a9706dffa9440a26b71518 Mon Sep 17 00:00:00 2001
From: Tural Neymanov
Date: Tue, 5 Mar 2019 16:12:57 -0500
Subject: [PATCH 1/7] Implement file-of-patterns functionality.

---
 .../options/variant_transform_options.py      | 115 +++++++++++++++---
 .../options/variant_transform_options_test.py |  96 ++++++++++++++-
 gcp_variant_transforms/pipeline_common.py     |  65 ++++++++--
 .../pipeline_common_test.py                   | 110 ++++++++++++++---
 .../testing/data/input_files/empty            |   0
 .../testing/data/input_files/sample           |   3 +
 .../testing/data/input_files/wrong            |   3 +
 .../integration/run_vcf_to_bq_tests.py        |   9 +-
 .../combine_from_multiple_inputs.json         |  24 ++++
 .../merge_option_move_to_call_from_file.json  |  31 +++++
 ...inum_NA12878_hg38_10K_lines_from_file.json |  48 ++++++++
 gcp_variant_transforms/vcf_to_bq.py           |  39 ++++--
 .../vcf_to_bq_preprocess.py                   |  18 ++-
 13 files changed, 498 insertions(+), 63 deletions(-)
 create mode 100644 gcp_variant_transforms/testing/data/input_files/empty
 create mode 100644 gcp_variant_transforms/testing/data/input_files/sample
 create mode 100644 gcp_variant_transforms/testing/data/input_files/wrong
 create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json
 create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json
 create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json

diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py
index fd17b8d92..4a0d7ed80 100644
--- a/gcp_variant_transforms/options/variant_transform_options.py
+++ b/gcp_variant_transforms/options/variant_transform_options.py
@@ -53,9 +53,16 @@ class VcfReadOptions(VariantTransformsOptions):

   def add_arguments(self, parser):
     """Adds all options of this transform to parser."""
-    parser.add_argument('--input_pattern',
-                        required=True,
-                        help='Input pattern for VCF files to process.')
+    parser.add_argument(
+        '--input_pattern',
+        help=('Input pattern for VCF files to process. Either '
+              'this or --input_file flag has to be provided, exclusively.'))
+    parser.add_argument(
+        '--input_file',
+        help=('File that contains the list of VCF file names to input. Either '
+              'this or --input_pattern flag has to be provided, exlusively.'
+              'Note that using input_file than input_pattern is slower for '
+              'inputs that contain less than 50k files.'))
     parser.add_argument(
         '--allow_malformed_records',
         type='bool', default=False, nargs='?', const=True,
@@ -113,16 +120,45 @@ def validate(self, parsed_args):
       raise ValueError('Both --infer_headers and --representative_header_file '
                        'are passed! Please double check and choose at most one '
                        'of them.')
-    try:
-      # Gets at most one pattern match result of type `filesystems.MatchResult`.
-      first_match = filesystems.FileSystems.match(
-          [parsed_args.input_pattern], [1])[0]
-      if not first_match.metadata_list:
-        raise ValueError('Input pattern {} did not match any files.'.format(
+    if ((parsed_args.input_pattern and parsed_args.input_file) or
+        (not parsed_args.input_pattern and not parsed_args.input_file)):
+      raise ValueError('One and only one of input_pattern and input_file has '
+                       'to be provided.')
+    if parsed_args.input_pattern is not None:
+      try:
+        # Gets at most 1 pattern match result of type `filesystems.MatchResult`.
+        first_match = filesystems.FileSystems.match(
+            [parsed_args.input_pattern], [1])[0]
+        if not first_match.metadata_list:
+          raise ValueError('Input pattern {} did not match any files.'.format(
+              parsed_args.input_pattern))
+      except filesystem.BeamIOError:
+        raise ValueError('Invalid or inaccessible input pattern {}.'.format(
            parsed_args.input_pattern))
-    except filesystem.BeamIOError:
-      raise ValueError('Invalid or inaccessible input pattern {}.'.format(
-        parsed_args.input_pattern))
+    else:
+      if not filesystems.FileSystems.exists(parsed_args.input_file):
+        raise ValueError('Input file {} doesn''t exist'.format(
+            parsed_args.input_file))
+      all_patterns = []
+      with filesystems.FileSystems.open(parsed_args.input_file) as f:
+        for _, l in enumerate(f):
+          all_patterns.append(l.strip())
+      if not all_patterns:
+        raise ValueError('Input file {} is empty.'.format(
+            parsed_args.input_file))
+      try:
+        # Gets at most 1 pattern match result of type `filesystems.MatchResult`.
+        matches = filesystems.FileSystems.match(
+            all_patterns, [1] * len(all_patterns))
+        for match in matches:
+          if not match.metadata_list:
+            raise ValueError(
+                'Input pattern {} from {} did not match any files.'.format(
+                    match.pattern, parsed_args.input_file))
+      except filesystem.BeamIOError:
+        raise ValueError(
+            'Some patterns in {} are invalid or inaccessible.'.format(
+                parsed_args.input_file))


 class AvroWriteOptions(VariantTransformsOptions):
@@ -477,9 +513,16 @@ class PreprocessOptions(VariantTransformsOptions):

   def add_arguments(self, parser):
     # type: (argparse.ArgumentParser) -> None
-    parser.add_argument('--input_pattern',
-                        required=True,
-                        help='Input pattern for VCF files to process.')
+    parser.add_argument(
+        '--input_pattern',
+        help='Input pattern for VCF files to process. Either '
+             'this or --input_file flag has to be provided, exclusively.')
+    parser.add_argument(
+        '--input_file',
+        help=('File that contains the list of VCF file names to input. Either '
+              'this or --input_pattern flag has to be provided, exclusively. '
+              'Note that using input_file rather than input_pattern is slower '
+              'for inputs that contain less than 50k files.'))
     parser.add_argument(
         '--report_all_conflicts',
         type='bool', default=False, nargs='?', const=True,
@@ -501,6 +544,48 @@ def add_arguments(self, parser):
             'generated if unspecified. Otherwise, please provide a local '
             'path if run locally, or a cloud path if run on Dataflow.'))

+  def validate(self, parsed_args):
+    if ((parsed_args.input_pattern and parsed_args.input_file) or
+        (not parsed_args.input_pattern and not parsed_args.input_file)):
+      raise ValueError('One and only one of input_pattern and input_file has '
+                       'to be provided.')
+    if parsed_args.input_pattern is not None:
+      try:
+        # Gets at most 1 pattern match result of type `filesystems.MatchResult`.
+ first_match = filesystems.FileSystems.match( + [parsed_args.input_pattern], [1])[0] + if not first_match.metadata_list: + raise ValueError('Input pattern {} did not match any files.'.format( + parsed_args.input_pattern)) + except filesystem.BeamIOError: + raise ValueError('Invalid or inaccessible input pattern {}.'.format( + parsed_args.input_pattern)) + else: + if not filesystems.FileSystems.exists(parsed_args.input_file): + raise ValueError('Input file {} doesn''t exist'.format( + parsed_args.input_file)) + all_patterns = [] + with filesystems.FileSystems.open(parsed_args.input_file) as f: + for _, l in enumerate(f): + all_patterns.append(l.strip()) + if not all_patterns: + raise ValueError('Input file {} is empty.'.format( + parsed_args.input_file)) + try: + # Gets at most 1 pattern match result of type `filesystems.MatchResult`. + matches = filesystems.FileSystems.match( + all_patterns, [1] * len(all_patterns)) + for match in matches: + if not match.metadata_list: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, parsed_args.input_file)) + except filesystem.BeamIOError: + raise ValueError( + 'Some patterns in {} are invalid or inaccessible.'.format( + parsed_args.input_file)) + + class PartitionOptions(VariantTransformsOptions): """Options for partitioning Variant records.""" diff --git a/gcp_variant_transforms/options/variant_transform_options_test.py b/gcp_variant_transforms/options/variant_transform_options_test.py index c6df4977b..6fa9a027f 100644 --- a/gcp_variant_transforms/options/variant_transform_options_test.py +++ b/gcp_variant_transforms/options/variant_transform_options_test.py @@ -47,21 +47,55 @@ def _make_args(self, args): # type: (List[str]) -> argparse.Namespace return make_args(self._options, args) - def test_failure_for_conflicting_flags(self): + def test_no_inputs(self): + args = self._make_args([]) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_inputs(self): + args = self._make_args(['--input_pattern', '*', + '--input_file', 'asd']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_headers(self): args = self._make_args(['--input_pattern', '*', '--infer_headers', '--representative_header_file', 'gs://some_file']) self.assertRaises(ValueError, self._options.validate, args) - def test_failure_for_conflicting_flags_no_errors(self): + def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): args = self._make_args(['--input_pattern', '*', '--representative_header_file', 'gs://some_file']) self._options.validate(args) + def test_failure_for_conflicting_flags_no_errors_with_file_input(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/sample', + '--representative_header_file', 'gs://some_file']) + self._options.validate(args) + + def test_failure_for_empty_input_file(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/empty', + '--representative_header_file', 'gs://some_file']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_wrong_pattern_in_input_file(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/wrong', + '--representative_header_file', 'gs://some_file']) + self.assertRaises(ValueError, self._options.validate, args) + def test_failure_for_invalid_input_pattern(self): args = 
self._make_args(['--input_pattern', 'nonexistent_file.vcf']) self.assertRaises(ValueError, self._options.validate, args) + def test_failure_for_invalid_input_file(self): + args = self._make_args(['--input_file', 'nonexistent_file.vcf']) + self.assertRaises(ValueError, self._options.validate, args) + class BigQueryWriteOptionsTest(unittest.TestCase): """Tests cases for the BigQueryWriteOptions class.""" @@ -151,3 +185,61 @@ def test_failure_for_invalid_vep_cache(self): '--vep_image_uri', 'AN_IMAGE', '--vep_cache_path', 'VEP_CACHE']) self.assertRaises(ValueError, self._options.validate, args) + + +class PreprocessOptionsTest(unittest.TestCase): + """Tests cases for the PreprocessOptions class.""" + + def setUp(self): + self._options = variant_transform_options.PreprocessOptions() + + def _make_args(self, args): + # type: (List[str]) -> argparse.Namespace + return make_args(self._options, args) + + def test_failure_for_conflicting_flags_inputs(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path', + '--input_file', 'asd']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_no_errors(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path']) + self._options.validate(args) + + def test_failure_for_invalid_input_pattern(self): + args = self._make_args(['--input_pattern', 'nonexistent_file.vcf', + '--report_path', 'some_path']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_invalid_input_file(self): + args = self._make_args(['--input_file', 'nonexistent_file.vcf', + '--report_path', 'some_path']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path']) + self._options.validate(args) + + def test_failure_for_conflicting_flags_no_errors_with_file_input(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/sample', + '--report_path', 'some_path']) + self._options.validate(args) + + def test_failure_for_empty_input_file(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/empty', + '--report_path', 'some_path']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_wrong_pattern_in_input_file(self): + args = self._make_args([ + '--input_file', + 'gcp_variant_transforms/testing/data/input_files/wrong', + '--report_path', 'some_path']) + self.assertRaises(ValueError, self._options.validate, args) diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 4a71abafd..ca49c3f48 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -69,33 +69,80 @@ def parse_args(argv, command_line_options): return known_args, pipeline_args -def get_pipeline_mode(input_pattern, optimize_for_large_inputs=False): +def get_pipeline_mode( + input_pattern, + input_file, + optimize_for_large_inputs=False): # type: (str, bool) -> int """Returns the mode the pipeline should operate in based on input size.""" if optimize_for_large_inputs: return PipelineModes.LARGE - match_results = filesystems.FileSystems.match([input_pattern]) - if not match_results: - raise ValueError('No files matched input_pattern: {}'.format(input_pattern)) + if input_pattern is not None: + total_files = 
_get_pipeline_mode_for_pattern_input(input_pattern) + else: + total_files = _get_pipeline_mode_for_file_input(input_file) - total_files = len(match_results[0].metadata_list) if total_files > _LARGE_DATA_THRESHOLD: return PipelineModes.LARGE elif total_files > _SMALL_DATA_THRESHOLD: return PipelineModes.MEDIUM return PipelineModes.SMALL - -def read_headers(pipeline, pipeline_mode, input_pattern): +def _get_pipeline_mode_for_pattern_input(input_pattern): + match_results = filesystems.FileSystems.match([input_pattern]) + if not match_results: + raise ValueError('No files matched input_pattern: {}'.format(input_pattern)) + return len(match_results[0].metadata_list) + +def _get_pipeline_mode_for_file_input(input_file): + if not filesystems.FileSystems.exists(input_file): + raise ValueError('Input file {} doesn''t exist'.format(input_file)) + all_patterns = [] + with filesystems.FileSystems.open(input_file) as f: + for _, l in enumerate(f): + all_patterns.append(l.strip()) + if not all_patterns: + raise ValueError('Input file {} is empty.'.format(input_file)) + match_results = filesystems.FileSystems.match(all_patterns) + if not match_results: + raise ValueError( + 'No files matched any input pattern in {} file'.format(input_file)) + total_number = 0 + for match in match_results: + if not match.metadata_list: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, input_file)) + total_number += len(match.metadata_list) + return total_number + +def get_file_names(input_file): + #if (input_pattern): + # return [input_pattern] + result = [] + if not filesystems.FileSystems.exists(input_file): + raise ValueError('Input file {} doesn''t exist'.format(input_file)) + with filesystems.FileSystems.open(input_file) as f: + for _, l in enumerate(f): + result.append(l) + if not result: + raise ValueError('Input file {} is empty.'.format(input_file)) + return result + + +def read_headers(pipeline, pipeline_mode, input_pattern, input_file): # type: (beam.Pipeline, int, str) -> pvalue.PCollection """Creates an initial PCollection by reading the VCF file headers.""" - if pipeline_mode == PipelineModes.LARGE: + if pipeline_mode == PipelineModes.LARGE or input_file: headers = (pipeline - | beam.Create([input_pattern]) + | beam.Create( + [input_pattern] if input_pattern is not None + else get_file_names(input_file)) | vcf_header_io.ReadAllVcfHeaders()) else: headers = pipeline | vcf_header_io.ReadVcfHeaders(input_pattern) + return headers diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index d051ef4c4..c56f995e1 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -24,33 +24,34 @@ from gcp_variant_transforms.pipeline_common import PipelineModes -class VcfToBqCommonTest(unittest.TestCase): - """Tests cases for the `pipeline_common` script.""" +class PipelineCommonWithPatternTest(unittest.TestCase): + """Tests cases for the `pipeline_common` script with pattern input.""" def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): return pipeline_common.get_pipeline_mode(args.input_pattern, + args.input_file, args.optimize_for_large_inputs) def test_get_mode_raises_error_for_no_match(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='', input_file=None, optimize_for_large_inputs=False) with 
mock.patch.object(FileSystems, 'match', return_value=None), \ - self.assertRaises(ValueError): + self.assertRaises(ValueError): self._get_pipeline_mode(args) def test_get_mode_optimize_set(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=True) + input_pattern='', input_file=None, optimize_for_large_inputs=True) self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) def test_get_mode_small(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result([None for _ in range(100)]) @@ -59,7 +60,7 @@ def test_get_mode_small(self): def test_get_mode_medium(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(101)) @@ -72,22 +73,13 @@ def test_get_mode_medium(self): def test_get_mode_large(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(50001)) with mock.patch.object(FileSystems, 'match', return_value=[match]): self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) - def test_default_optimize_for_large_inputs(self): - args = self._create_mock_args(input_pattern='') - match_result = collections.namedtuple('MatchResult', ['metadata_list']) - - match = match_result(range(101)) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(pipeline_common.get_pipeline_mode(args.input_pattern), - PipelineModes.MEDIUM) - def test_fail_on_invalid_flags(self): # Start with valid flags, without setup.py. 
pipeline_args = ['--project', @@ -109,3 +101,87 @@ def test_fail_on_invalid_flags(self): pipeline_args.extend(['--unknown_flag', 'somevalue']) with self.assertRaisesRegexp(ValueError, 'Unrecognized.*unknown_flag'): pipeline_common._raise_error_on_invalid_flags(pipeline_args) + +class PipelineCommonWithFileTest(unittest.TestCase): + """Tests cases for the `pipeline_common` script with file input.""" + + def _create_mock_args(self, **args): + return collections.namedtuple('MockArgs', args.keys())(*args.values()) + + def _get_pipeline_mode(self, args): + return pipeline_common.get_pipeline_mode(args.input_pattern, + args.input_file, + args.optimize_for_large_inputs) + + def test_get_mode_raises_error_for_no_absent_file(self): + args = self._create_mock_args( + input_pattern=None, + input_file='nonexistent_file', + optimize_for_large_inputs=False) + + self.assertRaises(ValueError, self._get_pipeline_mode, args) + + def test_get_mode_raises_error_for_empty_file(self): + args = self._create_mock_args( + input_pattern=None, + input_file='gcp_variant_transforms/testing/data/input_files/empty', + optimize_for_large_inputs=False) + + self.assertRaises(ValueError, self._get_pipeline_mode, args) + + def test_get_mode_optimize_set(self): + args = self._create_mock_args( + input_pattern=None, + input_file='gcp_variant_transforms/testing/data/input_files/sample', + optimize_for_large_inputs=True) + + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + def test_get_mode_small(self): + args = self._create_mock_args( + input_pattern=None, + input_file='gcp_variant_transforms/testing/data/input_files/sample', + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result([None for _ in range(100)]) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.SMALL) + + def test_get_mode_medium(self): + args = self._create_mock_args( + input_pattern=None, + input_file='gcp_variant_transforms/testing/data/input_files/sample', + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result(range(101)) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) + + matches = [match_result(range(60)), + match_result(range(40)), + match_result(range(1))] + with mock.patch.object(FileSystems, 'match', return_value=matches): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) + + match = match_result(range(50000)) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) + + def test_get_mode_large(self): + args = self._create_mock_args( + input_pattern=None, + input_file='gcp_variant_transforms/testing/data/input_files/sample', + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result(range(50001)) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + matches = [match_result(range(25000)), + match_result(range(25000)), + match_result(range(1))] + with mock.patch.object(FileSystems, 'match', return_value=matches): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) diff --git 
a/gcp_variant_transforms/testing/data/input_files/empty b/gcp_variant_transforms/testing/data/input_files/empty new file mode 100644 index 000000000..e69de29bb diff --git a/gcp_variant_transforms/testing/data/input_files/sample b/gcp_variant_transforms/testing/data/input_files/sample new file mode 100644 index 000000000..accc8f26a --- /dev/null +++ b/gcp_variant_transforms/testing/data/input_files/sample @@ -0,0 +1,3 @@ +./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf +./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf +./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf diff --git a/gcp_variant_transforms/testing/data/input_files/wrong b/gcp_variant_transforms/testing/data/input_files/wrong new file mode 100644 index 000000000..d42f357af --- /dev/null +++ b/gcp_variant_transforms/testing/data/input_files/wrong @@ -0,0 +1,3 @@ +./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf +non_existent.vcf +./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf diff --git a/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py b/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py index 38ed45f47..573f65f4d 100644 --- a/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py +++ b/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py @@ -16,7 +16,7 @@ To define a new integration test case, create a json file in `gcp_variant_transforms/testing/integration/vcf_to_bq_tests` directory and -specify at least test_name, table_name, and input_pattern for the integration +specify at least test_name and table_name for the integration test. You may add multiple test cases (Now at most two are supported) in one json file, and the second test case will run after the first one finishes. @@ -66,7 +66,6 @@ def __init__(self, context, # type: TestContextManager test_name, # type: str table_name, # type: str - input_pattern, # type: str assertion_configs, # type: List[Dict] zones=None, # type: List[str] **kwargs # type: **str @@ -78,8 +77,7 @@ def __init__(self, self._project = context.project output_table = '{}:{}'.format(context.project, self._table_name) self._assertion_configs = assertion_configs - args = ['--input_pattern {}'.format(input_pattern), - '--output_table {}'.format(output_table), + args = ['--output_table {}'.format(output_table), '--project {}'.format(context.project), '--staging_location {}'.format(context.staging_location), '--temp_location {}'.format(context.temp_location), @@ -259,8 +257,7 @@ def _get_args(): def _get_test_configs(run_presubmit_tests, run_all_tests, test_file_suffix=''): # type: (bool, bool, str) -> List[List[Dict]] """Gets all test configs.""" - required_keys = ['test_name', 'table_name', 'input_pattern', - 'assertion_configs'] + required_keys = ['test_name', 'table_name', 'assertion_configs'] test_file_path = _get_test_file_path(run_presubmit_tests, run_all_tests, test_file_suffix) test_configs = run_tests_common.get_configs(test_file_path, diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json new file mode 100644 index 000000000..d6a011e63 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json @@ -0,0 +1,24 @@ +[ + { + "test_name": "combine-from-multiple-inputs", + "table_name": "combine_from_multiple_inputs", + 
"input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "allow_incompatible_records": "True", + "runner": "DirectRunner", + "zones": ["us-west1-b"], + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 21} + }, + { + "query": ["SELECT SUM(DP) AS sum FROM {TABLE_NAME}"], + "expected_result": {"sum": 241} + }, + { + "query": ["SELECT COUNT(DB) AS cnt FROM {TABLE_NAME}"], + "expected_result": {"cnt": 8} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json new file mode 100644 index 000000000..ddb6e9708 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json @@ -0,0 +1,31 @@ +[ + { + "test_name": "merge-option-move-to-calls-from-file", + "table_name": "merge_option_move_to_calls_from_file", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/merge_input", + "runner": "DirectRunner", + "zones": ["us-west1-b"], + "variant_merge_strategy": "MOVE_TO_CALLS", + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 4} + }, + { + "query": ["SUM_START_QUERY"], + "expected_result": {"sum_start": 1283553} + }, + { + "query": ["SUM_END_QUERY"], + "expected_result": {"sum_end": 1283560} + }, + { + "query": [ + "SELECT COUNT(0) AS num_rows FROM {TABLE_NAME} ", + "WHERE start_position = 14369" + ], + "expected_result": {"num_rows": 1} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json new file mode 100644 index 000000000..32c860fa1 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json @@ -0,0 +1,48 @@ +[ + { + "test_name": "platinum-na12877-hg38-10k-lines-from-file", + "table_name": "platinum_NA12877_hg38_10K_lines_from_file", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/platinum_input", + "annotation_fields": "CSQ", + "runner": "DataflowRunner", + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 9953} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_annotation_sets": 45770} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 143375297338} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_features": 1575} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_symbol": 206} + } + ] + } +] diff --git 
a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 97e893ed3..869eb8fb4 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -93,9 +93,12 @@ def _read_variants(input_pattern, pipeline, known_args, pipeline_mode): representative_header_lines = vcf_header_parser.get_metadata_header_lines( known_args.representative_header_file) - if pipeline_mode == pipeline_common.PipelineModes.LARGE: + if (pipeline_mode == pipeline_common.PipelineModes.LARGE or + known_args.input_file): variants = (pipeline - | 'InputFilePattern' >> beam.Create([input_pattern]) + | 'InputFilePattern' >> beam.Create( + [input_pattern] if input_pattern is not None + else pipeline_common.get_file_names(known_args.input_file)) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( representative_header_lines=representative_header_lines, allow_malformed_records=( @@ -106,6 +109,7 @@ def _read_variants(input_pattern, pipeline, known_args, pipeline_mode): representative_header_lines=representative_header_lines, allow_malformed_records=known_args.allow_malformed_records, vcf_parser_type=vcfio.VcfParserType[known_args.vcf_parser]) + return variants @@ -141,7 +145,10 @@ def _add_inferred_headers(input_pattern, # type: str annotation_fields_to_infer = (known_args.annotation_fields if known_args.infer_annotation_types else []) inferred_headers = ( - _read_variants(input_pattern, pipeline, known_args, pipeline_mode) + _read_variants(input_pattern, + pipeline, + known_args, + pipeline_mode) | 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) | 'InferHeaderFields' >> infer_headers.InferHeaderFields( @@ -204,7 +211,9 @@ def _annotate_vcf_files(input_pattern, known_args, pipeline_args): with beam.Pipeline(options=options) as p: _ = (p - | beam.Create([input_pattern]) + | beam.Create( + [input_pattern] if input_pattern + else pipeline_common.get_file_names(known_args.input_file)) | 'AnnotateShards' >> beam.ParDo( annotate_files.AnnotateFile(known_args, pipeline_args))) if known_args.annotation_fields: @@ -229,12 +238,14 @@ def _update_google_cloud_job_name(google_cloud_options, job_name): google_cloud_options.job_name = job_name -def _merge_headers(input_pattern, known_args, pipeline_args, pipeline_mode, - annotated_vcf_pattern=None): +def _merge_headers(known_args, pipeline_args, + pipeline_mode, annotated_vcf_pattern=None): # type: (str, argparse.Namespace, List[str], int, str) -> None """Merges VCF headers using beam based on pipeline_mode.""" if known_args.representative_header_file: return + input_pattern = known_args.input_pattern + input_file = known_args.input_file options = pipeline_options.PipelineOptions(pipeline_args) @@ -258,7 +269,8 @@ def _merge_headers(input_pattern, known_args, pipeline_args, pipeline_mode, temp_directory, temp_merged_headers_file_name) with beam.Pipeline(options=options) as p: - headers = pipeline_common.read_headers(p, pipeline_mode, input_pattern) + headers = pipeline_common.read_headers(p, pipeline_mode, input_pattern, + input_file) merged_header = pipeline_common.get_merged_headers( headers, known_args.split_alternate_allele_info_fields, @@ -302,7 +314,9 @@ def _run_annotation_pipeline(known_args, pipeline_args): files_to_be_annotated = known_args.input_pattern if known_args.shard_variants: pipeline_mode = pipeline_common.get_pipeline_mode( - known_args.input_pattern, known_args.optimize_for_large_inputs) + known_args.input_pattern, + known_args.input_file, + known_args.optimize_for_large_inputs) 
files_to_be_annotated = _shard_variants(known_args, pipeline_args, pipeline_mode) @@ -322,14 +336,17 @@ def run(argv=None): annotated_vcf_pattern = _run_annotation_pipeline(known_args, pipeline_args) input_pattern = annotated_vcf_pattern or known_args.input_pattern + input_file = known_args.input_file variant_merger = _get_variant_merge_strategy(known_args) + pipeline_mode = pipeline_common.get_pipeline_mode( - input_pattern, known_args.optimize_for_large_inputs) + input_pattern, input_file, known_args.optimize_for_large_inputs) # Starts a pipeline to merge VCF headers in beam if the total files that # match the input pattern exceeds _SMALL_DATA_THRESHOLD - _merge_headers(known_args.input_pattern, known_args, pipeline_args, + _merge_headers(known_args, pipeline_args, pipeline_mode, annotated_vcf_pattern) + # Retrieve merged headers prior to launching the pipeline. This is needed # since the BigQuery schema cannot yet be dynamically created based on input. # See https://issues.apache.org/jira/browse/BEAM-2801. @@ -355,6 +372,8 @@ def run(argv=None): beam_pipeline_options = pipeline_options.PipelineOptions(pipeline_args) pipeline = beam.Pipeline(options=beam_pipeline_options) variants = _read_variants(input_pattern, pipeline, known_args, pipeline_mode) + logging.info('got here') + logging.info(variants) variants |= 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) if partitioner: diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index 777c7372c..1ddca5a56 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -94,18 +94,28 @@ def run(argv=None): known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) options = pipeline_options.PipelineOptions(pipeline_args) - pipeline_mode = pipeline_common.get_pipeline_mode(known_args.input_pattern) + pipeline_mode = pipeline_common.get_pipeline_mode(known_args.input_pattern, + known_args.input_file) with beam.Pipeline(options=options) as p: headers = pipeline_common.read_headers(p, pipeline_mode, - known_args.input_pattern) + known_args.input_pattern, + known_args.input_file) merged_headers = pipeline_common.get_merged_headers(headers) merged_definitions = (headers | 'MergeDefinitions' >> merge_header_definitions.MergeDefinitions()) if known_args.report_all_conflicts: - variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( - known_args.input_pattern, allow_malformed_records=True) + if known_args.input_pattern: + variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( + known_args.input_pattern, allow_malformed_records=True) + else: + variants = (p + | 'InputFilePattern' >> beam.Create( + pipeline_common.get_file_names(known_args.input_file)) + | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( + allow_malformed_records=True)) + malformed_records = variants | filter_variants.ExtractMalformedVariants() inferred_headers, merged_headers = (_get_inferred_headers(variants, merged_headers)) From 7e4aecabd7e22e37112519e40cf60ca3e1fbb3e0 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Wed, 6 Mar 2019 16:20:28 -0500 Subject: [PATCH 2/7] Apply the requested changes to the PR. 
- replace all (input_pattern, input_file) inputs to input_patterns - add "combine" input file - refactor v_t_options file - add integration tests for preprocessor and vcf->bq with annotations --- .../options/variant_transform_options.py | 129 +++++++----------- gcp_variant_transforms/pipeline_common.py | 92 ++++++------- .../pipeline_common_test.py | 8 +- .../testing/data/input_files/combine | 1 + .../no_conflicts_from_file.json | 14 ++ .../integration/run_preprocessor_tests.py | 9 +- gcp_variant_transforms/vcf_to_bq.py | 51 ++++--- .../vcf_to_bq_preprocess.py | 14 +- 8 files changed, 141 insertions(+), 177 deletions(-) create mode 100644 gcp_variant_transforms/testing/data/input_files/combine create mode 100644 gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 4a0d7ed80..a8c93c825 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -60,9 +60,9 @@ def add_arguments(self, parser): parser.add_argument( '--input_file', help=('File that contains the list of VCF file names to input. Either ' - 'this or --input_pattern flag has to be provided, exlusively.' - 'Note that using input_file than input_pattern is slower for ' - 'inputs that contain less than 50k files.')) + 'this or --input_pattern flag has to be provided, exclusively.' + 'Note that using input_file rather than input_pattern is slower ' + 'for inputs that contain less than 50k files.')) parser.add_argument( '--allow_malformed_records', type='bool', default=False, nargs='?', const=True, @@ -114,51 +114,14 @@ def add_arguments(self, parser): 'is not recommended.'.format(vcfio.VcfParserType.PYVCF.name, vcfio.VcfParserType.NUCLEUS.name))) + def validate(self, parsed_args): # type: (argparse.Namespace) -> None if parsed_args.infer_headers and parsed_args.representative_header_file: raise ValueError('Both --infer_headers and --representative_header_file ' 'are passed! Please double check and choose at most one ' 'of them.') - if ((parsed_args.input_pattern and parsed_args.input_file) or - (not parsed_args.input_pattern and not parsed_args.input_file)): - raise ValueError('One and only one of input_pattern and input_file has ' - 'to be provided.') - if parsed_args.input_pattern is not None: - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. - first_match = filesystems.FileSystems.match( - [parsed_args.input_pattern], [1])[0] - if not first_match.metadata_list: - raise ValueError('Input pattern {} did not match any files.'.format( - parsed_args.input_pattern)) - except filesystem.BeamIOError: - raise ValueError('Invalid or inaccessible input pattern {}.'.format( - parsed_args.input_pattern)) - else: - if not filesystems.FileSystems.exists(parsed_args.input_file): - raise ValueError('Input file {} doesn''t exist'.format( - parsed_args.input_file)) - all_patterns = [] - with filesystems.FileSystems.open(parsed_args.input_file) as f: - for _, l in enumerate(f): - all_patterns.append(l.strip()) - if not all_patterns: - raise ValueError('Input file {} is empty.'.format( - parsed_args.input_file)) - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
- matches = filesystems.FileSystems.match( - all_patterns, [1] * len(all_patterns)) - for match in matches: - if not match.metadata_list: - raise ValueError( - 'Input pattern {} from {} did not match any files.'.format( - match.pattern, parsed_args.input_file)) - except filesystem.BeamIOError: - raise ValueError( - 'Some patterns in {} are invalid or inaccessible.'.format( - parsed_args.input_file)) + _validate_inputs(parsed_args) class AvroWriteOptions(VariantTransformsOptions): @@ -545,45 +508,7 @@ def add_arguments(self, parser): 'path if run locally, or a cloud path if run on Dataflow.')) def validate(self, parsed_args): - if ((parsed_args.input_pattern and parsed_args.input_file) or - (not parsed_args.input_pattern and not parsed_args.input_file)): - raise ValueError('One and only one of input_pattern and input_file has ' - 'to be provided.') - if parsed_args.input_pattern is not None: - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. - first_match = filesystems.FileSystems.match( - [parsed_args.input_pattern], [1])[0] - if not first_match.metadata_list: - raise ValueError('Input pattern {} did not match any files.'.format( - parsed_args.input_pattern)) - except filesystem.BeamIOError: - raise ValueError('Invalid or inaccessible input pattern {}.'.format( - parsed_args.input_pattern)) - else: - if not filesystems.FileSystems.exists(parsed_args.input_file): - raise ValueError('Input file {} doesn''t exist'.format( - parsed_args.input_file)) - all_patterns = [] - with filesystems.FileSystems.open(parsed_args.input_file) as f: - for _, l in enumerate(f): - all_patterns.append(l.strip()) - if not all_patterns: - raise ValueError('Input file {} is empty.'.format( - parsed_args.input_file)) - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. - matches = filesystems.FileSystems.match( - all_patterns, [1] * len(all_patterns)) - for match in matches: - if not match.metadata_list: - raise ValueError( - 'Input pattern {} from {} did not match any files.'.format( - match.pattern, parsed_args.input_file)) - except filesystem.BeamIOError: - raise ValueError( - 'Some patterns in {} are invalid or inaccessible.'.format( - parsed_args.input_file)) + _validate_inputs(parsed_args) class PartitionOptions(VariantTransformsOptions): @@ -668,3 +593,45 @@ def add_arguments(self, parser): 'be the same as the BigQuery table, but it requires all ' 'extracted variants to have the same call name ordering (usually ' 'true for tables from single VCF file import).')) + + +def _validate_inputs(parsed_args): + if ((parsed_args.input_pattern and parsed_args.input_file) or + (not parsed_args.input_pattern and not parsed_args.input_file)): + raise ValueError('Exactly one of input_pattern and input_file has to be ' + 'provided.') + if parsed_args.input_pattern is not None: + try: + # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
+ first_match = filesystems.FileSystems.match( + [parsed_args.input_pattern], [1])[0] + if not first_match.metadata_list: + raise ValueError('Input pattern {} did not match any files.'.format( + parsed_args.input_pattern)) + except filesystem.BeamIOError: + raise ValueError('Invalid or inaccessible input pattern {}.'.format( + parsed_args.input_pattern)) + else: + if not filesystems.FileSystems.exists(parsed_args.input_file): + raise ValueError('Input file {} doesn''t exist'.format( + parsed_args.input_file)) + all_patterns = [] + with filesystems.FileSystems.open(parsed_args.input_file) as f: + for _, l in enumerate(f): + all_patterns.append(l.strip()) + if not all_patterns: + raise ValueError('Input file {} is empty.'.format( + parsed_args.input_file)) + try: + # Gets at most 1 pattern match result of type `filesystems.MatchResult`. + matches = filesystems.FileSystems.match( + all_patterns, [1] * len(all_patterns)) + for match in matches: + if not match.metadata_list: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, parsed_args.input_file)) + except filesystem.BeamIOError: + raise ValueError( + 'Some patterns in {} are invalid or inaccessible.'.format( + parsed_args.input_file)) diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index ca49c3f48..304fe784f 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -66,22 +66,51 @@ def parse_args(argv, command_line_options): for transform_options in options: transform_options.validate(known_args) _raise_error_on_invalid_flags(pipeline_args) + known_args.input_patterns = _get_input_patterns( + known_args.input_pattern, known_args.input_file) return known_args, pipeline_args +def _get_input_patterns(input_pattern, input_file): + return ( + _get_file_names(input_file) if input_pattern is None else [input_pattern]) + +def _get_file_names(input_file): + # type (str) -> List(str) + """ Reads all input file and extracts list of patterns out of it.""" + result = [] + if not filesystems.FileSystems.exists(input_file): + raise ValueError('Input file {} doesn''t exist'.format(input_file)) + with filesystems.FileSystems.open(input_file) as f: + for _, l in enumerate(f): + result.append(l) + if not result: + raise ValueError('Input file {} is empty.'.format(input_file)) + return result def get_pipeline_mode( - input_pattern, - input_file, + input_patterns, + input_file=None, optimize_for_large_inputs=False): # type: (str, bool) -> int """Returns the mode the pipeline should operate in based on input size.""" if optimize_for_large_inputs: return PipelineModes.LARGE - if input_pattern is not None: - total_files = _get_pipeline_mode_for_pattern_input(input_pattern) - else: - total_files = _get_pipeline_mode_for_file_input(input_file) + match_results = filesystems.FileSystems.match(input_patterns) + if not match_results: + if input_file: + raise ValueError( + 'No files matched any input pattern in {} file'.format(input_file)) + else: + raise ValueError( + 'No files matched {} input pattern'.format(input_patterns[0])) + + total_files = 0 + for match in match_results: + if not match.metadata_list: + raise ValueError( + 'Input pattern {} did not match any files.'.format(match.pattern)) + total_files += len(match.metadata_list) if total_files > _LARGE_DATA_THRESHOLD: return PipelineModes.LARGE @@ -89,59 +118,16 @@ def get_pipeline_mode( return PipelineModes.MEDIUM return PipelineModes.SMALL -def 
_get_pipeline_mode_for_pattern_input(input_pattern): - match_results = filesystems.FileSystems.match([input_pattern]) - if not match_results: - raise ValueError('No files matched input_pattern: {}'.format(input_pattern)) - return len(match_results[0].metadata_list) - -def _get_pipeline_mode_for_file_input(input_file): - if not filesystems.FileSystems.exists(input_file): - raise ValueError('Input file {} doesn''t exist'.format(input_file)) - all_patterns = [] - with filesystems.FileSystems.open(input_file) as f: - for _, l in enumerate(f): - all_patterns.append(l.strip()) - if not all_patterns: - raise ValueError('Input file {} is empty.'.format(input_file)) - match_results = filesystems.FileSystems.match(all_patterns) - if not match_results: - raise ValueError( - 'No files matched any input pattern in {} file'.format(input_file)) - total_number = 0 - for match in match_results: - if not match.metadata_list: - raise ValueError( - 'Input pattern {} from {} did not match any files.'.format( - match.pattern, input_file)) - total_number += len(match.metadata_list) - return total_number - -def get_file_names(input_file): - #if (input_pattern): - # return [input_pattern] - result = [] - if not filesystems.FileSystems.exists(input_file): - raise ValueError('Input file {} doesn''t exist'.format(input_file)) - with filesystems.FileSystems.open(input_file) as f: - for _, l in enumerate(f): - result.append(l) - if not result: - raise ValueError('Input file {} is empty.'.format(input_file)) - return result - -def read_headers(pipeline, pipeline_mode, input_pattern, input_file): +def read_headers(pipeline, pipeline_mode, input_patterns): # type: (beam.Pipeline, int, str) -> pvalue.PCollection """Creates an initial PCollection by reading the VCF file headers.""" - if pipeline_mode == PipelineModes.LARGE or input_file: + if pipeline_mode == PipelineModes.LARGE or len(input_patterns) > 1: headers = (pipeline - | beam.Create( - [input_pattern] if input_pattern is not None - else get_file_names(input_file)) + | beam.Create(input_patterns) | vcf_header_io.ReadAllVcfHeaders()) else: - headers = pipeline | vcf_header_io.ReadVcfHeaders(input_pattern) + headers = pipeline | vcf_header_io.ReadVcfHeaders(input_patterns[0]) return headers diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index c56f995e1..a19fe42d0 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -31,7 +31,9 @@ def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - return pipeline_common.get_pipeline_mode(args.input_pattern, + input_patterns = pipeline_common._get_input_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(input_patterns, args.input_file, args.optimize_for_large_inputs) @@ -109,7 +111,9 @@ def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - return pipeline_common.get_pipeline_mode(args.input_pattern, + input_patterns = pipeline_common._get_input_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(input_patterns, args.input_file, args.optimize_for_large_inputs) diff --git a/gcp_variant_transforms/testing/data/input_files/combine b/gcp_variant_transforms/testing/data/input_files/combine new file mode 100644 index 000000000..81a49aca0 --- /dev/null +++ 
b/gcp_variant_transforms/testing/data/input_files/combine
@@ -0,0 +1 @@
+gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0*.vcf
diff --git a/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json b/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json
new file mode 100644
index 000000000..5c3d2d74f
--- /dev/null
+++ b/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json
@@ -0,0 +1,14 @@
+[
+  {
+    "test_name": "no-conflicts-from-file",
+    "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input",
+    "report_blob_name": "temp/report_no_conflicts.tsv",
+    "runner": "DirectRunner",
+    "zones": ["us-west1-b"],
+    "expected_contents": [
+      "No Header Conflicts Found.",
+      "",
+      ""
+    ]
+  }
+]
diff --git a/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py b/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py
index 60fbbc5ca..da2e0018a 100644
--- a/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py
+++ b/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py
@@ -16,7 +16,7 @@

 To define a new preprocessor_tests integration test case, create a json file in
 gcp_variant_transforms/testing/integration/preprocessor_tests directory and
-specify at least test_name, input_pattern, blob_name and expected_contents
+specify at least test_name, report_blob_name and expected_contents
 for the integration test.

 Execute the following command from the root source directory:
@@ -55,7 +55,6 @@ class PreprocessorTestCase(run_tests_common.TestCaseInterface):
   def __init__(self,
                parser_args,  # type: Namespace
                test_name,  # type: str
-               input_pattern,  # type: str
                expected_contents,  # type: List[str]
                report_blob_name,  # type: str
                header_blob_name=None,  # type: str
@@ -71,8 +70,7 @@ def __init__(self,
     self._report_blob_name = self._append_suffix(report_blob_name, suffix)
     self._report_path = '/'.join(['gs:/', _BUCKET_NAME, self._report_blob_name])
     self._project = parser_args.project
-    args = ['--input_pattern {}'.format(input_pattern),
-            '--report_path {}'.format(self._report_path),
+    args = ['--report_path {}'.format(self._report_path),
             '--project {}'.format(parser_args.project),
             '--staging_location {}'.format(parser_args.staging_location),
             '--temp_location {}'.format(parser_args.temp_location),
@@ -150,8 +148,7 @@ def _get_args():
 def _get_test_configs():
   # type: () -> List[List[Dict]]
   """Gets all test configs in preprocessor_tests."""
-  required_keys = ['test_name', 'report_blob_name', 'input_pattern',
-                   'expected_contents']
+  required_keys = ['test_name', 'report_blob_name', 'expected_contents']
   test_file_path = os.path.join(os.getcwd(), _TEST_FOLDER)
   return run_tests_common.get_configs(test_file_path, required_keys)
diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py
index 869eb8fb4..e73a31cd1 100644
--- a/gcp_variant_transforms/vcf_to_bq.py
+++ b/gcp_variant_transforms/vcf_to_bq.py
@@ -85,7 +85,7 @@
 _SHARDS_FOLDER = 'shards'


-def _read_variants(input_pattern, pipeline, known_args, pipeline_mode):
+def _read_variants(input_patterns, pipeline, known_args, pipeline_mode):
   # type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection
   """Helper method for returning a PCollection of Variants from VCFs."""
   representative_header_lines = None
@@ -94,18 +94,16 @@ def _read_variants(input_patterns, pipeline, known_args, pipeline_mode):
     known_args.representative_header_file)

   if (pipeline_mode
== pipeline_common.PipelineModes.LARGE or - known_args.input_file): + len(input_patterns) > 1): variants = (pipeline - | 'InputFilePattern' >> beam.Create( - [input_pattern] if input_pattern is not None - else pipeline_common.get_file_names(known_args.input_file)) + | 'InputFilePattern' >> beam.Create(input_patterns) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( representative_header_lines=representative_header_lines, allow_malformed_records=( known_args.allow_malformed_records))) else: variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf( - input_pattern, + input_patterns[0], representative_header_lines=representative_header_lines, allow_malformed_records=known_args.allow_malformed_records, vcf_parser_type=vcfio.VcfParserType[known_args.vcf_parser]) @@ -135,7 +133,7 @@ def _get_variant_merge_strategy(known_args # type: argparse.Namespace raise ValueError('Merge strategy is not supported.') -def _add_inferred_headers(input_pattern, # type: str +def _add_inferred_headers(input_patterns, # type: str pipeline, # type: beam.Pipeline known_args, # type: argparse.Namespace merged_header, # type: pvalue.PCollection @@ -145,7 +143,7 @@ def _add_inferred_headers(input_pattern, # type: str annotation_fields_to_infer = (known_args.annotation_fields if known_args.infer_annotation_types else []) inferred_headers = ( - _read_variants(input_pattern, + _read_variants(input_patterns, pipeline, known_args, pipeline_mode) @@ -179,9 +177,9 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): _update_google_cloud_job_name(google_cloud_options, shard_files_job_name) vcf_shards_output_dir = filesystems.FileSystems.join( known_args.annotation_output_dir, _SHARDS_FOLDER) + input_patterns = known_args.input_patterns with beam.Pipeline(options=options) as p: - variants = _read_variants(known_args.input_pattern, p, known_args, - pipeline_mode) + variants = _read_variants(input_patterns, p, known_args, pipeline_mode) call_names = (variants | 'CombineCallNames' >> combine_call_names.CallNamesCombiner()) @@ -193,10 +191,10 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): beam.pvalue.AsSingleton(call_names), known_args.number_of_variants_per_shard)) - return vep_runner_util.format_dir_path(vcf_shards_output_dir) + return [vep_runner_util.format_dir_path(vcf_shards_output_dir)] -def _annotate_vcf_files(input_pattern, known_args, pipeline_args): +def _annotate_vcf_files(input_patterns, known_args, pipeline_args): # type: (str, argparse.Namespace, List[str]) -> str """Annotates the VCF files using VEP. @@ -211,9 +209,7 @@ def _annotate_vcf_files(input_pattern, known_args, pipeline_args): with beam.Pipeline(options=options) as p: _ = (p - | beam.Create( - [input_pattern] if input_pattern - else pipeline_common.get_file_names(known_args.input_file)) + | beam.Create(input_patterns) | 'AnnotateShards' >> beam.ParDo( annotate_files.AnnotateFile(known_args, pipeline_args))) if known_args.annotation_fields: @@ -244,9 +240,7 @@ def _merge_headers(known_args, pipeline_args, """Merges VCF headers using beam based on pipeline_mode.""" if known_args.representative_header_file: return - input_pattern = known_args.input_pattern - input_file = known_args.input_file - + input_patterns = known_args.input_patterns options = pipeline_options.PipelineOptions(pipeline_args) # Always run pipeline locally if data is small. 
@@ -269,8 +263,7 @@ def _merge_headers(known_args, pipeline_args,
                                        temp_directory, temp_merged_headers_file_name)

   with beam.Pipeline(options=options) as p:
-    headers = pipeline_common.read_headers(p, pipeline_mode, input_pattern,
-                                           input_file)
+    headers = pipeline_common.read_headers(p, pipeline_mode, input_patterns)
     merged_header = pipeline_common.get_merged_headers(
         headers,
         known_args.split_alternate_allele_info_fields,
@@ -280,7 +273,9 @@ def _merge_headers(known_args, pipeline_args,
         p, known_args, pipeline_mode, merged_header, annotated_vcf_pattern)

     if known_args.infer_headers or known_args.infer_annotation_types:
-      infer_headers_input_pattern = annotated_vcf_pattern or input_pattern
+      infer_headers_input_pattern = (
+          [annotated_vcf_pattern] if
+          annotated_vcf_pattern else known_args.input_patterns)
       merged_header = _add_inferred_headers(infer_headers_input_pattern, p,
                                             known_args, merged_header,
                                             pipeline_mode)
@@ -311,10 +306,10 @@ def _run_annotation_pipeline(known_args, pipeline_args):
   _validate_annotation_pipeline_args(known_args, pipeline_args)
   known_args.omit_empty_sample_calls = True

-  files_to_be_annotated = known_args.input_pattern
+  files_to_be_annotated = known_args.input_patterns
   if known_args.shard_variants:
     pipeline_mode = pipeline_common.get_pipeline_mode(
-        known_args.input_pattern,
+        files_to_be_annotated,
         known_args.input_file,
         known_args.optimize_for_large_inputs)
     files_to_be_annotated = _shard_variants(known_args,
@@ -335,12 +330,14 @@ def run(argv=None):

   annotated_vcf_pattern = _run_annotation_pipeline(known_args, pipeline_args)

-  input_pattern = annotated_vcf_pattern or known_args.input_pattern
-  input_file = known_args.input_file
+  input_patterns = known_args.input_patterns
+
   variant_merger = _get_variant_merge_strategy(known_args)
+
   pipeline_mode = pipeline_common.get_pipeline_mode(
-      input_pattern, input_file, known_args.optimize_for_large_inputs)
+      input_patterns,
+      known_args.input_file,
+      known_args.optimize_for_large_inputs)
   # Starts a pipeline to merge VCF headers in beam if the total files that
   # match the input pattern exceeds _SMALL_DATA_THRESHOLD
   _merge_headers(known_args, pipeline_args,
@@ -371,7 +368,7 @@ def run(argv=None):
   beam_pipeline_options = pipeline_options.PipelineOptions(pipeline_args)
   pipeline = beam.Pipeline(options=beam_pipeline_options)

-  variants = _read_variants(input_pattern, pipeline, known_args, pipeline_mode)
+  variants = _read_variants(input_patterns, pipeline, known_args, pipeline_mode)
   logging.info('got here')
   logging.info(variants)
   variants |= 'FilterVariants' >> filter_variants.FilterVariants(
diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py
index 1ddca5a56..ed3f2a0cb 100644
--- a/gcp_variant_transforms/vcf_to_bq_preprocess.py
+++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py
@@ -94,25 +94,23 @@ def run(argv=None):
   known_args, pipeline_args = pipeline_common.parse_args(argv,
                                                          _COMMAND_LINE_OPTIONS)
   options = pipeline_options.PipelineOptions(pipeline_args)
-  pipeline_mode = pipeline_common.get_pipeline_mode(known_args.input_pattern,
+  input_patterns = known_args.input_patterns
+  pipeline_mode = pipeline_common.get_pipeline_mode(input_patterns,
                                                     known_args.input_file)

   with beam.Pipeline(options=options) as p:
-    headers = pipeline_common.read_headers(p, pipeline_mode,
-                                           known_args.input_pattern,
-                                           known_args.input_file)
+    headers = pipeline_common.read_headers(p, pipeline_mode, input_patterns)
     merged_headers = pipeline_common.get_merged_headers(headers)
     merged_definitions = (headers
                           | 'MergeDefinitions' >>
                           merge_header_definitions.MergeDefinitions())

     if known_args.report_all_conflicts:
-      if known_args.input_pattern:
+      if len(input_patterns) == 1:
         variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf(
-            known_args.input_pattern, allow_malformed_records=True)
+            input_patterns[0], allow_malformed_records=True)
       else:
         variants = (p
-                    | 'InputFilePattern' >> beam.Create(
-                        pipeline_common.get_file_names(known_args.input_file))
+                    | 'InputFilePattern' >> beam.Create(input_patterns)
                     | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf(
                         allow_malformed_records=True))

From 9a072391965f84f38311edebcff3cb8b179f88f5 Mon Sep 17 00:00:00 2001
From: Tural Neymanov
Date: Tue, 12 Mar 2019 17:31:45 -0400
Subject: [PATCH 3/7] Address comments:

- Remove validation from pipeline_common.
- Adjust tests accordingly.
- Change input_pattern checks to treat '' as absent input, to simplify the
  code.
---
 .../options/variant_transform_options.py |  2 +-
 gcp_variant_transforms/pipeline_common.py | 14 +-------
 .../pipeline_common_test.py | 34 +++----------
 gcp_variant_transforms/vcf_to_bq.py |  2 --
 .../vcf_to_bq_preprocess.py |  3 +-
 5 files changed, 7 insertions(+), 48 deletions(-)

diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py
index a8c93c825..0daa12a85 100644
--- a/gcp_variant_transforms/options/variant_transform_options.py
+++ b/gcp_variant_transforms/options/variant_transform_options.py
@@ -600,7 +600,7 @@ def _validate_inputs(parsed_args):
       (not parsed_args.input_pattern and not parsed_args.input_file)):
     raise ValueError('Exactly one of input_pattern and input_file has to be '
                      'provided.')
-  if parsed_args.input_pattern is not None:
+  if parsed_args.input_pattern:
    try:
      # Gets at most 1 pattern match result of type `filesystems.MatchResult`.
first_match = filesystems.FileSystems.match( diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 304fe784f..f685a2ccd 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -71,8 +71,7 @@ def parse_args(argv, command_line_options): return known_args, pipeline_args def _get_input_patterns(input_pattern, input_file): - return ( - _get_file_names(input_file) if input_pattern is None else [input_pattern]) + return [input_pattern] if input_pattern else _get_file_names(input_file) def _get_file_names(input_file): # type (str) -> List(str) @@ -89,7 +88,6 @@ def _get_file_names(input_file): def get_pipeline_mode( input_patterns, - input_file=None, optimize_for_large_inputs=False): # type: (str, bool) -> int """Returns the mode the pipeline should operate in based on input size.""" @@ -97,19 +95,9 @@ def get_pipeline_mode( return PipelineModes.LARGE match_results = filesystems.FileSystems.match(input_patterns) - if not match_results: - if input_file: - raise ValueError( - 'No files matched any input pattern in {} file'.format(input_file)) - else: - raise ValueError( - 'No files matched {} input pattern'.format(input_patterns[0])) total_files = 0 for match in match_results: - if not match.metadata_list: - raise ValueError( - 'Input pattern {} did not match any files.'.format(match.pattern)) total_files += len(match.metadata_list) if total_files > _LARGE_DATA_THRESHOLD: diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index a19fe42d0..3935b6394 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -34,26 +34,17 @@ def _get_pipeline_mode(self, args): input_patterns = pipeline_common._get_input_patterns(args.input_pattern, args.input_file) return pipeline_common.get_pipeline_mode(input_patterns, - args.input_file, args.optimize_for_large_inputs) - def test_get_mode_raises_error_for_no_match(self): - args = self._create_mock_args( - input_pattern='', input_file=None, optimize_for_large_inputs=False) - - with mock.patch.object(FileSystems, 'match', return_value=None), \ - self.assertRaises(ValueError): - self._get_pipeline_mode(args) - def test_get_mode_optimize_set(self): args = self._create_mock_args( - input_pattern='', input_file=None, optimize_for_large_inputs=True) + input_pattern='test', input_file=None, optimize_for_large_inputs=True) self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) def test_get_mode_small(self): args = self._create_mock_args( - input_pattern='', input_file=None, optimize_for_large_inputs=False) + input_pattern='test', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result([None for _ in range(100)]) @@ -62,7 +53,7 @@ def test_get_mode_small(self): def test_get_mode_medium(self): args = self._create_mock_args( - input_pattern='', input_file=None, optimize_for_large_inputs=False) + input_pattern='test', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(101)) @@ -75,7 +66,7 @@ def test_get_mode_medium(self): def test_get_mode_large(self): args = self._create_mock_args( - input_pattern='', input_file=None, optimize_for_large_inputs=False) + input_pattern='test', input_file=None, optimize_for_large_inputs=False) match_result = 
collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(50001)) @@ -114,25 +105,8 @@ def _get_pipeline_mode(self, args): input_patterns = pipeline_common._get_input_patterns(args.input_pattern, args.input_file) return pipeline_common.get_pipeline_mode(input_patterns, - args.input_file, args.optimize_for_large_inputs) - def test_get_mode_raises_error_for_no_absent_file(self): - args = self._create_mock_args( - input_pattern=None, - input_file='nonexistent_file', - optimize_for_large_inputs=False) - - self.assertRaises(ValueError, self._get_pipeline_mode, args) - - def test_get_mode_raises_error_for_empty_file(self): - args = self._create_mock_args( - input_pattern=None, - input_file='gcp_variant_transforms/testing/data/input_files/empty', - optimize_for_large_inputs=False) - - self.assertRaises(ValueError, self._get_pipeline_mode, args) - def test_get_mode_optimize_set(self): args = self._create_mock_args( input_pattern=None, diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index e73a31cd1..a5ba16f48 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -310,7 +310,6 @@ def _run_annotation_pipeline(known_args, pipeline_args): if known_args.shard_variants: pipeline_mode = pipeline_common.get_pipeline_mode( files_to_be_annotated, - known_args.input_file, known_args.optimize_for_large_inputs) files_to_be_annotated = _shard_variants(known_args, pipeline_args, @@ -336,7 +335,6 @@ def run(argv=None): pipeline_mode = pipeline_common.get_pipeline_mode( input_patterns, - known_args.input_file, known_args.optimize_for_large_inputs) # Starts a pipeline to merge VCF headers in beam if the total files that # match the input pattern exceeds _SMALL_DATA_THRESHOLD diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index ed3f2a0cb..9ed0f7698 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -95,8 +95,7 @@ def run(argv=None): _COMMAND_LINE_OPTIONS) options = pipeline_options.PipelineOptions(pipeline_args) input_patterns = known_args.input_patterns - pipeline_mode = pipeline_common.get_pipeline_mode(input_patterns, - known_args.input_file) + pipeline_mode = pipeline_common.get_pipeline_mode(input_patterns) with beam.Pipeline(options=options) as p: headers = pipeline_common.read_headers(p, pipeline_mode, input_patterns) From bf5f4725c9e86d15eac0890158ce5fee082ea7fd Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Fri, 15 Mar 2019 11:41:51 -0400 Subject: [PATCH 4/7] Apply the requested changes: - Removed merge test duplicate for _from_file option. - Modified combine and gnomad tests. - Updated the gs storage for said tests. - Remove legacy debugging code. - Refactor validation to use pipeline_common for extracting file names - Create files for unit tests on the fly. - Set PipelineMode to Large, when multiple input patterns are available. - Move input pattern/file validation to pipeline_common "file reading" stage to remove duplication. 
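The consolidation described in the last two bullets above reduces to one helper: resolve the --input_pattern / --input_file pair into a list of patterns, then match every pattern once and fail fast on anything that resolves to no files. The following is a minimal sketch of that flow, assuming Beam's FileSystems API; the name resolve_patterns is illustrative, while the function actually introduced in the diff below is pipeline_common.get_input_patterns:

    from apache_beam.io import filesystem
    from apache_beam.io import filesystems

    def resolve_patterns(input_pattern, input_file):
      # Exactly one of the two flags is expected to be set by this point.
      if input_pattern:
        patterns = [input_pattern]
      else:
        if not filesystems.FileSystems.exists(input_file):
          raise ValueError('Input file {} doesn\'t exist'.format(input_file))
        with filesystems.FileSystems.open(input_file) as f:
          # Each line of the file is one pattern; drop trailing newlines.
          patterns = [line.strip() for line in f]
        if not patterns:
          raise ValueError('Input file {} is empty.'.format(input_file))
      try:
        # Ask for at most one match per pattern; an empty metadata_list
        # means the pattern resolved to no files at all.
        matches = filesystems.FileSystems.match(patterns, [1] * len(patterns))
        for match in matches:
          if not match.metadata_list:
            raise ValueError('Input pattern {} did not match any files.'
                             .format(match.pattern))
      except filesystem.BeamIOError:
        raise ValueError('Invalid or inaccessible input pattern(s).')
      return patterns
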
--- .../options/variant_transform_options.py | 37 ----- .../options/variant_transform_options_test.py | 73 ++-------- gcp_variant_transforms/pipeline_common.py | 38 +++-- .../pipeline_common_test.py | 134 ++++++++++-------- .../testing/data/input_files/combine | 3 +- .../testing/data/input_files/empty | 0 .../testing/data/input_files/sample | 3 - .../testing/data/input_files/wrong | 3 - ...RCh38_chrX_head2500_run_vep_from_file.json | 86 +++++++++++ .../combine_from_multiple_inputs.json | 6 +- .../merge_option_move_to_call_from_file.json | 31 ---- gcp_variant_transforms/vcf_to_bq.py | 5 +- 12 files changed, 214 insertions(+), 205 deletions(-) delete mode 100644 gcp_variant_transforms/testing/data/input_files/empty delete mode 100644 gcp_variant_transforms/testing/data/input_files/sample delete mode 100644 gcp_variant_transforms/testing/data/input_files/wrong create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json delete mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 0daa12a85..2d90098b2 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -17,8 +17,6 @@ import argparse # pylint: disable=unused-import import re -from apache_beam.io import filesystem -from apache_beam.io import filesystems from apache_beam.io.gcp.internal.clients import bigquery from apitools.base.py import exceptions from oauth2client.client import GoogleCredentials @@ -600,38 +598,3 @@ def _validate_inputs(parsed_args): (not parsed_args.input_pattern and not parsed_args.input_file)): raise ValueError('Exactly one of input_pattern and input_file has to be ' 'provided.') - if parsed_args.input_pattern: - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. - first_match = filesystems.FileSystems.match( - [parsed_args.input_pattern], [1])[0] - if not first_match.metadata_list: - raise ValueError('Input pattern {} did not match any files.'.format( - parsed_args.input_pattern)) - except filesystem.BeamIOError: - raise ValueError('Invalid or inaccessible input pattern {}.'.format( - parsed_args.input_pattern)) - else: - if not filesystems.FileSystems.exists(parsed_args.input_file): - raise ValueError('Input file {} doesn''t exist'.format( - parsed_args.input_file)) - all_patterns = [] - with filesystems.FileSystems.open(parsed_args.input_file) as f: - for _, l in enumerate(f): - all_patterns.append(l.strip()) - if not all_patterns: - raise ValueError('Input file {} is empty.'.format( - parsed_args.input_file)) - try: - # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
- matches = filesystems.FileSystems.match( - all_patterns, [1] * len(all_patterns)) - for match in matches: - if not match.metadata_list: - raise ValueError( - 'Input pattern {} from {} did not match any files.'.format( - match.pattern, parsed_args.input_file)) - except filesystem.BeamIOError: - raise ValueError( - 'Some patterns in {} are invalid or inaccessible.'.format( - parsed_args.input_file)) diff --git a/gcp_variant_transforms/options/variant_transform_options_test.py b/gcp_variant_transforms/options/variant_transform_options_test.py index 6fa9a027f..0001d42c4 100644 --- a/gcp_variant_transforms/options/variant_transform_options_test.py +++ b/gcp_variant_transforms/options/variant_transform_options_test.py @@ -26,6 +26,7 @@ from apitools.base.py import exceptions from gcp_variant_transforms.options import variant_transform_options +from gcp_variant_transforms.testing import temp_dir def make_args(options, args): @@ -36,6 +37,13 @@ def make_args(options, args): assert not remaining_args return namespace +SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] +WRONG_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + 'non_existent.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] +EMPTY_LINES = [] class VcfReadOptionsTest(unittest.TestCase): """Tests cases for the VcfReadOptions class.""" @@ -68,33 +76,13 @@ def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): self._options.validate(args) def test_failure_for_conflicting_flags_no_errors_with_file_input(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/sample', - '--representative_header_file', 'gs://some_file']) - self._options.validate(args) - - def test_failure_for_empty_input_file(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/empty', - '--representative_header_file', 'gs://some_file']) - self.assertRaises(ValueError, self._options.validate, args) - - def test_failure_for_wrong_pattern_in_input_file(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/wrong', - '--representative_header_file', 'gs://some_file']) - self.assertRaises(ValueError, self._options.validate, args) - - def test_failure_for_invalid_input_pattern(self): - args = self._make_args(['--input_pattern', 'nonexistent_file.vcf']) - self.assertRaises(ValueError, self._options.validate, args) - - def test_failure_for_invalid_input_file(self): - args = self._make_args(['--input_file', 'nonexistent_file.vcf']) - self.assertRaises(ValueError, self._options.validate, args) + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + args = self._make_args([ + '--input_file', + filename, + '--representative_header_file', 'gs://some_file']) + self._options.validate(args) class BigQueryWriteOptionsTest(unittest.TestCase): @@ -208,38 +196,7 @@ def test_failure_for_conflicting_flags_no_errors(self): '--report_path', 'some_path']) self._options.validate(args) - def test_failure_for_invalid_input_pattern(self): - args = self._make_args(['--input_pattern', 'nonexistent_file.vcf', - '--report_path', 'some_path']) - self.assertRaises(ValueError, self._options.validate, args) - - def test_failure_for_invalid_input_file(self): - args = self._make_args(['--input_file', 
'nonexistent_file.vcf', - '--report_path', 'some_path']) - self.assertRaises(ValueError, self._options.validate, args) - def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): args = self._make_args(['--input_pattern', '*', '--report_path', 'some_path']) self._options.validate(args) - - def test_failure_for_conflicting_flags_no_errors_with_file_input(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/sample', - '--report_path', 'some_path']) - self._options.validate(args) - - def test_failure_for_empty_input_file(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/empty', - '--report_path', 'some_path']) - self.assertRaises(ValueError, self._options.validate, args) - - def test_failure_for_wrong_pattern_in_input_file(self): - args = self._make_args([ - '--input_file', - 'gcp_variant_transforms/testing/data/input_files/wrong', - '--report_path', 'some_path']) - self.assertRaises(ValueError, self._options.validate, args) diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index f685a2ccd..7c241ef79 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -26,6 +26,7 @@ import apache_beam as beam from apache_beam import pvalue # pylint: disable=unused-import +from apache_beam.io import filesystem from apache_beam.io import filesystems from apache_beam.options import pipeline_options from apache_beam.runners.direct import direct_runner @@ -66,22 +67,44 @@ def parse_args(argv, command_line_options): for transform_options in options: transform_options.validate(known_args) _raise_error_on_invalid_flags(pipeline_args) - known_args.input_patterns = _get_input_patterns( + known_args.input_patterns = get_input_patterns( known_args.input_pattern, known_args.input_file) return known_args, pipeline_args -def _get_input_patterns(input_pattern, input_file): - return [input_pattern] if input_pattern else _get_file_names(input_file) +def get_input_patterns(input_pattern, input_file): + patterns = [input_pattern] if input_pattern else _get_file_names(input_file) + + # Validate inputs. + try: + # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
+ matches = filesystems.FileSystems.match(patterns, [1] * len(patterns)) + for match in matches: + if input_file and not match.metadata_list: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, input_file)) + elif not match.metadata_list: + raise ValueError( + 'Input pattern {} did not match any files.'.format(match.pattern)) + except filesystem.BeamIOError: + if input_file: + raise ValueError( + 'Some patterns in {} are invalid or inaccessible.'.format( + input_file)) + else: + raise ValueError('Invalid or inaccessible input pattern {}.'.format( + input_pattern)) + return patterns def _get_file_names(input_file): # type (str) -> List(str) """ Reads all input file and extracts list of patterns out of it.""" result = [] if not filesystems.FileSystems.exists(input_file): - raise ValueError('Input file {} doesn''t exist'.format(input_file)) + raise ValueError('Input file {} doesn\'t exist'.format(input_file)) with filesystems.FileSystems.open(input_file) as f: for _, l in enumerate(f): - result.append(l) + result.append(l.strip()) if not result: raise ValueError('Input file {} is empty.'.format(input_file)) return result @@ -91,7 +114,7 @@ def get_pipeline_mode( optimize_for_large_inputs=False): # type: (str, bool) -> int """Returns the mode the pipeline should operate in based on input size.""" - if optimize_for_large_inputs: + if optimize_for_large_inputs or len(input_patterns) > 1: return PipelineModes.LARGE match_results = filesystems.FileSystems.match(input_patterns) @@ -106,11 +129,10 @@ def get_pipeline_mode( return PipelineModes.MEDIUM return PipelineModes.SMALL - def read_headers(pipeline, pipeline_mode, input_patterns): # type: (beam.Pipeline, int, str) -> pvalue.PCollection """Creates an initial PCollection by reading the VCF file headers.""" - if pipeline_mode == PipelineModes.LARGE or len(input_patterns) > 1: + if pipeline_mode == PipelineModes.LARGE: headers = (pipeline | beam.Create(input_patterns) | vcf_header_io.ReadAllVcfHeaders()) diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index 3935b6394..12de81a0f 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -22,8 +22,17 @@ from gcp_variant_transforms import pipeline_common from gcp_variant_transforms.pipeline_common import PipelineModes +from gcp_variant_transforms.testing import temp_dir +SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] +WRONG_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + 'non_existent.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] +EMPTY_LINES = [] + class PipelineCommonWithPatternTest(unittest.TestCase): """Tests cases for the `pipeline_common` script with pattern input.""" @@ -31,20 +40,26 @@ def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - input_patterns = pipeline_common._get_input_patterns(args.input_pattern, - args.input_file) + input_patterns = pipeline_common.get_input_patterns(args.input_pattern, + args.input_file) return pipeline_common.get_pipeline_mode(input_patterns, args.optimize_for_large_inputs) + def test_validation_failure_for_invalid_input_pattern(self): + with self.assertRaisesRegexp( + ValueError, 'Input pattern .* did not 
match any files.'): + pipeline_common.get_input_patterns( + input_pattern='nonexistent_file.vcf', input_file=None) + def test_get_mode_optimize_set(self): args = self._create_mock_args( - input_pattern='test', input_file=None, optimize_for_large_inputs=True) + input_pattern='*', input_file=None, optimize_for_large_inputs=True) self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) def test_get_mode_small(self): args = self._create_mock_args( - input_pattern='test', input_file=None, optimize_for_large_inputs=False) + input_pattern='*', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result([None for _ in range(100)]) @@ -53,7 +68,7 @@ def test_get_mode_small(self): def test_get_mode_medium(self): args = self._create_mock_args( - input_pattern='test', input_file=None, optimize_for_large_inputs=False) + input_pattern='*', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(101)) @@ -102,64 +117,69 @@ def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - input_patterns = pipeline_common._get_input_patterns(args.input_pattern, - args.input_file) + input_patterns = pipeline_common.get_input_patterns(args.input_pattern, + args.input_file) return pipeline_common.get_pipeline_mode(input_patterns, args.optimize_for_large_inputs) def test_get_mode_optimize_set(self): - args = self._create_mock_args( - input_pattern=None, - input_file='gcp_variant_transforms/testing/data/input_files/sample', - optimize_for_large_inputs=True) - - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) - - def test_get_mode_small(self): - args = self._create_mock_args( - input_pattern=None, - input_file='gcp_variant_transforms/testing/data/input_files/sample', - optimize_for_large_inputs=False) - match_result = collections.namedtuple('MatchResult', ['metadata_list']) + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=True) - match = match_result([None for _ in range(100)]) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.SMALL) - - def test_get_mode_medium(self): - args = self._create_mock_args( - input_pattern=None, - input_file='gcp_variant_transforms/testing/data/input_files/sample', - optimize_for_large_inputs=False) - match_result = collections.namedtuple('MatchResult', ['metadata_list']) + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) - match = match_result(range(101)) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) + def test_get_mode_small_still_large(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) - matches = [match_result(range(60)), - match_result(range(40)), - match_result(range(1))] - with mock.patch.object(FileSystems, 'match', return_value=matches): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) - - 
match = match_result(range(50000)) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.MEDIUM) + match = match_result([None for _ in range(100)]) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) def test_get_mode_large(self): - args = self._create_mock_args( - input_pattern=None, - input_file='gcp_variant_transforms/testing/data/input_files/sample', - optimize_for_large_inputs=False) - match_result = collections.namedtuple('MatchResult', ['metadata_list']) - - match = match_result(range(50001)) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) - - matches = [match_result(range(25000)), - match_result(range(25000)), - match_result(range(1))] - with mock.patch.object(FileSystems, 'match', return_value=matches): - self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result(range(50001)) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + matches = [match_result(range(25000)), + match_result(range(25000)), + match_result(range(1))] + with mock.patch.object(FileSystems, 'match', return_value=matches): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + def test_validation_failure_for_invalid_input_file(self): + with self.assertRaisesRegexp(ValueError, 'Input file .* doesn\'t exist'): + pipeline_common.get_input_patterns( + input_pattern=None, input_file='nonexistent_file.vcf') + + def test_validation_failure_for_empty_input_file(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=EMPTY_LINES) + with self.assertRaisesRegexp(ValueError, 'Input file .* is empty.'): + pipeline_common.get_input_patterns( + input_pattern=None, input_file=filename) + + def test_validation_failure_for_wrong_pattern_in_input_file(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=WRONG_LINES) + with self.assertRaisesRegexp( + ValueError, 'Input pattern .* from .* did not match any files.'): + pipeline_common.get_input_patterns( + input_pattern=None, input_file=filename) diff --git a/gcp_variant_transforms/testing/data/input_files/combine b/gcp_variant_transforms/testing/data/input_files/combine index 81a49aca0..fbc60dde7 100644 --- a/gcp_variant_transforms/testing/data/input_files/combine +++ b/gcp_variant_transforms/testing/data/input_files/combine @@ -1 +1,2 @@ -gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0*.vcf +gs://tural-test-runner/vcf_files/small/valid-4.1* +gs://tural-test-runner/vcf_files/small/valid-4.2.* diff --git a/gcp_variant_transforms/testing/data/input_files/empty b/gcp_variant_transforms/testing/data/input_files/empty deleted file mode 100644 index e69de29bb..000000000 diff --git a/gcp_variant_transforms/testing/data/input_files/sample b/gcp_variant_transforms/testing/data/input_files/sample deleted file mode 100644 index accc8f26a..000000000 --- a/gcp_variant_transforms/testing/data/input_files/sample +++ /dev/null @@ -1,3 +0,0 
@@ -./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf -./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf -./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf diff --git a/gcp_variant_transforms/testing/data/input_files/wrong b/gcp_variant_transforms/testing/data/input_files/wrong deleted file mode 100644 index d42f357af..000000000 --- a/gcp_variant_transforms/testing/data/input_files/wrong +++ /dev/null @@ -1,3 +0,0 @@ -./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf -non_existent.vcf -./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json new file mode 100644 index 000000000..4cfd78145 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json @@ -0,0 +1,86 @@ +[ + { + "test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep", + "table_name": "gnomad_genomes_GRCh37_chrX_head2500_run_vep", + "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/gnomad_input", + "annotation_fields": "CSQ", + "vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "False", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 2337} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_annotation_sets": 21167} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 21059} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 17184695290} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 17180709457} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_features": 74} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 71} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_symbol": 12} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 
12} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json index d6a011e63..ac7b0ba48 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json @@ -9,15 +9,15 @@ "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], - "expected_result": {"num_rows": 21} + "expected_result": {"num_rows": 19790} }, { "query": ["SELECT SUM(DP) AS sum FROM {TABLE_NAME}"], - "expected_result": {"sum": 241} + "expected_result": {"sum": 262} }, { "query": ["SELECT COUNT(DB) AS cnt FROM {TABLE_NAME}"], - "expected_result": {"cnt": 8} + "expected_result": {"cnt": 4} } ] } diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json deleted file mode 100644 index ddb6e9708..000000000 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/merge_option_move_to_call_from_file.json +++ /dev/null @@ -1,31 +0,0 @@ -[ - { - "test_name": "merge-option-move-to-calls-from-file", - "table_name": "merge_option_move_to_calls_from_file", - "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/merge_input", - "runner": "DirectRunner", - "zones": ["us-west1-b"], - "variant_merge_strategy": "MOVE_TO_CALLS", - "assertion_configs": [ - { - "query": ["NUM_ROWS_QUERY"], - "expected_result": {"num_rows": 4} - }, - { - "query": ["SUM_START_QUERY"], - "expected_result": {"sum_start": 1283553} - }, - { - "query": ["SUM_END_QUERY"], - "expected_result": {"sum_end": 1283560} - }, - { - "query": [ - "SELECT COUNT(0) AS num_rows FROM {TABLE_NAME} ", - "WHERE start_position = 14369" - ], - "expected_result": {"num_rows": 1} - } - ] - } -] diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index a5ba16f48..64b909178 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -93,8 +93,7 @@ def _read_variants(input_patterns, pipeline, known_args, pipeline_mode): representative_header_lines = vcf_header_parser.get_metadata_header_lines( known_args.representative_header_file) - if (pipeline_mode == pipeline_common.PipelineModes.LARGE or - len(input_patterns) > 1): + if pipeline_mode == pipeline_common.PipelineModes.LARGE: variants = (pipeline | 'InputFilePattern' >> beam.Create(input_patterns) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( @@ -367,8 +366,6 @@ def run(argv=None): beam_pipeline_options = pipeline_options.PipelineOptions(pipeline_args) pipeline = beam.Pipeline(options=beam_pipeline_options) variants = _read_variants(input_patterns, pipeline, known_args, pipeline_mode) - logging.info('got here') - logging.info(variants) variants |= 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) if partitioner: From dcee9a8334c8745a9297bdb72e2295b670ee7bb5 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Tue, 19 Mar 2019 14:08:32 -0400 Subject: [PATCH 5/7] Applied the requested changes. 
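Behavioral note for the pipeline_common rework below: get_pipeline_mode now sizes a single pattern by its match count, while multiple patterns (or the explicit flag) skip the counting entirely and pin the pipeline to LARGE mode. A hypothetical usage sketch follows; the bucket paths are invented, and the 100/50,000-file thresholds are the ones exercised by the unit tests in this series:

    from gcp_variant_transforms import pipeline_common
    from gcp_variant_transforms.pipeline_common import PipelineModes

    # Single pattern: SMALL for up to 100 matched files, MEDIUM for up to
    # 50000, LARGE beyond that (this call performs a real filesystem match).
    mode = pipeline_common.get_pipeline_mode(['gs://my-bucket/samples/*.vcf'])

    # More than one pattern short-circuits to LARGE without any matching,
    # as does --optimize_for_large_inputs.
    assert pipeline_common.get_pipeline_mode(
        ['gs://my-bucket/a/*.vcf',
         'gs://my-bucket/b/*.vcf']) == PipelineModes.LARGE
    assert pipeline_common.get_pipeline_mode(
        ['gs://my-bucket/a/*.vcf'],
        optimize_for_large_inputs=True) == PipelineModes.LARGE
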
--- .../options/variant_transform_options_test.py | 13 ++-- gcp_variant_transforms/pipeline_common.py | 62 ++++++++++--------- .../pipeline_common_test.py | 46 +++++++------- .../testing/data/input_files/combine | 5 +- .../no_conflicts_from_file.json | 2 +- ...RCh38_chrX_head2500_run_vep_from_file.json | 2 +- ...inum_NA12878_hg38_10K_lines_from_file.json | 48 -------------- gcp_variant_transforms/vcf_to_bq.py | 36 ++++++----- .../vcf_to_bq_preprocess.py | 12 ++-- 9 files changed, 89 insertions(+), 137 deletions(-) delete mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json diff --git a/gcp_variant_transforms/options/variant_transform_options_test.py b/gcp_variant_transforms/options/variant_transform_options_test.py index 0001d42c4..902922b30 100644 --- a/gcp_variant_transforms/options/variant_transform_options_test.py +++ b/gcp_variant_transforms/options/variant_transform_options_test.py @@ -37,14 +37,6 @@ def make_args(options, args): assert not remaining_args return namespace -SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] -WRONG_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - 'non_existent.vcf\n', - './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] -EMPTY_LINES = [] - class VcfReadOptionsTest(unittest.TestCase): """Tests cases for the VcfReadOptions class.""" @@ -76,8 +68,11 @@ def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): self._options.validate(args) def test_failure_for_conflicting_flags_no_errors_with_file_input(self): + lines = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + filename = tempdir.create_temp_file(lines=lines) args = self._make_args([ '--input_file', filename, diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 7c241ef79..7eb46ea05 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -67,11 +67,13 @@ def parse_args(argv, command_line_options): for transform_options in options: transform_options.validate(known_args) _raise_error_on_invalid_flags(pipeline_args) - known_args.input_patterns = get_input_patterns( + known_args.all_patterns = _get_all_patterns( known_args.input_pattern, known_args.input_file) return known_args, pipeline_args -def get_input_patterns(input_pattern, input_file): + +def _get_all_patterns(input_pattern, input_file): + # type: (str, str) -> List[str] patterns = [input_pattern] if input_pattern else _get_file_names(input_file) # Validate inputs. @@ -79,13 +81,14 @@ def get_input_patterns(input_pattern, input_file): # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
matches = filesystems.FileSystems.match(patterns, [1] * len(patterns)) for match in matches: - if input_file and not match.metadata_list: - raise ValueError( - 'Input pattern {} from {} did not match any files.'.format( - match.pattern, input_file)) - elif not match.metadata_list: - raise ValueError( - 'Input pattern {} did not match any files.'.format(match.pattern)) + if not match.metadata_list: + if input_file: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, input_file)) + else: + raise ValueError( + 'Input pattern {} did not match any files.'.format(match.pattern)) except filesystem.BeamIOError: if input_file: raise ValueError( @@ -96,48 +99,47 @@ def get_input_patterns(input_pattern, input_file): input_pattern)) return patterns + def _get_file_names(input_file): - # type (str) -> List(str) - """ Reads all input file and extracts list of patterns out of it.""" - result = [] + # type (str) -> List[str] + """Reads the input file and extracts list of patterns out of it.""" if not filesystems.FileSystems.exists(input_file): raise ValueError('Input file {} doesn\'t exist'.format(input_file)) with filesystems.FileSystems.open(input_file) as f: - for _, l in enumerate(f): - result.append(l.strip()) - if not result: + contents = map(str.strip, f.readlines()) + if not contents: raise ValueError('Input file {} is empty.'.format(input_file)) - return result + return contents + -def get_pipeline_mode( - input_patterns, - optimize_for_large_inputs=False): - # type: (str, bool) -> int +def get_pipeline_mode(all_patterns, optimize_for_large_inputs=False): + # type: (List[str], bool) -> int """Returns the mode the pipeline should operate in based on input size.""" - if optimize_for_large_inputs or len(input_patterns) > 1: + if optimize_for_large_inputs or len(all_patterns) > 1: return PipelineModes.LARGE - match_results = filesystems.FileSystems.match(input_patterns) - - total_files = 0 - for match in match_results: - total_files += len(match.metadata_list) + match_results = filesystems.FileSystems.match(all_patterns) + if not match_results: + raise ValueError( + 'No files matched input_pattern: {}'.format(all_patterns[0])) + total_files = len(match_results[0].metadata_list) if total_files > _LARGE_DATA_THRESHOLD: return PipelineModes.LARGE elif total_files > _SMALL_DATA_THRESHOLD: return PipelineModes.MEDIUM return PipelineModes.SMALL -def read_headers(pipeline, pipeline_mode, input_patterns): - # type: (beam.Pipeline, int, str) -> pvalue.PCollection + +def read_headers(pipeline, pipeline_mode, all_patterns): + # type: (beam.Pipeline, int, List[str]) -> pvalue.PCollection """Creates an initial PCollection by reading the VCF file headers.""" if pipeline_mode == PipelineModes.LARGE: headers = (pipeline - | beam.Create(input_patterns) + | beam.Create(all_patterns) | vcf_header_io.ReadAllVcfHeaders()) else: - headers = pipeline | vcf_header_io.ReadVcfHeaders(input_patterns[0]) + headers = pipeline | vcf_header_io.ReadVcfHeaders(all_patterns[0]) return headers diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index 12de81a0f..1df354386 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -25,14 +25,6 @@ from gcp_variant_transforms.testing import temp_dir -SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - 
'./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] -WRONG_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', - 'non_existent.vcf\n', - './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] -EMPTY_LINES = [] - class PipelineCommonWithPatternTest(unittest.TestCase): """Tests cases for the `pipeline_common` script with pattern input.""" @@ -40,15 +32,15 @@ def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - input_patterns = pipeline_common.get_input_patterns(args.input_pattern, - args.input_file) - return pipeline_common.get_pipeline_mode(input_patterns, + all_patterns = pipeline_common._get_all_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(all_patterns, args.optimize_for_large_inputs) def test_validation_failure_for_invalid_input_pattern(self): with self.assertRaisesRegexp( ValueError, 'Input pattern .* did not match any files.'): - pipeline_common.get_input_patterns( + pipeline_common._get_all_patterns( input_pattern='nonexistent_file.vcf', input_file=None) def test_get_mode_optimize_set(self): @@ -113,18 +105,23 @@ def test_fail_on_invalid_flags(self): class PipelineCommonWithFileTest(unittest.TestCase): """Tests cases for the `pipeline_common` script with file input.""" + SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] + + def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - input_patterns = pipeline_common.get_input_patterns(args.input_pattern, - args.input_file) - return pipeline_common.get_pipeline_mode(input_patterns, + all_patterns = pipeline_common._get_all_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(all_patterns, args.optimize_for_large_inputs) def test_get_mode_optimize_set(self): with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) args = self._create_mock_args( input_pattern=None, input_file=filename, @@ -134,7 +131,7 @@ def test_get_mode_optimize_set(self): def test_get_mode_small_still_large(self): with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) args = self._create_mock_args( input_pattern=None, input_file=filename, @@ -147,7 +144,7 @@ def test_get_mode_small_still_large(self): def test_get_mode_large(self): with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=SAMPLE_LINES) + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) args = self._create_mock_args( input_pattern=None, input_file=filename, @@ -166,20 +163,23 @@ def test_get_mode_large(self): def test_validation_failure_for_invalid_input_file(self): with self.assertRaisesRegexp(ValueError, 'Input file .* doesn\'t exist'): - pipeline_common.get_input_patterns( + pipeline_common._get_all_patterns( input_pattern=None, input_file='nonexistent_file.vcf') def test_validation_failure_for_empty_input_file(self): with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=EMPTY_LINES) + filename = tempdir.create_temp_file(lines=[]) with self.assertRaisesRegexp(ValueError, 'Input file .* is empty.'): - 
pipeline_common.get_input_patterns( + pipeline_common._get_all_patterns( input_pattern=None, input_file=filename) def test_validation_failure_for_wrong_pattern_in_input_file(self): + lines = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + 'non_existent.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] with temp_dir.TempDir() as tempdir: - filename = tempdir.create_temp_file(lines=WRONG_LINES) + filename = tempdir.create_temp_file(lines=lines) with self.assertRaisesRegexp( ValueError, 'Input pattern .* from .* did not match any files.'): - pipeline_common.get_input_patterns( + pipeline_common._get_all_patterns( input_pattern=None, input_file=filename) diff --git a/gcp_variant_transforms/testing/data/input_files/combine b/gcp_variant_transforms/testing/data/input_files/combine index fbc60dde7..14fa2f7e4 100644 --- a/gcp_variant_transforms/testing/data/input_files/combine +++ b/gcp_variant_transforms/testing/data/input_files/combine @@ -1,2 +1,3 @@ -gs://tural-test-runner/vcf_files/small/valid-4.1* -gs://tural-test-runner/vcf_files/small/valid-4.2.* +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1* +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.* + diff --git a/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json b/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json index 5c3d2d74f..6e58f6905 100644 --- a/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json +++ b/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json @@ -1,7 +1,7 @@ [ { "test_name": "no-conflicts", - "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", "report_blob_name": "temp/report_no_conflicts.tsv", "runner": "DirectRunner", "zones": ["us-west1-b"], diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json index 4cfd78145..344462a2a 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json @@ -2,7 +2,7 @@ { "test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep", "table_name": "gnomad_genomes_GRCh37_chrX_head2500_run_vep", - "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/gnomad_input", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/gnomad_input", "annotation_fields": "CSQ", "vep_assembly": "GRCh37", "runner": "DataflowRunner", diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json deleted file mode 100644 index 32c860fa1..000000000 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/platinum_NA12878_hg38_10K_lines_from_file.json +++ /dev/null @@ -1,48 
+0,0 @@ -[ - { - "test_name": "platinum-na12877-hg38-10k-lines-from-file", - "table_name": "platinum_NA12877_hg38_10K_lines_from_file", - "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/platinum_input", - "annotation_fields": "CSQ", - "runner": "DataflowRunner", - "assertion_configs": [ - { - "query": ["NUM_ROWS_QUERY"], - "expected_result": {"num_rows": 9953} - }, - { - "query": [ - "SELECT COUNT(0) AS num_annotation_sets ", - "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" - ], - "expected_result": {"num_annotation_sets": 45770} - }, - { - "query": [ - "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", - "FROM ( ", - " SELECT start_position, reference_bases, A.alt, ", - " COUNT(0) AS number_of_annotations ", - " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ", - " GROUP BY 1, 2, 3", - ")" - ], - "expected_result": {"hash_sum": 143375297338} - }, - { - "query": [ - "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", - "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" - ], - "expected_result": {"num_features": 1575} - }, - { - "query": [ - "SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ", - "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" - ], - "expected_result": {"num_symbol": 206} - } - ] - } -] diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 64b909178..b0c9b3c73 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -85,7 +85,7 @@ _SHARDS_FOLDER = 'shards' -def _read_variants(input_patterns, pipeline, known_args, pipeline_mode): +def _read_variants(all_patterns, pipeline, known_args, pipeline_mode): # type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection """Helper method for returning a PCollection of Variants from VCFs.""" representative_header_lines = None @@ -95,14 +95,14 @@ def _read_variants(input_patterns, pipeline, known_args, pipeline_mode): if pipeline_mode == pipeline_common.PipelineModes.LARGE: variants = (pipeline - | 'InputFilePattern' >> beam.Create(input_patterns) + | 'InputFilePattern' >> beam.Create(all_patterns) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( representative_header_lines=representative_header_lines, allow_malformed_records=( known_args.allow_malformed_records))) else: variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf( - input_patterns[0], + all_patterns[0], representative_header_lines=representative_header_lines, allow_malformed_records=known_args.allow_malformed_records, vcf_parser_type=vcfio.VcfParserType[known_args.vcf_parser]) @@ -132,7 +132,7 @@ def _get_variant_merge_strategy(known_args # type: argparse.Namespace raise ValueError('Merge strategy is not supported.') -def _add_inferred_headers(input_patterns, # type: str +def _add_inferred_headers(all_patterns, # type: str pipeline, # type: beam.Pipeline known_args, # type: argparse.Namespace merged_header, # type: pvalue.PCollection @@ -142,7 +142,7 @@ def _add_inferred_headers(input_patterns, # type: str annotation_fields_to_infer = (known_args.annotation_fields if known_args.infer_annotation_types else []) inferred_headers = ( - _read_variants(input_patterns, + _read_variants(all_patterns, pipeline, known_args, pipeline_mode) @@ -163,7 +163,7 @@ def _add_inferred_headers(input_patterns, # type: str def _shard_variants(known_args, pipeline_args, pipeline_mode): - # type: (argparse.Namespace, List[str], int) -> str + # type: (argparse.Namespace, List[str], int) -> List[str] """Reads the 
variants and writes them to VCF shards. Returns: @@ -176,9 +176,9 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): _update_google_cloud_job_name(google_cloud_options, shard_files_job_name) vcf_shards_output_dir = filesystems.FileSystems.join( known_args.annotation_output_dir, _SHARDS_FOLDER) - input_patterns = known_args.input_patterns with beam.Pipeline(options=options) as p: - variants = _read_variants(input_patterns, p, known_args, pipeline_mode) + variants = _read_variants( + known_args.all_patterns, p, known_args, pipeline_mode) call_names = (variants | 'CombineCallNames' >> combine_call_names.CallNamesCombiner()) @@ -193,7 +193,7 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): return [vep_runner_util.format_dir_path(vcf_shards_output_dir)] -def _annotate_vcf_files(input_patterns, known_args, pipeline_args): +def _annotate_vcf_files(all_patterns, known_args, pipeline_args): # type: (str, argparse.Namespace, List[str]) -> str """Annotates the VCF files using VEP. @@ -208,7 +208,7 @@ def _annotate_vcf_files(input_patterns, known_args, pipeline_args): with beam.Pipeline(options=options) as p: _ = (p - | beam.Create(input_patterns) + | beam.Create(all_patterns) | 'AnnotateShards' >> beam.ParDo( annotate_files.AnnotateFile(known_args, pipeline_args))) if known_args.annotation_fields: @@ -239,7 +239,6 @@ def _merge_headers(known_args, pipeline_args, """Merges VCF headers using beam based on pipeline_mode.""" if known_args.representative_header_file: return - input_patterns = known_args.input_patterns options = pipeline_options.PipelineOptions(pipeline_args) # Always run pipeline locally if data is small. @@ -262,7 +261,8 @@ def _merge_headers(known_args, pipeline_args, temp_directory, temp_merged_headers_file_name) with beam.Pipeline(options=options) as p: - headers = pipeline_common.read_headers(p, pipeline_mode, input_patterns) + headers = pipeline_common.read_headers( + p, pipeline_mode, known_args.all_patterns) merged_header = pipeline_common.get_merged_headers( headers, known_args.split_alternate_allele_info_fields, @@ -274,7 +274,7 @@ def _merge_headers(known_args, pipeline_args, if known_args.infer_headers or known_args.infer_annotation_types: infer_headers_input_pattern = ( [annotated_vcf_pattern] if - annotated_vcf_pattern else known_args.input_patterns) + annotated_vcf_pattern else known_args.all_patterns) merged_header = _add_inferred_headers(infer_headers_input_pattern, p, known_args, merged_header, pipeline_mode) @@ -305,7 +305,7 @@ def _run_annotation_pipeline(known_args, pipeline_args): _validate_annotation_pipeline_args(known_args, pipeline_args) known_args.omit_empty_sample_calls = True - files_to_be_annotated = known_args.input_patterns + files_to_be_annotated = known_args.all_patterns if known_args.shard_variants: pipeline_mode = pipeline_common.get_pipeline_mode( files_to_be_annotated, @@ -328,12 +328,14 @@ def run(argv=None): annotated_vcf_pattern = _run_annotation_pipeline(known_args, pipeline_args) - input_patterns = known_args.input_patterns + all_patterns = ( + [annotated_vcf_pattern] if annotated_vcf_pattern + else known_args.all_patterns) variant_merger = _get_variant_merge_strategy(known_args) pipeline_mode = pipeline_common.get_pipeline_mode( - input_patterns, + all_patterns, known_args.optimize_for_large_inputs) # Starts a pipeline to merge VCF headers in beam if the total files that # match the input pattern exceeds _SMALL_DATA_THRESHOLD @@ -365,7 +367,7 @@ def run(argv=None): beam_pipeline_options = 
pipeline_options.PipelineOptions(pipeline_args) pipeline = beam.Pipeline(options=beam_pipeline_options) - variants = _read_variants(input_patterns, pipeline, known_args, pipeline_mode) + variants = _read_variants(all_patterns, pipeline, known_args, pipeline_mode) variants |= 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) if partitioner: diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index 9ed0f7698..05c644ebe 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -94,22 +94,22 @@ def run(argv=None): known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) options = pipeline_options.PipelineOptions(pipeline_args) - input_patterns = known_args.input_patterns - pipeline_mode = pipeline_common.get_pipeline_mode(input_patterns) + all_patterns = known_args.all_patterns + pipeline_mode = pipeline_common.get_pipeline_mode(all_patterns) with beam.Pipeline(options=options) as p: - headers = pipeline_common.read_headers(p, pipeline_mode, input_patterns) + headers = pipeline_common.read_headers(p, pipeline_mode, all_patterns) merged_headers = pipeline_common.get_merged_headers(headers) merged_definitions = (headers | 'MergeDefinitions' >> merge_header_definitions.MergeDefinitions()) if known_args.report_all_conflicts: - if len(input_patterns) == 1: + if len(all_patterns) == 1: variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( - input_patterns[0], allow_malformed_records=True) + all_patterns[0], allow_malformed_records=True) else: variants = (p - | 'InputFilePattern' >> beam.Create(input_patterns) + | 'InputFilePattern' >> beam.Create(all_patterns) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( allow_malformed_records=True)) From 83d6ccf03d73fde6c816bed2415f98d53a67f807 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Wed, 20 Mar 2019 14:21:06 -0400 Subject: [PATCH 6/7] Applied the requested changes. 
From 83d6ccf03d73fde6c816bed2415f98d53a67f807 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Wed, 20 Mar 2019 14:21:06 -0400 Subject: [PATCH 6/7] Applied the requested changes: - type definitions - preprocessor integration test --- .../options/variant_transform_options.py | 1 - .../options/variant_transform_options_test.py | 1 + gcp_variant_transforms/pipeline_common.py | 2 +- .../input_files/{combine => combine_input} | 0 .../testing/data/input_files/error_input | 2 ++ .../header_conflicts_from_file.json | 19 +++++++++++++++++++ .../no_conflicts_from_file.json | 14 -------------- ...RCh38_chrX_head2500_run_vep_from_file.json | 6 +++--- gcp_variant_transforms/vcf_to_bq.py | 12 ++++++++---- 9 files changed, 34 insertions(+), 23 deletions(-) rename gcp_variant_transforms/testing/data/input_files/{combine => combine_input} (100%) create mode 100644 gcp_variant_transforms/testing/data/input_files/error_input create mode 100644 gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json delete mode 100644 gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 2d90098b2..473ef6348 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -112,7 +112,6 @@ def add_arguments(self, parser): 'is not recommended.'.format(vcfio.VcfParserType.PYVCF.name, vcfio.VcfParserType.NUCLEUS.name))) - def validate(self, parsed_args): # type: (argparse.Namespace) -> None if parsed_args.infer_headers and parsed_args.representative_header_file: diff --git a/gcp_variant_transforms/options/variant_transform_options_test.py b/gcp_variant_transforms/options/variant_transform_options_test.py index 902922b30..410484305 100644 --- a/gcp_variant_transforms/options/variant_transform_options_test.py +++ b/gcp_variant_transforms/options/variant_transform_options_test.py @@ -37,6 +37,7 @@ def make_args(options, args): assert not remaining_args return namespace + class VcfReadOptionsTest(unittest.TestCase): """Tests cases for the VcfReadOptions class.""" diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 7eb46ea05..03fee18d0 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -101,7 +101,7 @@ def _get_all_patterns(input_pattern, input_file): def _get_file_names(input_file): - # type (str) -> List[str] + # type: (str) -> List[str] """Reads the input file and extracts list of patterns out of it.""" if not filesystems.FileSystems.exists(input_file): raise ValueError('Input file {} doesn\'t exist'.format(input_file)) diff --git a/gcp_variant_transforms/testing/data/input_files/combine b/gcp_variant_transforms/testing/data/input_files/combine_input similarity index 100% rename from gcp_variant_transforms/testing/data/input_files/combine rename to gcp_variant_transforms/testing/data/input_files/combine_input diff --git a/gcp_variant_transforms/testing/data/input_files/error_input b/gcp_variant_transforms/testing/data/input_files/error_input new file mode 100644 index 000000000..78426bd2f --- /dev/null +++ b/gcp_variant_transforms/testing/data/input_files/error_input @@ -0,0 +1,2 @@ +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf
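Only the type comment of _get_file_names changes above, but that helper is the heart of the --input_file feature. For readers who want the full picture, here is a sketch of what such a helper plausibly does; everything past the exists-check visible in the diff is an assumption, including the blank-line filtering (patch 7 below removes a trailing empty line from combine_input, which hints blank lines were troublesome):

```python
from typing import List  # used by the type comment below

from apache_beam.io import filesystems


def _get_file_names_sketch(input_file):
  # type: (str) -> List[str]
  """Reads the input file and extracts the list of patterns out of it."""
  if not filesystems.FileSystems.exists(input_file):
    raise ValueError('Input file {} doesn\'t exist'.format(input_file))
  with filesystems.FileSystems.open(input_file) as f:
    # One pattern per line; skipping blank lines is a guess.
    patterns = [line.strip() for line in f if line.strip()]
  if not patterns:
    raise ValueError('Input file {} is empty.'.format(input_file))
  return patterns
```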
diff --git a/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json b/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json new file mode 100644 index 000000000..de8971658 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json @@ -0,0 +1,19 @@ +[ + { + "test_name": "header-conflicts-from-file", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/error_input", + "report_blob_name": "temp/report_no_conflicts.tsv", + "runner": "DirectRunner", + "zones": ["us-west1-b"], + "expected_contents": [ + "Header Conflicts", + "ID\tCategory\tConflicts\tFile\tPaths\tProposed Resolution", + "AF\tINFO\tnum=None type=Float\tgs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf\tnum=None type=Float", + " \t \tnum=-1 type=Float\tgs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf\t ", + "", + "No Inferred Headers Found.", + "", + "No Malformed Records Found." + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json b/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json deleted file mode 100644 index 6e58f6905..000000000 --- a/gcp_variant_transforms/testing/integration/preprocessor_tests/no_conflicts_from_file.json +++ /dev/null @@ -1,14 +0,0 @@ -[ - { - "test_name": "no-conflicts", - "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", - "report_blob_name": "temp/report_no_conflicts.tsv", - "runner": "DirectRunner", - "zones": ["us-west1-b"], - "expected_contents": [ - "No Header Conflicts Found.", - "", - "" - ] - } -] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json index 344462a2a..efe0f5721 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json @@ -1,14 +1,14 @@ [ { - "test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep", - "table_name": "gnomad_genomes_GRCh37_chrX_head2500_run_vep", + "test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep-from-file", + "table_name": "gnomad_genomes_GRCh37_chrX_head2500_run_vep_from_file", "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/gnomad_input", "annotation_fields": "CSQ", "vep_assembly": "GRCh37", "runner": "DataflowRunner", "run_annotation_pipeline": "True", "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", - "shard_variants": "False", + "shard_variants": "True", "num_workers": 2, "assertion_configs": [ {
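Each assertion_configs entry in these test configs pairs a BigQuery query (with a {TABLE_NAME} placeholder, plus macros such as NUM_ROWS_QUERY that the harness expands first) with an expected single-row result. A hypothetical evaluator for one entry, purely to illustrate the contract; the real runner in run_vcf_to_bq_tests.py may differ substantially:

```python
from google.cloud import bigquery


def evaluate_assertion(table_name, assertion):
  """Runs one assertion_configs entry and checks its single result row."""
  client = bigquery.Client()
  sql = ' '.join(assertion['query']).replace('{TABLE_NAME}', table_name)
  row = list(client.query(sql).result())[0]
  for column, expected in assertion['expected_result'].items():
    if row[column] != expected:
      raise AssertionError('{}: got {}, expected {}'.format(
          column, row[column], expected))
```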
diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index b0c9b3c73..8a5e2fb80 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -85,8 +85,12 @@ _SHARDS_FOLDER = 'shards' -def _read_variants(all_patterns, pipeline, known_args, pipeline_mode): - # type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection +def _read_variants(all_patterns, # type: List[str] + pipeline, # type: beam.Pipeline + known_args, # type: argparse.Namespace + pipeline_mode # type: int + ): + # type: (...) -> pvalue.PCollection """Helper method for returning a PCollection of Variants from VCFs.""" representative_header_lines = None if known_args.representative_header_file: @@ -132,7 +136,7 @@ def _get_variant_merge_strategy(known_args # type: argparse.Namespace raise ValueError('Merge strategy is not supported.') -def _add_inferred_headers(all_patterns, # type: str +def _add_inferred_headers(all_patterns, # type: List[str] pipeline, # type: beam.Pipeline known_args, # type: argparse.Namespace merged_header, # type: pvalue.PCollection @@ -194,7 +198,7 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): def _annotate_vcf_files(all_patterns, known_args, pipeline_args): - # type: (str, argparse.Namespace, List[str]) -> str + # type: (List[str], argparse.Namespace, List[str]) -> str """Annotates the VCF files using VEP. Returns: From f01ff8a4c0df6379c4d93332e9263fdda0e8d288 Mon Sep 17 00:00:00 2001 From: Tural Neymanov Date: Fri, 22 Mar 2019 11:20:29 -0400 Subject: [PATCH 7/7] Add integration tests for annotation pipeline with and without sharding. --- .../testing/data/input_files/combine_input | 1 - ...tion_pipeline_from_file_with_sharding.json | 53 +++++++++++++++++++ ...n_pipeline_from_file_without_sharding.json | 53 +++++++++++++++++++ 3 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json create mode 100644 gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json diff --git a/gcp_variant_transforms/testing/data/input_files/combine_input b/gcp_variant_transforms/testing/data/input_files/combine_input index 14fa2f7e4..79fcca073 100644 --- a/gcp_variant_transforms/testing/data/input_files/combine_input +++ b/gcp_variant_transforms/testing/data/input_files/combine_input @@ -1,3 +1,2 @@ gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1* gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.* - diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json new file mode 100644 index 000000000..6eecca17f --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json @@ -0,0 +1,53 @@ +[ + { + "test_name": "test-annotation-pipeline-from-file-with-sharding", + "table_name": "test_annotation_pipeline_from_file_with_sharding", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "True", + "infer_headers": "True", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 19778} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 16898} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases,
A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 13631192958} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 417} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 92} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json new file mode 100644 index 000000000..fde8e58eb --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json @@ -0,0 +1,53 @@ +[ + { + "test_name": "test-annotation-pipeline-from-file-without-sharding", + "table_name": "test_annotation_pipeline_from_file_without_sharding", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "False", + "infer_headers": "True", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 19778} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 16898} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 13631192958} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 417} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 92} + } + ] + } +]
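The two configs above differ only in shard_variants, and they assert identical row counts and hash sums: re-sharding the inputs before annotation must not change what gets loaded. How that flag steers the pipeline is condensed below from the vcf_to_bq.py hunks earlier in this series; this is a sketch as it would sit inside that module, where the referenced helpers are defined, and the second argument to get_pipeline_mode is an educated guess since the hunk is cut off:

```python
def _run_annotation_pipeline_sketch(known_args, pipeline_args):
  """With sharding, variants are first rewritten to VCF shards under
  annotation_output_dir; without it, the original patterns are annotated
  directly. Both paths feed _annotate_vcf_files."""
  files_to_be_annotated = known_args.all_patterns
  if known_args.shard_variants:
    pipeline_mode = pipeline_common.get_pipeline_mode(
        files_to_be_annotated, known_args.optimize_for_large_inputs)
    # _shard_variants returns the shards directory as a one-element list.
    files_to_be_annotated = _shard_variants(
        known_args, pipeline_args, pipeline_mode)
  return _annotate_vcf_files(files_to_be_annotated, known_args,
                             pipeline_args)
```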