Refactor vcf_to_bq script #178 (#194)
- Before writing the conflicts-report generation script for Issue 178 (essentially another script similar to vcf_to_bq: parse the arguments and define the pipeline), it became apparent that some functions could be reused by both vcf_to_bq and the preprocessor.

- Therefore, refactored vcf_to_bq and moved the common functions that can be shared by vcf_to_bq and vcf_to_bq_preprocessor into a new module.

Type hints (Issue 189) are also added.
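
For illustration, the shared steps in run() now delegate to the new module
(a condensed excerpt of the diff below):

    known_args, pipeline_args = vcf_to_bq_common.parse_args(
        argv, _COMMAND_LINE_OPTIONS)
    pipeline_mode = vcf_to_bq_common.get_pipeline_mode(known_args)
    variants = vcf_to_bq_common.read_variants(pipeline, known_args)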

Tested: unit tests
allieychen committed Apr 20, 2018
1 parent 6b0090b commit b5ae3e7
Showing 5 changed files with 270 additions and 158 deletions.
17 changes: 16 additions & 1 deletion .idea/inspectionProfiles/Project_Default.xml

141 changes: 36 additions & 105 deletions gcp_variant_transforms/vcf_to_bq.py
@@ -33,32 +33,30 @@

from __future__ import absolute_import

import argparse
import argparse # pylint: disable=unused-import
import datetime
import enum
import logging
import sys
import tempfile
from typing import List, Optional # pylint: disable=unused-import

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam import pvalue # pylint: disable=unused-import
from apache_beam.io import filesystems
from apache_beam.options import pipeline_options

from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms import vcf_to_bq_common
from gcp_variant_transforms.libs import metrics_util
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_header_parser
from gcp_variant_transforms.libs.variant_merge import merge_with_non_variants_strategy
from gcp_variant_transforms.libs.variant_merge import move_to_calls_strategy
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
from gcp_variant_transforms.options import variant_transform_options
from gcp_variant_transforms.transforms import filter_variants
from gcp_variant_transforms.transforms import merge_headers
from gcp_variant_transforms.transforms import merge_variants
from gcp_variant_transforms.transforms import variant_to_bigquery
from gcp_variant_transforms.transforms import infer_undefined_headers

_COMMAND_LINE_OPTIONS = [
variant_transform_options.VcfReadOptions,
@@ -68,22 +66,13 @@
variant_transform_options.MergeOptions,
]

# If the # of files matching the input file_pattern exceeds this value, then
# headers will be merged in beam.
_SMALL_DATA_THRESHOLD = 100
_LARGE_DATA_THRESHOLD = 50000
_MERGE_HEADERS_FILE_NAME = 'merged_headers.vcf'
_MERGE_HEADERS_JOB_NAME = 'merge-vcf-headers'


class PipelineModes(enum.Enum):
"""An Enum specifying the mode of the pipeline based on the data size."""
SMALL = 0
MEDIUM = 1
LARGE = 2


def _get_variant_merge_strategy(known_args):
def _get_variant_merge_strategy(known_args # type: argparse.Namespace
):
  # type: (...) -> Optional[variant_merge_strategy.VariantMergeStrategy]
merge_options = variant_transform_options.MergeOptions
if (not known_args.variant_merge_strategy or
known_args.variant_merge_strategy == merge_options.NONE):
@@ -103,48 +92,13 @@ def _get_variant_merge_strategy(known_args):
raise ValueError('Merge strategy is not supported.')


def _read_variants(pipeline, known_args):
"""Helper method for returning a ``PCollection`` of Variants from VCFs."""
if known_args.optimize_for_large_inputs:
variants = (pipeline
| 'InputFilePattern' >> beam.Create(
[known_args.input_pattern])
| 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf(
allow_malformed_records=(
known_args.allow_malformed_records)))
else:
variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf(
known_args.input_pattern,
allow_malformed_records=known_args.allow_malformed_records)
return variants


def _get_pipeline_mode(known_args):
"""Returns the mode the pipeline should operate in based on input size."""
if known_args.optimize_for_large_inputs:
return PipelineModes.LARGE

match_results = FileSystems.match([known_args.input_pattern])
if not match_results:
raise ValueError('No files matched input_pattern: {}'.format(
known_args.input_pattern))

total_files = len(match_results[0].metadata_list)
if total_files > _LARGE_DATA_THRESHOLD:
return PipelineModes.LARGE
elif total_files > _SMALL_DATA_THRESHOLD:
return PipelineModes.MEDIUM

return PipelineModes.SMALL

def _add_inferred_headers(pipeline, known_args, merged_header):
inferred_headers = (
_read_variants(pipeline, known_args)
| 'FilterVariants' >> filter_variants.FilterVariants(
reference_names=known_args.reference_names)
| ' InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
beam.pvalue.AsSingleton(merged_header)))
def _add_inferred_headers(pipeline, # type: beam.Pipeline
known_args, # type: argparse.Namespace
merged_header # type: pvalue.PCollection
):
# type: (...) -> pvalue.PCollection
  inferred_headers = vcf_to_bq_common.get_inferred_headers(
      pipeline, known_args, merged_header)
merged_header = (
(inferred_headers, merged_header)
| beam.Flatten()
@@ -154,19 +108,19 @@ def _add_inferred_headers(pipeline, known_args, merged_header):


def _merge_headers(known_args, pipeline_args, pipeline_mode):
  # type: (argparse.Namespace, List[str], vcf_to_bq_common.PipelineModes) -> None
"""Merges VCF headers using beam based on pipeline_mode."""
if known_args.representative_header_file:
return

options = PipelineOptions(pipeline_args)
options = pipeline_options.PipelineOptions(pipeline_args)

# Always run pipeline locally if data is small.
if (pipeline_mode == PipelineModes.SMALL and
if (pipeline_mode == vcf_to_bq_common.PipelineModes.SMALL and
not known_args.infer_undefined_headers):
options.view_as(StandardOptions).runner = 'DirectRunner'

options.view_as(pipeline_options.StandardOptions).runner = 'DirectRunner'

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
if google_cloud_options.job_name:
google_cloud_options.job_name += '-' + _MERGE_HEADERS_JOB_NAME
else:
@@ -179,50 +133,26 @@ def _merge_headers(known_args, pipeline_args, pipeline_mode):
datetime.datetime.now().strftime('%Y%m%d-%H%M%S'),
google_cloud_options.job_name,
_MERGE_HEADERS_FILE_NAME])
known_args.representative_header_file = FileSystems.join(
known_args.representative_header_file = filesystems.FileSystems.join(
temp_directory, temp_merged_headers_file_name)

with beam.Pipeline(options=options) as p:
headers = p
if pipeline_mode == PipelineModes.LARGE:
headers |= (beam.Create([known_args.input_pattern])
| vcf_header_io.ReadAllVcfHeaders())
else:
headers |= vcf_header_io.ReadVcfHeaders(known_args.input_pattern)

merged_header = (headers
| 'MergeHeaders' >> merge_headers.MergeHeaders(
known_args.split_alternate_allele_info_fields,
known_args.allow_incompatible_records))

headers = vcf_to_bq_common.read_headers(p, pipeline_mode, known_args)
merged_header = vcf_to_bq_common.get_merged_headers(headers, known_args)
if known_args.infer_undefined_headers:
merged_header = _add_inferred_headers(p, known_args, merged_header)

_ = (merged_header | 'WriteHeaders' >> vcf_header_io.WriteVcfHeaders(
known_args.representative_header_file))

def _add_parser_arguments(options, parser):
for transform_options in options:
transform_options.add_arguments(parser)


def _validate_args(options, parsed_args):
for transform_options in options:
transform_options.validate(parsed_args)
vcf_to_bq_common.write_headers(merged_header,
known_args.representative_header_file)


def run(argv=None):
# type: (List[str]) -> None
"""Runs VCF to BigQuery pipeline."""
logging.info('Command: %s', ' '.join(argv or sys.argv))
parser = argparse.ArgumentParser()
parser.register('type', 'bool', lambda v: v.lower() == 'true')
command_line_options = [option() for option in _COMMAND_LINE_OPTIONS]
_add_parser_arguments(command_line_options, parser)
known_args, pipeline_args = parser.parse_known_args(argv)
_validate_args(command_line_options, known_args)

known_args, pipeline_args = vcf_to_bq_common.parse_args(argv,
_COMMAND_LINE_OPTIONS)
variant_merger = _get_variant_merge_strategy(known_args)
pipeline_mode = _get_pipeline_mode(known_args)
pipeline_mode = vcf_to_bq_common.get_pipeline_mode(known_args)

  # Starts a pipeline to merge VCF headers in beam if the total number of
  # files matching the input pattern exceeds _SMALL_DATA_THRESHOLD (defined in
  # vcf_to_bq_common).
@@ -242,9 +172,9 @@ def run(argv=None):
known_args.minimal_vep_alt_matching,
counter_factory)

pipeline_options = PipelineOptions(pipeline_args)
pipeline = beam.Pipeline(options=pipeline_options)
variants = _read_variants(pipeline, known_args)
beam_pipeline_options = pipeline_options.PipelineOptions(pipeline_args)
pipeline = beam.Pipeline(options=beam_pipeline_options)
variants = vcf_to_bq_common.read_variants(pipeline, known_args)
variants |= 'FilterVariants' >> filter_variants.FilterVariants(
reference_names=known_args.reference_names)
if variant_merger:
@@ -267,6 +197,7 @@ def run(argv=None):

metrics_util.log_all_counters(result)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
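
As a reading aid (not part of the commit): the pipeline_mode consumed by run()
above is produced by get_pipeline_mode in the new vcf_to_bq_common module
below, which buckets the job by how many files match the input pattern. A
sketch of the rule, using the module's constants:

    # Mode selection as implemented by get_pipeline_mode (thresholds
    # _SMALL_DATA_THRESHOLD=100 and _LARGE_DATA_THRESHOLD=50000);
    # --optimize_for_large_inputs forces LARGE regardless of file count.
    #        1 matching file   -> PipelineModes.SMALL
    #    5,000 matching files  -> PipelineModes.MEDIUM
    #   60,000 matching files  -> PipelineModes.LARGE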
141 changes: 141 additions & 0 deletions gcp_variant_transforms/vcf_to_bq_common.py
@@ -0,0 +1,141 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Common functions that are used by both vcf_to_bq and vcf_to_bq_preprocessor.
It includes parsing the command line arguments, reading the input, applying the
PTransforms and writing the output.
"""

import argparse
import enum
from typing import List, Tuple  # pylint: disable=unused-import

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import filesystems

from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.transforms import filter_variants
from gcp_variant_transforms.transforms import infer_undefined_headers
from gcp_variant_transforms.transforms import merge_headers

# If the number of files matching the input file_pattern exceeds this value,
# then the headers will be merged in beam.
_SMALL_DATA_THRESHOLD = 100
_LARGE_DATA_THRESHOLD = 50000


class PipelineModes(enum.Enum):
"""An Enum specifying the mode of the pipeline based on the data size."""
SMALL = 0
MEDIUM = 1
LARGE = 2


def parse_args(argv, command_line_options):
  # type: (List[str], List[type]) -> Tuple[argparse.Namespace, List[str]]
"""Parses the arguments.
Args:
argv: A list of string representing the pipeline arguments.
command_line_options: A list of type ``VariantTransformsOptions`` that
specifies the options that will be added to parser.
"""
parser = argparse.ArgumentParser()
parser.register('type', 'bool', lambda v: v.lower() == 'true')
options = [option() for option in command_line_options]
for transform_options in options:
transform_options.add_arguments(parser)
known_args, pipeline_args = parser.parse_known_args(argv)
for transform_options in options:
transform_options.validate(known_args)
return known_args, pipeline_args


def get_pipeline_mode(known_args):
  # type: (argparse.Namespace) -> PipelineModes
"""Returns the mode the pipeline should operate in based on input size."""
if known_args.optimize_for_large_inputs:
return PipelineModes.LARGE

match_results = filesystems.FileSystems.match([known_args.input_pattern])
if not match_results:
raise ValueError('No files matched input_pattern: {}'.format(
known_args.input_pattern))

total_files = len(match_results[0].metadata_list)
if total_files > _LARGE_DATA_THRESHOLD:
return PipelineModes.LARGE
elif total_files > _SMALL_DATA_THRESHOLD:
return PipelineModes.MEDIUM
return PipelineModes.SMALL


def read_variants(pipeline, known_args):
# type: (beam.Pipeline, argparse.Namespace) -> pvalue.PCollection
"""Helper method for returning a PCollection of Variants from VCFs."""
if known_args.optimize_for_large_inputs:
variants = (pipeline
| 'InputFilePattern' >> beam.Create([known_args.input_pattern])
| 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf(
allow_malformed_records=(
known_args.allow_malformed_records)))
else:
variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf(
known_args.input_pattern,
allow_malformed_records=known_args.allow_malformed_records)
return variants


def get_inferred_headers(pipeline, # type: beam.Pipeline
known_args, # type: argparse.Namespace
merged_header # type: pvalue.PCollection
):
# type: (...) -> pvalue.PCollection
"""Infers the missing headers."""
return (read_variants(pipeline, known_args)
| 'FilterVariants' >> filter_variants.FilterVariants(
reference_names=known_args.reference_names)
          | 'InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
pvalue.AsSingleton(merged_header)))


def read_headers(pipeline, pipeline_mode, known_args):
  # type: (beam.Pipeline, PipelineModes, argparse.Namespace) -> pvalue.PCollection
"""Creates an initial PCollection by reading the VCF file headers."""
if pipeline_mode == PipelineModes.LARGE:
headers = (pipeline
| beam.Create([known_args.input_pattern])
| vcf_header_io.ReadAllVcfHeaders())
else:
headers = pipeline | vcf_header_io.ReadVcfHeaders(known_args.input_pattern)
return headers


def get_merged_headers(headers, known_args):
# type: (pvalue.PCollection, argparse.Namespace) -> pvalue.PCollection
"""Applies the ``MergeHeaders`` PTransform on PCollection of ``VcfHeader``."""
return (headers | 'MergeHeaders' >> merge_headers.MergeHeaders(
known_args.split_alternate_allele_info_fields,
known_args.allow_incompatible_records))


def write_headers(merged_header, file_path):
# type: (pvalue.PCollection, str) -> None
"""Writes a PCollection of ``VcfHeader`` to location ``file_path``."""
_ = (merged_header | 'WriteHeaders' >>
vcf_header_io.WriteVcfHeaders(file_path))
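
To show the intended reuse described in the commit message, here is a minimal
sketch (not part of this commit) of how a sibling script, such as the planned
conflicts-report preprocessor, could drive this module. The single option
class passed to parse_args and the output path are placeholders: a real
caller would register whichever VariantTransformsOptions subclasses supply
the flags these helpers read (e.g. --input_pattern,
--allow_incompatible_records).

    from __future__ import absolute_import

    import sys

    import apache_beam as beam

    from gcp_variant_transforms import vcf_to_bq_common
    from gcp_variant_transforms.options import variant_transform_options


    def run(argv=None):
      # Shared flag parsing/validation; VcfReadOptions is assumed here to
      # supply the flags the helpers below read (an assumption, see above).
      known_args, pipeline_args = vcf_to_bq_common.parse_args(
          argv, [variant_transform_options.VcfReadOptions])
      # SMALL / MEDIUM / LARGE, based on how many files match the pattern.
      pipeline_mode = vcf_to_bq_common.get_pipeline_mode(known_args)
      with beam.Pipeline(argv=pipeline_args) as p:
        headers = vcf_to_bq_common.read_headers(p, pipeline_mode, known_args)
        merged = vcf_to_bq_common.get_merged_headers(headers, known_args)
        # Placeholder output; a real script would take this path as a flag.
        vcf_to_bq_common.write_headers(merged, '/tmp/merged_headers.vcf')


    if __name__ == '__main__':
      run(sys.argv)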
