diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index fd17b8d92..473ef6348 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -17,8 +17,6 @@ import argparse # pylint: disable=unused-import import re -from apache_beam.io import filesystem -from apache_beam.io import filesystems from apache_beam.io.gcp.internal.clients import bigquery from apitools.base.py import exceptions from oauth2client.client import GoogleCredentials @@ -53,9 +51,16 @@ class VcfReadOptions(VariantTransformsOptions): def add_arguments(self, parser): """Adds all options of this transform to parser.""" - parser.add_argument('--input_pattern', - required=True, - help='Input pattern for VCF files to process.') + parser.add_argument( + '--input_pattern', + help=('Input pattern for VCF files to process. Either ' + 'this or --input_file flag has to be provided, exclusively.')) + parser.add_argument( + '--input_file', + help=('File that contains the list of VCF file names to input. Either ' + 'this or --input_pattern flag has to be provided, exclusively. ' + 'Note that using input_file rather than input_pattern is slower ' + 'for inputs that contain less than 50k files.')) parser.add_argument( '--allow_malformed_records', type='bool', default=False, nargs='?', const=True, @@ -113,16 +118,7 @@ def validate(self, parsed_args): raise ValueError('Both --infer_headers and --representative_header_file ' 'are passed! Please double check and choose at most one ' 'of them.') - try: - # Gets at most one pattern match result of type `filesystems.MatchResult`. - first_match = filesystems.FileSystems.match( - [parsed_args.input_pattern], [1])[0] - if not first_match.metadata_list: - raise ValueError('Input pattern {} did not match any files.'.format( - parsed_args.input_pattern)) - except filesystem.BeamIOError: - raise ValueError('Invalid or inaccessible input pattern {}.'.format( - parsed_args.input_pattern)) + _validate_inputs(parsed_args) class AvroWriteOptions(VariantTransformsOptions): @@ -477,9 +473,16 @@ class PreprocessOptions(VariantTransformsOptions): def add_arguments(self, parser): # type: (argparse.ArgumentParser) -> None - parser.add_argument('--input_pattern', - required=True, - help='Input pattern for VCF files to process.') + parser.add_argument( + '--input_pattern', + help='Input pattern for VCF files to process. Either ' + 'this or --input_file flag has to be provided, exclusively.') + parser.add_argument( + '--input_file', + help=('File that contains the list of VCF file names to input. Either ' + 'this or --input_pattern flag has to be provided, exclusively. ' + 'Note that using input_file rather than input_pattern is slower for ' + 'inputs that contain less than 50k files.')) parser.add_argument( '--report_all_conflicts', type='bool', default=False, nargs='?', const=True, @@ -501,6 +504,10 @@ def add_arguments(self, parser): 'generated if unspecified. 
Otherwise, please provide a local ' 'path if run locally, or a cloud path if run on Dataflow.')) + def validate(self, parsed_args): + _validate_inputs(parsed_args) + + class PartitionOptions(VariantTransformsOptions): """Options for partitioning Variant records.""" @@ -583,3 +590,10 @@ def add_arguments(self, parser): 'be the same as the BigQuery table, but it requires all ' 'extracted variants to have the same call name ordering (usually ' 'true for tables from single VCF file import).')) + + +def _validate_inputs(parsed_args): + if ((parsed_args.input_pattern and parsed_args.input_file) or + (not parsed_args.input_pattern and not parsed_args.input_file)): + raise ValueError('Exactly one of input_pattern and input_file has to be ' + 'provided.') diff --git a/gcp_variant_transforms/options/variant_transform_options_test.py b/gcp_variant_transforms/options/variant_transform_options_test.py index c6df4977b..410484305 100644 --- a/gcp_variant_transforms/options/variant_transform_options_test.py +++ b/gcp_variant_transforms/options/variant_transform_options_test.py @@ -26,6 +26,7 @@ from apitools.base.py import exceptions from gcp_variant_transforms.options import variant_transform_options +from gcp_variant_transforms.testing import temp_dir def make_args(options, args): @@ -47,20 +48,37 @@ def _make_args(self, args): # type: (List[str]) -> argparse.Namespace return make_args(self._options, args) - def test_failure_for_conflicting_flags(self): + def test_no_inputs(self): + args = self._make_args([]) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_inputs(self): + args = self._make_args(['--input_pattern', '*', + '--input_file', 'asd']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_headers(self): args = self._make_args(['--input_pattern', '*', '--infer_headers', '--representative_header_file', 'gs://some_file']) self.assertRaises(ValueError, self._options.validate, args) - def test_failure_for_conflicting_flags_no_errors(self): + def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): args = self._make_args(['--input_pattern', '*', '--representative_header_file', 'gs://some_file']) self._options.validate(args) - def test_failure_for_invalid_input_pattern(self): - args = self._make_args(['--input_pattern', 'nonexistent_file.vcf']) - self.assertRaises(ValueError, self._options.validate, args) + def test_failure_for_conflicting_flags_no_errors_with_file_input(self): + lines = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=lines) + args = self._make_args([ + '--input_file', + filename, + '--representative_header_file', 'gs://some_file']) + self._options.validate(args) class BigQueryWriteOptionsTest(unittest.TestCase): @@ -151,3 +169,30 @@ def test_failure_for_invalid_vep_cache(self): '--vep_image_uri', 'AN_IMAGE', '--vep_cache_path', 'VEP_CACHE']) self.assertRaises(ValueError, self._options.validate, args) + + +class PreprocessOptionsTest(unittest.TestCase): + """Tests cases for the PreprocessOptions class.""" + + def setUp(self): + self._options = variant_transform_options.PreprocessOptions() + + def _make_args(self, args): + # type: (List[str]) -> argparse.Namespace + return make_args(self._options, args) + + def 
test_failure_for_conflicting_flags_inputs(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path', + '--input_file', 'asd']) + self.assertRaises(ValueError, self._options.validate, args) + + def test_failure_for_conflicting_flags_no_errors(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path']) + self._options.validate(args) + + def test_failure_for_conflicting_flags_no_errors_with_pattern_input(self): + args = self._make_args(['--input_pattern', '*', + '--report_path', 'some_path']) + self._options.validate(args) diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 4a71abafd..03fee18d0 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -26,6 +26,7 @@ import apache_beam as beam from apache_beam import pvalue # pylint: disable=unused-import +from apache_beam.io import filesystem from apache_beam.io import filesystems from apache_beam.options import pipeline_options from apache_beam.runners.direct import direct_runner @@ -66,18 +67,61 @@ def parse_args(argv, command_line_options): for transform_options in options: transform_options.validate(known_args) _raise_error_on_invalid_flags(pipeline_args) + known_args.all_patterns = _get_all_patterns( + known_args.input_pattern, known_args.input_file) return known_args, pipeline_args -def get_pipeline_mode(input_pattern, optimize_for_large_inputs=False): - # type: (str, bool) -> int +def _get_all_patterns(input_pattern, input_file): + # type: (str, str) -> List[str] + patterns = [input_pattern] if input_pattern else _get_file_names(input_file) + + # Validate inputs. + try: + # Gets at most 1 pattern match result of type `filesystems.MatchResult`. 
+ matches = filesystems.FileSystems.match(patterns, [1] * len(patterns)) + for match in matches: + if not match.metadata_list: + if input_file: + raise ValueError( + 'Input pattern {} from {} did not match any files.'.format( + match.pattern, input_file)) + else: + raise ValueError( + 'Input pattern {} did not match any files.'.format(match.pattern)) + except filesystem.BeamIOError: + if input_file: + raise ValueError( + 'Some patterns in {} are invalid or inaccessible.'.format( + input_file)) + else: + raise ValueError('Invalid or inaccessible input pattern {}.'.format( + input_pattern)) + return patterns + + +def _get_file_names(input_file): + # type: (str) -> List[str] + """Reads the input file and extracts list of patterns out of it.""" + if not filesystems.FileSystems.exists(input_file): + raise ValueError('Input file {} doesn\'t exist'.format(input_file)) + with filesystems.FileSystems.open(input_file) as f: + contents = map(str.strip, f.readlines()) + if not contents: + raise ValueError('Input file {} is empty.'.format(input_file)) + return contents + + +def get_pipeline_mode(all_patterns, optimize_for_large_inputs=False): + # type: (List[str], bool) -> int """Returns the mode the pipeline should operate in based on input size.""" - if optimize_for_large_inputs: + if optimize_for_large_inputs or len(all_patterns) > 1: return PipelineModes.LARGE - match_results = filesystems.FileSystems.match([input_pattern]) + match_results = filesystems.FileSystems.match(all_patterns) if not match_results: - raise ValueError('No files matched input_pattern: {}'.format(input_pattern)) + raise ValueError( + 'No files matched input_pattern: {}'.format(all_patterns[0])) total_files = len(match_results[0].metadata_list) if total_files > _LARGE_DATA_THRESHOLD: @@ -87,15 +131,16 @@ def get_pipeline_mode(input_pattern, optimize_for_large_inputs=False): return PipelineModes.SMALL -def read_headers(pipeline, pipeline_mode, input_pattern): - # type: (beam.Pipeline, int, str) -> pvalue.PCollection +def read_headers(pipeline, pipeline_mode, all_patterns): + # type: (beam.Pipeline, int, List[str]) -> pvalue.PCollection """Creates an initial PCollection by reading the VCF file headers.""" if pipeline_mode == PipelineModes.LARGE: headers = (pipeline - | beam.Create([input_pattern]) + | beam.Create(all_patterns) | vcf_header_io.ReadAllVcfHeaders()) else: - headers = pipeline | vcf_header_io.ReadVcfHeaders(input_pattern) + headers = pipeline | vcf_header_io.ReadVcfHeaders(all_patterns[0]) + return headers diff --git a/gcp_variant_transforms/pipeline_common_test.py b/gcp_variant_transforms/pipeline_common_test.py index d051ef4c4..1df354386 100644 --- a/gcp_variant_transforms/pipeline_common_test.py +++ b/gcp_variant_transforms/pipeline_common_test.py @@ -22,35 +22,36 @@ from gcp_variant_transforms import pipeline_common from gcp_variant_transforms.pipeline_common import PipelineModes +from gcp_variant_transforms.testing import temp_dir -class VcfToBqCommonTest(unittest.TestCase): - """Tests cases for the `pipeline_common` script.""" +class PipelineCommonWithPatternTest(unittest.TestCase): + """Tests cases for the `pipeline_common` script with pattern input.""" def _create_mock_args(self, **args): return collections.namedtuple('MockArgs', args.keys())(*args.values()) def _get_pipeline_mode(self, args): - return pipeline_common.get_pipeline_mode(args.input_pattern, + all_patterns = pipeline_common._get_all_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(all_patterns, 
args.optimize_for_large_inputs) - def test_get_mode_raises_error_for_no_match(self): - args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) - - with mock.patch.object(FileSystems, 'match', return_value=None), \ - self.assertRaises(ValueError): - self._get_pipeline_mode(args) + def test_validation_failure_for_invalid_input_pattern(self): + with self.assertRaisesRegexp( + ValueError, 'Input pattern .* did not match any files.'): + pipeline_common._get_all_patterns( + input_pattern='nonexistent_file.vcf', input_file=None) def test_get_mode_optimize_set(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=True) + input_pattern='*', input_file=None, optimize_for_large_inputs=True) self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) def test_get_mode_small(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='*', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result([None for _ in range(100)]) @@ -59,7 +60,7 @@ def test_get_mode_small(self): def test_get_mode_medium(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='*', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(101)) @@ -72,22 +73,13 @@ def test_get_mode_medium(self): def test_get_mode_large(self): args = self._create_mock_args( - input_pattern='', optimize_for_large_inputs=False) + input_pattern='test', input_file=None, optimize_for_large_inputs=False) match_result = collections.namedtuple('MatchResult', ['metadata_list']) match = match_result(range(50001)) with mock.patch.object(FileSystems, 'match', return_value=[match]): self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) - def test_default_optimize_for_large_inputs(self): - args = self._create_mock_args(input_pattern='') - match_result = collections.namedtuple('MatchResult', ['metadata_list']) - - match = match_result(range(101)) - with mock.patch.object(FileSystems, 'match', return_value=[match]): - self.assertEqual(pipeline_common.get_pipeline_mode(args.input_pattern), - PipelineModes.MEDIUM) - def test_fail_on_invalid_flags(self): # Start with valid flags, without setup.py. 
pipeline_args = ['--project', @@ -109,3 +101,85 @@ def test_fail_on_invalid_flags(self): pipeline_args.extend(['--unknown_flag', 'somevalue']) with self.assertRaisesRegexp(ValueError, 'Unrecognized.*unknown_flag'): pipeline_common._raise_error_on_invalid_flags(pipeline_args) + +class PipelineCommonWithFileTest(unittest.TestCase): + """Tests cases for the `pipeline_common` script with file input.""" + + SAMPLE_LINES = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] + + + def _create_mock_args(self, **args): + return collections.namedtuple('MockArgs', args.keys())(*args.values()) + + def _get_pipeline_mode(self, args): + all_patterns = pipeline_common._get_all_patterns(args.input_pattern, + args.input_file) + return pipeline_common.get_pipeline_mode(all_patterns, + args.optimize_for_large_inputs) + + def test_get_mode_optimize_set(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=True) + + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + def test_get_mode_small_still_large(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result([None for _ in range(100)]) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + def test_get_mode_large(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=self.SAMPLE_LINES) + args = self._create_mock_args( + input_pattern=None, + input_file=filename, + optimize_for_large_inputs=False) + match_result = collections.namedtuple('MatchResult', ['metadata_list']) + + match = match_result(range(50001)) + with mock.patch.object(FileSystems, 'match', return_value=[match]): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + matches = [match_result(range(25000)), + match_result(range(25000)), + match_result(range(1))] + with mock.patch.object(FileSystems, 'match', return_value=matches): + self.assertEqual(self._get_pipeline_mode(args), PipelineModes.LARGE) + + def test_validation_failure_for_invalid_input_file(self): + with self.assertRaisesRegexp(ValueError, 'Input file .* doesn\'t exist'): + pipeline_common._get_all_patterns( + input_pattern=None, input_file='nonexistent_file.vcf') + + def test_validation_failure_for_empty_input_file(self): + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=[]) + with self.assertRaisesRegexp(ValueError, 'Input file .* is empty.'): + pipeline_common._get_all_patterns( + input_pattern=None, input_file=filename) + + def test_validation_failure_for_wrong_pattern_in_input_file(self): + lines = ['./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n', + 'non_existent.vcf\n', + './gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf\n'] + with temp_dir.TempDir() as tempdir: + filename = tempdir.create_temp_file(lines=lines) + with self.assertRaisesRegexp( + ValueError, 'Input pattern .* from .* did not match any files.'): + pipeline_common._get_all_patterns( + input_pattern=None, 
input_file=filename) diff --git a/gcp_variant_transforms/testing/data/input_files/combine_input b/gcp_variant_transforms/testing/data/input_files/combine_input new file mode 100644 index 000000000..79fcca073 --- /dev/null +++ b/gcp_variant_transforms/testing/data/input_files/combine_input @@ -0,0 +1,2 @@ +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1* +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.* diff --git a/gcp_variant_transforms/testing/data/input_files/error_input b/gcp_variant_transforms/testing/data/input_files/error_input new file mode 100644 index 000000000..78426bd2f --- /dev/null +++ b/gcp_variant_transforms/testing/data/input_files/error_input @@ -0,0 +1,2 @@ +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf +gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf diff --git a/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json b/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json new file mode 100644 index 000000000..de8971658 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/preprocessor_tests/header_conflicts_from_file.json @@ -0,0 +1,19 @@ +[ + { + "test_name": "header-conflicts-from-file", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/error_input", + "report_blob_name": "temp/report_no_conflicts.tsv", + "runner": "DirectRunner", + "zones": ["us-west1-b"], + "expected_contents": [ + "Header Conflicts", + "ID\tCategory\tConflicts\tFile\tPaths\tProposed Resolution", + "AF\tINFO\tnum=None type=Float\tgs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf\tnum=None type=Float", + " \t \tnum=-1 type=Float\tgs://gcp-variant-transforms-testfiles/small_tests/valid-4.2.vcf\t ", + "", + "No Inferred Headers Found.", + "", + "No Malformed Records Found." + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py b/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py index 60fbbc5ca..da2e0018a 100644 --- a/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py +++ b/gcp_variant_transforms/testing/integration/run_preprocessor_tests.py @@ -16,7 +16,7 @@ To define a new preprocessor_tests integration test case, create a json file in gcp_variant_transforms/testing/integration/preprocessor_tests directory and -specify at least test_name, input_pattern, blob_name and expected_contents +specify at least test_name, blob_name and expected_contents for the integration test. 
Execute the following command from the root source directory: @@ -55,7 +55,6 @@ class PreprocessorTestCase(run_tests_common.TestCaseInterface): def __init__(self, parser_args, # type: Namespace test_name, # type: str - input_pattern, # type: str expected_contents, # type: List[str] report_blob_name, # type: str header_blob_name=None, # type: str @@ -71,8 +70,7 @@ def __init__(self, self._report_blob_name = self._append_suffix(report_blob_name, suffix) self._report_path = '/'.join(['gs:/', _BUCKET_NAME, self._report_blob_name]) self._project = parser_args.project - args = ['--input_pattern {}'.format(input_pattern), - '--report_path {}'.format(self._report_path), + args = ['--report_path {}'.format(self._report_path), '--project {}'.format(parser_args.project), '--staging_location {}'.format(parser_args.staging_location), '--temp_location {}'.format(parser_args.temp_location), @@ -150,8 +148,7 @@ def _get_args(): def _get_test_configs(): # type: () -> List[List[Dict]] """Gets all test configs in preprocessor_tests.""" - required_keys = ['test_name', 'report_blob_name', 'input_pattern', - 'expected_contents'] + required_keys = ['test_name', 'report_blob_name', 'expected_contents'] test_file_path = os.path.join(os.getcwd(), _TEST_FOLDER) return run_tests_common.get_configs(test_file_path, required_keys) diff --git a/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py b/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py index 38ed45f47..573f65f4d 100644 --- a/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py +++ b/gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py @@ -16,7 +16,7 @@ To define a new integration test case, create a json file in `gcp_variant_transforms/testing/integration/vcf_to_bq_tests` directory and -specify at least test_name, table_name, and input_pattern for the integration +specify at least test_name and table_name for the integration test. You may add multiple test cases (Now at most two are supported) in one json file, and the second test case will run after the first one finishes. 
@@ -66,7 +66,6 @@ def __init__(self, context, # type: TestContextManager test_name, # type: str table_name, # type: str - input_pattern, # type: str assertion_configs, # type: List[Dict] zones=None, # type: List[str] **kwargs # type: **str @@ -78,8 +77,7 @@ def __init__(self, self._project = context.project output_table = '{}:{}'.format(context.project, self._table_name) self._assertion_configs = assertion_configs - args = ['--input_pattern {}'.format(input_pattern), - '--output_table {}'.format(output_table), + args = ['--output_table {}'.format(output_table), '--project {}'.format(context.project), '--staging_location {}'.format(context.staging_location), '--temp_location {}'.format(context.temp_location), @@ -259,8 +257,7 @@ def _get_args(): def _get_test_configs(run_presubmit_tests, run_all_tests, test_file_suffix=''): # type: (bool, bool, str) -> List[List[Dict]] """Gets all test configs.""" - required_keys = ['test_name', 'table_name', 'input_pattern', - 'assertion_configs'] + required_keys = ['test_name', 'table_name', 'assertion_configs'] test_file_path = _get_test_file_path(run_presubmit_tests, run_all_tests, test_file_suffix) test_configs = run_tests_common.get_configs(test_file_path, diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json new file mode 100644 index 000000000..efe0f5721 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/gnomad_genomes_GRCh38_chrX_head2500_run_vep_from_file.json @@ -0,0 +1,86 @@ +[ + { + "test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep-from-file", + "table_name": "gnomad_genomes_GRCh37_chrX_head2500_run_vep_from_file", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/gnomad_input", + "annotation_fields": "CSQ", + "vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "True", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 2337} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_annotation_sets": 21167} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 21059} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 17184695290} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 17180709457} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, 
T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_features": 74} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 71} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ AS CSQ" + ], + "expected_result": {"num_symbol": 12} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 12} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json new file mode 100644 index 000000000..6eecca17f --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_with_sharding.json @@ -0,0 +1,53 @@ +[ + { + "test_name": "test-annotation-pipeline-from-file-with-sharding", + "table_name": "test_annotation_pipeline_from_file_with_sharding", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "True", + "infer_headers": "True", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 19778} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 16898} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 13631192958} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 417} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 92} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json new file mode 100644 index 000000000..fde8e58eb --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/medium_tests/test_annotation_pipeline_from_file_without_sharding.json @@ -0,0 +1,53 @@ +[ + { + "test_name": "test-annotation-pipeline-from-file-without-sharding", + "table_name": "test_annotation_pipeline_from_file_without_sharding", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + 
"vep_assembly": "GRCh37", + "runner": "DataflowRunner", + "run_annotation_pipeline": "True", + "annotation_output_dir": "gs://integration_test_runs/temp/vep_output/{TABLE_NAME}", + "shard_variants": "False", + "infer_headers": "True", + "num_workers": 2, + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 19778} + }, + { + "query": [ + "SELECT COUNT(0) AS num_annotation_sets ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_annotation_sets": 16898} + }, + { + "query": [ + "SELECT SUM(start_position * number_of_annotations) AS hash_sum ", + "FROM ( ", + " SELECT start_position, reference_bases, A.alt, ", + " COUNT(0) AS number_of_annotations ", + " FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT", + " GROUP BY 1, 2, 3", + ")" + ], + "expected_result": {"hash_sum": 13631192958} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.Feature) AS num_features ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_features": 417} + }, + { + "query": [ + "SELECT COUNT(DISTINCT CSQ_VT.SYMBOL) AS num_symbol ", + "FROM {TABLE_NAME} AS T, T.alternate_bases AS A, A.CSQ_VT AS CSQ_VT" + ], + "expected_result": {"num_symbol": 92} + } + ] + } +] diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json new file mode 100644 index 000000000..ac7b0ba48 --- /dev/null +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/small_tests/combine_from_multiple_inputs.json @@ -0,0 +1,24 @@ +[ + { + "test_name": "combine-from-multiple-inputs", + "table_name": "combine_from_multiple_inputs", + "input_file": "gs://gcp-variant-transforms-testfiles/small_tests/input_files/combine_input", + "allow_incompatible_records": "True", + "runner": "DirectRunner", + "zones": ["us-west1-b"], + "assertion_configs": [ + { + "query": ["NUM_ROWS_QUERY"], + "expected_result": {"num_rows": 19790} + }, + { + "query": ["SELECT SUM(DP) AS sum FROM {TABLE_NAME}"], + "expected_result": {"sum": 262} + }, + { + "query": ["SELECT COUNT(DB) AS cnt FROM {TABLE_NAME}"], + "expected_result": {"cnt": 4} + } + ] + } +] diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 97e893ed3..8a5e2fb80 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -85,8 +85,12 @@ _SHARDS_FOLDER = 'shards' -def _read_variants(input_pattern, pipeline, known_args, pipeline_mode): - # type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection +def _read_variants(all_patterns, # type: List[str] + pipeline, # type: beam.Pipeline + known_args, # type: argparse.Namespace + pipeline_mode # type: int + ): + # type: (...) 
-> pvalue.PCollection """Helper method for returning a PCollection of Variants from VCFs.""" representative_header_lines = None if known_args.representative_header_file: @@ -95,17 +99,18 @@ def _read_variants(input_pattern, pipeline, known_args, pipeline_mode): if pipeline_mode == pipeline_common.PipelineModes.LARGE: variants = (pipeline - | 'InputFilePattern' >> beam.Create([input_pattern]) + | 'InputFilePattern' >> beam.Create(all_patterns) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( representative_header_lines=representative_header_lines, allow_malformed_records=( known_args.allow_malformed_records))) else: variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf( - input_pattern, + all_patterns[0], representative_header_lines=representative_header_lines, allow_malformed_records=known_args.allow_malformed_records, vcf_parser_type=vcfio.VcfParserType[known_args.vcf_parser]) + return variants @@ -131,7 +136,7 @@ def _get_variant_merge_strategy(known_args # type: argparse.Namespace raise ValueError('Merge strategy is not supported.') -def _add_inferred_headers(input_pattern, # type: str +def _add_inferred_headers(all_patterns, # type: List[str] pipeline, # type: beam.Pipeline known_args, # type: argparse.Namespace merged_header, # type: pvalue.PCollection @@ -141,7 +146,10 @@ def _add_inferred_headers(input_pattern, # type: str annotation_fields_to_infer = (known_args.annotation_fields if known_args.infer_annotation_types else []) inferred_headers = ( - _read_variants(input_pattern, pipeline, known_args, pipeline_mode) + _read_variants(all_patterns, + pipeline, + known_args, + pipeline_mode) | 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) | 'InferHeaderFields' >> infer_headers.InferHeaderFields( @@ -159,7 +167,7 @@ def _add_inferred_headers(input_pattern, # type: str def _shard_variants(known_args, pipeline_args, pipeline_mode): - # type: (argparse.Namespace, List[str], int) -> str + # type: (argparse.Namespace, List[str], int) -> List[str] """Reads the variants and writes them to VCF shards. Returns: @@ -173,8 +181,8 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): vcf_shards_output_dir = filesystems.FileSystems.join( known_args.annotation_output_dir, _SHARDS_FOLDER) with beam.Pipeline(options=options) as p: - variants = _read_variants(known_args.input_pattern, p, known_args, - pipeline_mode) + variants = _read_variants( + known_args.all_patterns, p, known_args, pipeline_mode) call_names = (variants | 'CombineCallNames' >> combine_call_names.CallNamesCombiner()) @@ -186,11 +194,11 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode): beam.pvalue.AsSingleton(call_names), known_args.number_of_variants_per_shard)) - return vep_runner_util.format_dir_path(vcf_shards_output_dir) + return [vep_runner_util.format_dir_path(vcf_shards_output_dir)] -def _annotate_vcf_files(input_pattern, known_args, pipeline_args): - # type: (str, argparse.Namespace, List[str]) -> str +def _annotate_vcf_files(all_patterns, known_args, pipeline_args): + # type: (List[str], argparse.Namespace, List[str]) -> str """Annotates the VCF files using VEP. 
Returns: @@ -204,7 +212,7 @@ def _annotate_vcf_files(input_pattern, known_args, pipeline_args): with beam.Pipeline(options=options) as p: _ = (p - | beam.Create([input_pattern]) + | beam.Create(all_patterns) | 'AnnotateShards' >> beam.ParDo( annotate_files.AnnotateFile(known_args, pipeline_args))) if known_args.annotation_fields: @@ -229,13 +237,12 @@ def _update_google_cloud_job_name(google_cloud_options, job_name): google_cloud_options.job_name = job_name -def _merge_headers(input_pattern, known_args, pipeline_args, pipeline_mode, - annotated_vcf_pattern=None): +def _merge_headers(known_args, pipeline_args, + pipeline_mode, annotated_vcf_pattern=None): # type: (str, argparse.Namespace, List[str], int, str) -> None """Merges VCF headers using beam based on pipeline_mode.""" if known_args.representative_header_file: return - options = pipeline_options.PipelineOptions(pipeline_args) # Always run pipeline locally if data is small. @@ -258,7 +265,8 @@ def _merge_headers(input_pattern, known_args, pipeline_args, pipeline_mode, temp_directory, temp_merged_headers_file_name) with beam.Pipeline(options=options) as p: - headers = pipeline_common.read_headers(p, pipeline_mode, input_pattern) + headers = pipeline_common.read_headers( + p, pipeline_mode, known_args.all_patterns) merged_header = pipeline_common.get_merged_headers( headers, known_args.split_alternate_allele_info_fields, @@ -268,7 +276,9 @@ def _merge_headers(input_pattern, known_args, pipeline_args, pipeline_mode, p, known_args, pipeline_mode, merged_header, annotated_vcf_pattern) if known_args.infer_headers or known_args.infer_annotation_types: - infer_headers_input_pattern = annotated_vcf_pattern or input_pattern + infer_headers_input_pattern = ( + [annotated_vcf_pattern] if + annotated_vcf_pattern else known_args.all_patterns) merged_header = _add_inferred_headers(infer_headers_input_pattern, p, known_args, merged_header, pipeline_mode) @@ -299,10 +309,11 @@ def _run_annotation_pipeline(known_args, pipeline_args): _validate_annotation_pipeline_args(known_args, pipeline_args) known_args.omit_empty_sample_calls = True - files_to_be_annotated = known_args.input_pattern + files_to_be_annotated = known_args.all_patterns if known_args.shard_variants: pipeline_mode = pipeline_common.get_pipeline_mode( - known_args.input_pattern, known_args.optimize_for_large_inputs) + files_to_be_annotated, + known_args.optimize_for_large_inputs) files_to_be_annotated = _shard_variants(known_args, pipeline_args, pipeline_mode) @@ -321,15 +332,21 @@ def run(argv=None): annotated_vcf_pattern = _run_annotation_pipeline(known_args, pipeline_args) - input_pattern = annotated_vcf_pattern or known_args.input_pattern + all_patterns = ( + [annotated_vcf_pattern] if annotated_vcf_pattern + else known_args.all_patterns) + variant_merger = _get_variant_merge_strategy(known_args) + pipeline_mode = pipeline_common.get_pipeline_mode( - input_pattern, known_args.optimize_for_large_inputs) + all_patterns, + known_args.optimize_for_large_inputs) # Starts a pipeline to merge VCF headers in beam if the total files that # match the input pattern exceeds _SMALL_DATA_THRESHOLD - _merge_headers(known_args.input_pattern, known_args, pipeline_args, + _merge_headers(known_args, pipeline_args, pipeline_mode, annotated_vcf_pattern) + # Retrieve merged headers prior to launching the pipeline. This is needed # since the BigQuery schema cannot yet be dynamically created based on input. # See https://issues.apache.org/jira/browse/BEAM-2801. 
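Reviewer note: the hunks above change _read_variants to accept a list of patterns and pick between the plain source and the ReadAll transform. The following is a minimal, self-contained sketch of that dispatch, assuming the repo's vcfio module layout (gcp_variant_transforms.beam_io.vcfio); the function name and the trimmed keyword set are illustrative, not the PR's exact code.

import apache_beam as beam
from gcp_variant_transforms.beam_io import vcfio  # assumed import path


def read_variants_sketch(pipeline, all_patterns, large_mode,
                         representative_header_lines=None,
                         allow_malformed_records=False):
  """Returns a PCollection of variants from one or many VCF patterns."""
  if large_mode:
    # Many patterns or very large inputs: fan out with ReadAllFromVcf.
    return (pipeline
            | 'InputFilePattern' >> beam.Create(all_patterns)
            | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf(
                representative_header_lines=representative_header_lines,
                allow_malformed_records=allow_malformed_records))
  # A single small pattern: the plain bounded source is cheaper.
  return pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf(
      all_patterns[0],
      representative_header_lines=representative_header_lines,
      allow_malformed_records=allow_malformed_records)
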
@@ -354,7 +371,7 @@ def run(argv=None): beam_pipeline_options = pipeline_options.PipelineOptions(pipeline_args) pipeline = beam.Pipeline(options=beam_pipeline_options) - variants = _read_variants(input_pattern, pipeline, known_args, pipeline_mode) + variants = _read_variants(all_patterns, pipeline, known_args, pipeline_mode) variants |= 'FilterVariants' >> filter_variants.FilterVariants( reference_names=known_args.reference_names) if partitioner: diff --git a/gcp_variant_transforms/vcf_to_bq_preprocess.py b/gcp_variant_transforms/vcf_to_bq_preprocess.py index 777c7372c..05c644ebe 100644 --- a/gcp_variant_transforms/vcf_to_bq_preprocess.py +++ b/gcp_variant_transforms/vcf_to_bq_preprocess.py @@ -94,18 +94,25 @@ def run(argv=None): known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) options = pipeline_options.PipelineOptions(pipeline_args) - pipeline_mode = pipeline_common.get_pipeline_mode(known_args.input_pattern) + all_patterns = known_args.all_patterns + pipeline_mode = pipeline_common.get_pipeline_mode(all_patterns) with beam.Pipeline(options=options) as p: - headers = pipeline_common.read_headers(p, pipeline_mode, - known_args.input_pattern) + headers = pipeline_common.read_headers(p, pipeline_mode, all_patterns) merged_headers = pipeline_common.get_merged_headers(headers) merged_definitions = (headers | 'MergeDefinitions' >> merge_header_definitions.MergeDefinitions()) if known_args.report_all_conflicts: - variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( - known_args.input_pattern, allow_malformed_records=True) + if len(all_patterns) == 1: + variants = p | 'ReadFromVcf' >> vcfio.ReadFromVcf( + all_patterns[0], allow_malformed_records=True) + else: + variants = (p + | 'InputFilePattern' >> beam.Create(all_patterns) + | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( + allow_malformed_records=True)) + malformed_records = variants | filter_variants.ExtractMalformedVariants() inferred_headers, merged_headers = (_get_inferred_headers(variants, merged_headers))
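
Reviewer note: for reference, a minimal sketch of the end-to-end input-selection flow this PR introduces (exactly one of --input_pattern / --input_file, expansion of the file into patterns, pattern validation, and pipeline-mode selection). It uses Beam's apache_beam.io.filesystems API, which does exist with these signatures; the helper names and the threshold constant below are illustrative and not the PR's exact code.

from typing import List  # pylint: disable=unused-import

from apache_beam.io import filesystem
from apache_beam.io import filesystems

_LARGE_DATA_THRESHOLD = 50000  # assumed to mirror pipeline_common's constant


def get_all_patterns(input_pattern, input_file):
  # type: (str, str) -> List[str]
  """Returns the file patterns to read, taken from exactly one flag."""
  if bool(input_pattern) == bool(input_file):
    raise ValueError('Exactly one of input_pattern and input_file has to be '
                     'provided.')
  if input_pattern:
    patterns = [input_pattern]
  else:
    if not filesystems.FileSystems.exists(input_file):
      raise ValueError('Input file {} doesn\'t exist'.format(input_file))
    with filesystems.FileSystems.open(input_file) as f:
      # One pattern per non-empty line of the input file.
      patterns = [line.strip() for line in f.readlines() if line.strip()]
    if not patterns:
      raise ValueError('Input file {} is empty.'.format(input_file))
  try:
    # Request at most one match per pattern; an empty metadata_list means
    # the pattern resolved to no files at all.
    matches = filesystems.FileSystems.match(patterns, [1] * len(patterns))
  except filesystem.BeamIOError:
    raise ValueError('Invalid or inaccessible input pattern(s).')
  for match in matches:
    if not match.metadata_list:
      raise ValueError(
          'Input pattern {} did not match any files.'.format(match.pattern))
  return patterns


def is_large_input(all_patterns, optimize_for_large_inputs=False):
  # type: (List[str], bool) -> bool
  """True if the pipeline should take the ReadAll (large-input) code path."""
  if optimize_for_large_inputs or len(all_patterns) > 1:
    return True
  match_results = filesystems.FileSystems.match(all_patterns)
  return len(match_results[0].metadata_list) > _LARGE_DATA_THRESHOLD

Usage follows the same shape as the PR: parse_args would call get_all_patterns once, store the result on known_args, and every downstream reader would consume the list instead of a single pattern string.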