Conversation

Collaborator

@tneymanov tneymanov commented Mar 5, 2019

Allows users to specify, inside a file, a list of files or input patterns to ingest into the tool.

Notes:

  • Could be helpful for cases when there isn't an easy regex for users to select all of their files.
  • VCF to BQ preprocessor is also modified to accept both inputs.
  • Using the input_file flag effectively turns on optimize_for_large_inputs, so it is preferable not to use it for inputs of fewer than 50k files.
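
A minimal sketch of how the two flags could be wired and validated (illustrative only: the flag names come from this PR, but the parsing code below is an assumption, not the tool's actual code):

```python
import argparse

def parse_io_args(argv=None):
    """Sketch of mutually exclusive --input_pattern / --input_file flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_pattern',
                        help='Glob pattern matching the input VCF files.')
    parser.add_argument('--input_file',
                        help='File listing one VCF name or pattern per line.')
    args = parser.parse_args(argv)
    # Exactly one of the two flags must be set (XOR check).
    if bool(args.input_pattern) == bool(args.input_file):
        raise ValueError('Exactly one of --input_pattern and --input_file '
                         'has to be provided.')
    return args
```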

Issue: 441


coveralls commented Mar 5, 2019

Pull Request Test Coverage Report for Build 1650

  • 119 of 143 (83.22%) changed or added relevant lines in 8 files are covered.
  • 12 unchanged lines in 5 files lost coverage.
  • Overall coverage increased (+0.09%) to 89.025%

Changes missing coverage (file / covered lines / changed or added lines / %):
gcp_variant_transforms/testing/integration/run_preprocessor_tests.py 0 2 0.0%
gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py 0 2 0.0%
gcp_variant_transforms/pipeline_common.py 24 30 80.0%
gcp_variant_transforms/vcf_to_bq_preprocess.py 0 6 0.0%
gcp_variant_transforms/vcf_to_bq.py 4 12 33.33%
Files with coverage reduction (file / new missed lines / %):
gcp_variant_transforms/vcf_to_bq_preprocess.py 1 0.0%
gcp_variant_transforms/testing/integration/run_vcf_to_bq_tests.py 1 26.49%
gcp_variant_transforms/testing/integration/run_preprocessor_tests.py 1 27.85%
gcp_variant_transforms/pipeline_common.py 2 70.83%
gcp_variant_transforms/vcf_to_bq.py 7 36.07%
Totals
Change from base Build 1638: +0.09%
Covered Lines: 7130
Relevant Lines: 8009

💛 - Coveralls

Contributor

@allieychen allieychen left a comment


Thanks Tural for adding input file option and detailed unit tests! I have added some comments. Please also add the related issue (#441) in the description.

@@ -0,0 +1,31 @@
[
Contributor

I would recommend to have the integration tests for the following cases:

  • vcf to bq
  • preprocessor
  • vcf to bq with annotation

The above cases would be affected by the input_file since it provides different sources for variants. On the other hand, merge_option should be fine as long as we can read the variants correctly.

Collaborator Author

Done, running the tests right now.

Contributor

I don't see the point of adding this test. It won't catch more errors than existing test cases (i.e., the above test case and merge_option_info_keys_to_move_to_calls_regex would likely cover the same failure scenarios).

Collaborator Author

@tneymanov tneymanov left a comment

Thanks for the review; I applied the changes. Running the new integration tests right now.

@@ -0,0 +1,31 @@
[
Collaborator Author

Done, running the tests right now.

@tneymanov tneymanov force-pushed the file_of_files branch 2 times, most recently from 7456524 to 003beec Compare March 8, 2019 02:59
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import bigquery_sanitizer



I thought python style was to keep two blank lines?

parser.add_argument(
'--input_file',
help=('File that contains the list of VCF file names to input. Either '
'this or --input_pattern flag has to be provided, exlusively.'


typo (exlusively)

Collaborator Author

Done.

'--input_file',
help=('File that contains the list of VCF file names to input. Either '
'this or --input_pattern flag has to be provided, exlusively.'
'Note that using input_file than input_pattern is slower for '


s/than/rather than/

Collaborator Author

Done.

if parsed_args.input_pattern is not None:
try:
# Gets at most 1 pattern match result of type `filesystems.MatchResult`.
first_match = filesystems.FileSystems.match(


If there are no matches, does the array access not fail?

def read_headers(pipeline, pipeline_mode, input_pattern):
def get_file_names(input_file):
# type (str) -> List(str)
""" Reads all input file and extracts list of patterns out of it."""


All this does is take a file and return a list of its lines, correct? Is there no Python builtin for this (something like open("foo").readlines() or so)?

To define a new integration test case, create a json file in
`gcp_variant_transforms/testing/integration/vcf_to_bq_tests` directory and
specify at least test_name, table_name, and input_pattern for the integration
specify at least test_name and table_name for the integration


unrelated change?

Collaborator Author

Nope, it's related: this file was preventing us from running tests, since it assumed that input_pattern was a necessity. I've removed that requirement and updated the doc here to reflect that.

@tneymanov tneymanov force-pushed the file_of_files branch 5 times, most recently from 2ea46b8 to 3f2bfb5 Compare March 11, 2019 20:44
Collaborator Author

@tneymanov tneymanov left a comment

Due to the commit override, GitHub doesn't seem to allow me to reply to some of Aaron's comments, so I'll do so here.

I thought python style was to keep two blank lines?

Done.

All this does is take a file and return a list of its lines, correct? Is there no Python builtin for this (something like open("foo").readlines() or so)?

So there is a builtin in Python (splitlines()), but there doesn't seem to be an equivalent in Apache Beam's filesystems module.
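
As a reference point, the plain-Python version of the helper discussed above is short (names hypothetical; the PR's real implementation goes through Beam's FileSystems so gs:// paths work too):

```python
def get_file_names(input_file):
    # type: (str) -> list
    """Returns the non-empty lines of input_file as a list of patterns."""
    with open(input_file) as f:
        # splitlines() drops the newline characters; skip blank lines.
        return [line for line in f.read().splitlines() if line]
```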

If there are no matches, does the array access not fail?

I haven't touched that part, but for each supplied input pattern, match returns a list of MatchResult objects, each containing the pattern searched and the list of files it matched (metadata_list). The number of items extracted into metadata_list can be capped to reduce run time by passing a list of limits, one per search (as we do here by passing [1] for the single input pattern and [1, 1, ..., 1] for input_file, since we only want to check existence here). If no matches are found for a pattern, metadata_list is None.

Also verified with mock run.
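
The capped existence check described above can be mimicked locally, using glob as a stand-in for Beam's FileSystems.match(patterns, limits=[1] * len(patterns)); the helper name below is hypothetical:

```python
import glob
import itertools

def validate_patterns_exist(patterns):
    """Raises ValueError if any pattern matches no files.

    Local stand-in for FileSystems.match with limits=[1, ..., 1]: we only
    need the first hit per pattern, so iteration stops after one match.
    """
    for pattern in patterns:
        if not list(itertools.islice(glob.iglob(pattern), 1)):
            raise ValueError(
                'Input pattern {} did not match any files.'.format(pattern))
```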

I was actually hoping we could do something like this for the flags themselves, but that will only work if we get a chance to read the file early in the process

Yeah, as discussed offline, there is no obvious way to differentiate between directory and file in the Filesystems package. We can look deeper into it later.


Update: I also moved the input_patterns extraction logic into the argument parser step and created a new arg for it, to be used instead of input_pattern and input_file. I still use the input_file field in the get_pipeline_mode() step, but only to produce a more detailed error message; if it's not required, we can cut it out as well.


Integration tests passed.

parser.add_argument(
'--input_file',
help=('File that contains the list of VCF file names to input. Either '
'this or --input_pattern flag has to be provided, exlusively.'
Collaborator Author

Done.

'--input_file',
help=('File that contains the list of VCF file names to input. Either '
'this or --input_pattern flag has to be provided, exlusively.'
'Note that using input_file than input_pattern is slower for '
Collaborator Author

Done.

To define a new integration test case, create a json file in
`gcp_variant_transforms/testing/integration/vcf_to_bq_tests` directory and
specify at least test_name, table_name, and input_pattern for the integration
specify at least test_name and table_name for the integration
Collaborator Author

Nope, it's related: this file was preventing us from running tests, since it assumed that input_pattern was a necessity. I've removed that requirement and updated the doc here to reflect that.

Contributor

@allieychen allieychen left a comment

Thanks Tural, I added a few comments :)

(not parsed_args.input_pattern and not parsed_args.input_file)):
raise ValueError('Exactly one of input_pattern and input_file has to be '
'provided.')
if parsed_args.input_pattern is not None:
Contributor

Can be simplified as if parsed_args.input_pattern:

Contributor

Since we also have similar validations in _get_input_patterns and _get_file_names (we have to validate the input pattern listed in the input_file as well), I prefer to move all validations except the first statement to _get_file_names. And the code can be simplified:

  1. If input_file is provided, check whether it exists and read the contents and assign to known_args.input_pattern.
  2. Otherwise, convert known_args.input_pattern to [known_args.input_pattern].
  3. Now we can validate every item in input_pattern.
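
The three steps above can be sketched as one normalization helper (names follow the review discussion; this is an illustrative sketch, not the merged code):

```python
def normalize_inputs(input_pattern=None, input_file=None):
    # type: (str, str) -> list
    """Returns a single list of patterns regardless of which flag was used."""
    if input_file:
        # Step 1: read the patterns listed in the file.
        with open(input_file) as f:
            all_patterns = [line for line in f.read().splitlines() if line]
        if not all_patterns:
            raise ValueError('Input file {} is empty.'.format(input_file))
    else:
        # Step 2: wrap the single pattern into a one-element list.
        all_patterns = [input_pattern]
    # Step 3: callers can now validate every item uniformly.
    return all_patterns
```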

Collaborator Author

Done.

@@ -0,0 +1 @@
gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0*.vcf
Contributor

It might be more worthwhile to have two file patterns for testing purposes.

Collaborator Author

It does: the * already covers .../valid-4.0.vcf, .../valid-4.0-one-more-filed.vcf and .../valid-4.0-new.vcf.

Contributor

Sorry, what I mean is that we provide two patterns in this file, e.g.,
gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0*.vcf
gs://gcp-variant-transforms-testfiles/small_tests/valid-4.1*.vcf

The new flag essentially allows the user to provide more than one pattern, while in this test case the user can actually use input_pattern=gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0*.vcf

Collaborator Author

Done.

# type: (beam.Pipeline, int, str) -> pvalue.PCollection
"""Creates an initial PCollection by reading the VCF file headers."""
if pipeline_mode == PipelineModes.LARGE or input_file:
if pipeline_mode == PipelineModes.LARGE or len(input_patterns) > 1:
Contributor

It might be easier to just change the pipeline_mode to LARGE in get_pipeline_mode when we have more than one input_pattern. But please check whether we use the pipeline mode anywhere else other than for reading the variants.
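
The suggestion amounts to folding the pattern count into the mode decision, roughly as follows (the PipelineModes stand-in and the 50k file threshold are assumptions based on this thread, not the actual implementation):

```python
class PipelineModes(object):
    SMALL, MEDIUM, LARGE = range(3)  # stand-in for the real enum

_LARGE_INPUT_THRESHOLD = 50000  # files; per the PR description

def get_pipeline_mode(all_patterns, matched_file_count,
                      optimize_for_large_inputs=False):
    """More than one pattern (i.e. --input_file was used) forces LARGE mode."""
    if optimize_for_large_inputs or len(all_patterns) > 1:
        return PipelineModes.LARGE
    if matched_file_count >= _LARGE_INPUT_THRESHOLD:
        return PipelineModes.LARGE
    return PipelineModes.SMALL
```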

Collaborator Author

Alas, annotation also looks up the pipeline mode, while it can have different input pattern (specifically, single input pattern of the shard directory).

Contributor

The annotation pipeline also uses the pipeline_mode to read variants, I don't think it is different from other cases, right?

Collaborator Author

I'm sorry, I wasn't clear: at this step we have potentially already sharded the output, so we only have one input pattern, but whether the input was large or not is still relevant.

@tneymanov tneymanov force-pushed the file_of_files branch 3 times, most recently from 260caa6 to fd5844e Compare March 13, 2019 20:19
@@ -0,0 +1,3 @@
./gcp_variant_transforms/testing/data/vcf/valid-4.0.vcf
Contributor

It is better to include this in unit tests (i.e., create temporary files there). This folder is mainly used for integration tests.

Collaborator Author

Could you clarify what you mean here: do you want me to dynamically generate this file and the invalid ones? Doesn't that seem like overkill, or is there a precedent for this?

Collaborator Author

As per the offline conversation, done.

@@ -0,0 +1,31 @@
[
Contributor

I don't see the point of adding this test. It won't catch more errors than existing test cases (i.e., the above test case and merge_option_info_keys_to_move_to_calls_regex would likely cover the same failure scenarios).

 - replace all (input_pattern, input_file) inputs to input_patterns
 - add "combine" input file
 - refactor v_t_options file
 - add integration tests for preprocessor and vcf->bq with annotations
  - Remove validation from pipeline_common.
  - Adjust tests accordingly.
  - Change input_pattern checks to treat '' as absent input, to simplify the code.
@tneymanov tneymanov force-pushed the file_of_files branch 3 times, most recently from b44773d to 3223f34 Compare March 15, 2019 16:47
Collaborator Author

@tneymanov tneymanov left a comment

Unfortunately I had to reply to every comment one by one again :(

I don't see the point of adding this test. It won't catch more errors than existing test cases (i.e., the above test case and merge_option_info_keys_to_move_to_calls_regex would likely cover the same failure scenarios).

Done

	- Removed merge test duplicate for _from_file option.
	- Modified combine and gnomad tests.
	- Updated the gs storage for said tests.
	- Remove legacy debugging code.
	- Refactor validation to use pipeline_common for extracting file names
	- Create files for unit tests on the fly.
	- Set PipelineMode to Large, when multiple input patterns are available.
	- Move input pattern/file validation to pipeline_common "file reading" stage to remove duplication.
Contributor

@allieychen allieychen left a comment

Thank you Tural. I added more comments there. One thing I don't feel comfortable with is that we now have both known_args.input_pattern and known_args.input_patterns. This would be prone to bugs. Can you try to come up with a better name? Or maybe just update input_pattern rather than use another variable when the user uses input_file.

Collaborator Author

@tneymanov tneymanov left a comment

Applied the requested changes. Renamed input_patterns to all_patterns.

Contributor

@allieychen allieychen left a comment

Thank you Tural! Please take a look at the comments I just added. Otherwise LGTM.

@@ -1,8 +1,8 @@
[
{
"test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep",
Contributor

This test has the same test_name and table_name as gnomad_genomes_GRCh37_chrX_head2500_run_vep.json; it will fail since we cannot have the same Dataflow job name, nor the same table name.

Collaborator Author

Done.

'is not recommended.'.format(vcfio.VcfParserType.PYVCF.name,
vcfio.VcfParserType.NUCLEUS.name)))


Contributor

nit: Please remove the extra blank line. Generally, two blank lines between top-level definitions, class definitions. One blank line between method definitions.



def _get_file_names(input_file):
# type (str) -> List[str]
Contributor

# type:

Collaborator Author

Done.

@@ -0,0 +1,14 @@
[
{
"test_name": "no-conflicts",
Contributor

Please use a unique test_name.

Contributor

Since this test actually passes even when we wrongly provide input_pattern, it is preferable to modify the input file to actually generate some conflict information. For instance, you can add gs://gcp-variant-transforms-testfiles/small_tests/invalid-4.0-POS-empty.vcf into combine; the expected contents should be something similar to report_malformed_records.json.

Collaborator Author

Done.


def _read_variants(input_pattern, pipeline, known_args, pipeline_mode):
def _read_variants(all_patterns, pipeline, known_args, pipeline_mode):
# type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection
Contributor

s/str/List[str]

Collaborator Author

Done.



def _add_inferred_headers(input_pattern, # type: str
def _add_inferred_headers(all_patterns, # type: str
Contributor

s/str/List[str]

Collaborator Author

Done.


def _annotate_vcf_files(input_pattern, known_args, pipeline_args):
def _annotate_vcf_files(all_patterns, known_args, pipeline_args):
# type: (str, argparse.Namespace, List[str]) -> str
Contributor

s/str/List[str]

Collaborator Author

Done.

annotated_vcf_pattern=None):
def _merge_headers(known_args, pipeline_args,
pipeline_mode, annotated_vcf_pattern=None):
# type: (str, argparse.Namespace, List[str], int, str) -> None
Contributor

Please update the type.

Collaborator Author

@tneymanov tneymanov left a comment

Applied the requested changes.

@@ -1,8 +1,8 @@
[
{
"test_name": "gnomad-genomes-grch37-chr-x-head2500-run-vep",
Collaborator Author

Done.



def _get_file_names(input_file):
# type (str) -> List[str]
Collaborator Author

Done.


def _read_variants(input_pattern, pipeline, known_args, pipeline_mode):
def _read_variants(all_patterns, pipeline, known_args, pipeline_mode):
# type: (str, beam.Pipeline, argparse.Namespace, int) -> pvalue.PCollection
Collaborator Author

Done.


def _annotate_vcf_files(input_pattern, known_args, pipeline_args):
def _annotate_vcf_files(all_patterns, known_args, pipeline_args):
# type: (str, argparse.Namespace, List[str]) -> str
Collaborator Author

Done.

@@ -0,0 +1,14 @@
[
{
"test_name": "no-conflicts",
Collaborator Author

Done.



def _add_inferred_headers(input_pattern, # type: str
def _add_inferred_headers(all_patterns, # type: str
Collaborator Author

Done.

    - type definitions
    - preprocessor integration test
| vcf_header_io.ReadAllVcfHeaders())
else:
headers = pipeline | vcf_header_io.ReadVcfHeaders(input_pattern)
headers = pipeline | vcf_header_io.ReadVcfHeaders(all_patterns[0])


Is this safe? What if all_patterns is empty? Are you just counting on this not being called unless there is at least one pattern in the list?

Collaborator Author

It should be. We actually had this discussion beforehand about having validation in multiple places versus only at the beginning, and decided that it probably isn't worth copying unreachable code all over the pipeline, as long as the validation at the beginning is done right. So yes, I'm counting on all_patterns having been verified by this point, and on every pattern in all_patterns having at least one match in the filesystem.

Collaborator Author

@tneymanov tneymanov left a comment

Added integration tests for annotation pipeline, with and without sharding, which took quite a bit of trial and error, but eventually passed.

Running all the rest of integration tests now.


Contributor

@allieychen allieychen left a comment

Thank you Tural. LGTM!

@tneymanov tneymanov merged commit b0343a3 into googlegenomics:master Mar 25, 2019