issue #1014 - refactor so that all places pull out same MIV for pipeline
davmlaw committed May 30, 2024
1 parent 623a08e commit 1348cb6
Showing 8 changed files with 31 additions and 33 deletions.
11 changes: 9 additions & 2 deletions upload/models/models.py
@@ -287,6 +287,7 @@ class UploadStep(models.Model):
CREATE_UNKNOWN_LOCI_AND_VARIANTS_TASK_NAME = "Create Unknown Loci and Variants"
PREPROCESS_VCF_NAME = "Preprocess VCF"
PROCESS_VCF_TASK_NAME = "Process VCF File"
NORMALIZE_SUB_STEP = "normalize"

name = models.TextField()
upload_pipeline = models.ForeignKey(UploadPipeline, on_delete=CASCADE)
@@ -552,8 +553,14 @@ def message(self):


class ModifiedImportedVariants(VCFImportInfo):
LINKED_SUB_STEP_NAME = UploadStep.NORMALIZE_SUB_STEP
has_more_details = True

@staticmethod
def get_for_pipeline(upload_pipeline) -> 'ModifiedImportedVariants':
upload_step = upload_pipeline.uploadstep_set.get(name=ModifiedImportedVariants.LINKED_SUB_STEP_NAME)
return ModifiedImportedVariants.objects.get_or_create(upload_step=upload_step)[0]

@property
def message(self):
qs = self.modifiedimportedvariant_set.filter(old_variant__isnull=False)
@@ -615,7 +622,7 @@ def vt_format_old_variant(old_variant: str, genome_build: GenomeBuild) -> list[str]:
return formatted_old_variants

@staticmethod
def bcftools_format_old_variant(old_variant: str, genome_build: GenomeBuild) -> list[str]:
def bcftools_format_old_variant(old_variant: str, svlen: Optional[str], genome_build: GenomeBuild) -> list[str]:
""" We need consistent formatting (case and use of chrom) so we can retrieve it easily.
May return multiple values """
formatted_old_variants = []
@@ -632,7 +639,7 @@ def bcftools_format_old_variant(old_variant: str, genome_build: GenomeBuild) -> list[str]:
alt = alt_list[0]

contig = genome_build.chrom_contig_mappings[chrom]
variant_coordinate = VariantCoordinate.from_explicit_no_svlen(contig.name, position, ref, alt)
variant_coordinate = VariantCoordinate(chrom=contig.name, position=position, ref=ref, alt=alt, svlen=svlen)
return [ModifiedImportedVariant.get_old_variant_from_variant_coordinate(variant_coordinate)]
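For orientation, the old-variant string handled here is the pipe-delimited value that bcftools norm writes via --old-rec-tag (roughly CHROM|POS|REF|ALT|used-ALT-index). A hypothetical call, with an invented tag value and an existing GenomeBuild instance:

    # Invented example tag value for a record that was split from a multi-allelic site
    old_variant_tag = "1|868768|ATT|ATTT,A|2"
    formatted = ModifiedImportedVariant.bcftools_format_old_variant(
        old_variant_tag,   # parsed as chrom="1", position=868768, ref="ATT", alts "ATTT,A"
        None,              # svlen: only relevant for symbolic <DEL>/<DUP>-style ALTs
        genome_build,      # any snpdb GenomeBuild; "1" is mapped to the build's contig name
    )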


7 changes: 3 additions & 4 deletions upload/tasks/vcf/genotype_vcf_tasks.py
@@ -18,7 +18,7 @@
from snpdb.variant_zygosity_count import update_all_variant_zygosity_counts_for_vcf, \
create_variant_zygosity_counts
from upload.models import VCFPipelineStage, UploadStep, UploadStepTaskType, UploadedVCFPendingAnnotation, \
UploadPipeline, SimpleVCFImportInfo, SkipUploadStepException
UploadPipeline, SimpleVCFImportInfo, SkipUploadStepException, ModifiedImportedVariants
from upload.tasks.vcf.import_vcf_step_task import ImportVCFStepTask
from upload.upload_processing import process_upload_pipeline
from variantgrid.celery import app
@@ -103,14 +103,13 @@ class ProcessGenotypeVCFDataTask(ImportVCFStepTask):
(ie via ImportGenotypeVCFTask) - this can run in parallel """

def process_items(self, upload_step):
from upload.vcf.vcf_import import import_vcf_file, \
get_preprocess_vcf_import_info, genotype_vcf_processor_factory
from upload.vcf.vcf_import import import_vcf_file, genotype_vcf_processor_factory

upload_pipeline = upload_step.upload_pipeline
uploaded_vcf = upload_pipeline.uploadedvcf

vcf = uploaded_vcf.vcf
preprocess_vcf_import_info = get_preprocess_vcf_import_info(upload_pipeline)
preprocess_vcf_import_info = ModifiedImportedVariants.get_for_pipeline(upload_pipeline)
bulk_inserter = genotype_vcf_processor_factory(upload_step, vcf.cohort.cohort_genotype_collection,
uploaded_vcf, preprocess_vcf_import_info)
return import_vcf_file(upload_step, bulk_inserter)
10 changes: 5 additions & 5 deletions upload/tasks/vcf/import_vcf_tasks.py
@@ -9,12 +9,12 @@
from snpdb.models import AlleleLiftover
from snpdb.models.models_enums import ProcessingStatus, AlleleConversionTool
from snpdb.bcftools_liftover import bcftools_liftover
from upload.models import UploadStep, UploadedVCF
from upload.models import UploadStep, UploadedVCF, ModifiedImportedVariants
from upload.tasks.vcf.import_vcf_step_task import ImportVCFStepTask
from upload.upload_processing import process_vcf_file
from upload.vcf.bulk_allele_linking_vcf_processor import BulkAlleleLinkingVCFProcessor, FailedLiftoverVCFProcessor
from upload.vcf.bulk_minimal_vcf_processor import BulkMinimalVCFProcessor
from upload.vcf.vcf_import import import_vcf_file, get_preprocess_vcf_import_info
from upload.vcf.vcf_import import import_vcf_file
from upload.vcf.vcf_preprocess import preprocess_vcf
from variantgrid.celery import app

@@ -95,7 +95,7 @@ class ProcessVCFSetMaxVariantTask(ImportVCFStepTask):
Can run in parallel on split VCFs """

def process_items(self, upload_step):
preprocess_vcf_import_info = get_preprocess_vcf_import_info(upload_step.upload_pipeline)
preprocess_vcf_import_info = ModifiedImportedVariants.get_for_pipeline(upload_step.upload_pipeline)
bulk_inserter = BulkMinimalVCFProcessor(upload_step, preprocess_vcf_import_info)
items_processed = import_vcf_file(upload_step, bulk_inserter)
return items_processed
@@ -107,7 +107,7 @@ class ProcessVCFLinkAllelesSetMaxVariantTask(ImportVCFStepTask):
Can run in parallel on split VCFs """

def process_items(self, upload_step):
preprocess_vcf_import_info = get_preprocess_vcf_import_info(upload_step.upload_pipeline)
preprocess_vcf_import_info = ModifiedImportedVariants.get_for_pipeline(upload_step.upload_pipeline)
bulk_inserter = BulkAlleleLinkingVCFProcessor(upload_step, preprocess_vcf_import_info)
items_processed = import_vcf_file(upload_step, bulk_inserter)
return items_processed
@@ -144,7 +144,7 @@ def _error(self, upload_step: UploadStep, error_message: str):
class LiftoverProcessFailureVCFTask(ImportVCFStepTask):

def process_items(self, upload_step: UploadStep):
preprocess_vcf_import_info = get_preprocess_vcf_import_info(upload_step.upload_pipeline)
preprocess_vcf_import_info = ModifiedImportedVariants.get_for_pipeline(upload_step.upload_pipeline)
bulk_inserter = FailedLiftoverVCFProcessor(upload_step, preprocess_vcf_import_info)
return import_vcf_file(upload_step, bulk_inserter)
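All three tasks above now obtain the pipeline's ModifiedImportedVariants record through the same helper, so steps running in parallel on split VCFs attach their messages to one shared row. The lookup amounts to (mirroring get_for_pipeline added in models.py above):

    # Find the "normalize" sub-step created during preprocessing, then get_or_create
    # the import-info row keyed on it, so every caller links to the same record.
    upload_step = upload_pipeline.uploadstep_set.get(name=UploadStep.NORMALIZE_SUB_STEP)
    miv_info, _ = ModifiedImportedVariants.objects.get_or_create(upload_step=upload_step)

This replaces the removed get_preprocess_vcf_import_info, which looked the record up against the Preprocess VCF step and returned None on any exception.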

@@ -57,7 +57,7 @@ <h3>Skipped annotation</h3>
{% if has_modified_imported_variants %}
<div>
<h3>Modified Imported Variants</h3>
<p>We <a target='_blank' href="https://genome.sph.umich.edu/wiki/Vt#Normalization">Decompose and Normalise variants using VT</a> during import, so variants from different VCF files have a consistent representation.
<p>We decompose multi-allelic variants and normalise indels (using <a href="https://samtools.github.io/bcftools/bcftools.html#norm">BCFtools</a>) during import, so variants from different VCF files have a consistent representation. Normalisation means representing an indel in its minimal, left-aligned form.

{% load jqgrid_tags %}
{% jqgrid 'upload_pipeline_modified_variants_grid' name='Modified Imported Variant' search=False upload_pipeline_id=upload_pipeline.pk %}
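As a concrete illustration (positions and alleles invented, assuming a reference of A at chr1:100 followed by a run of Ts), the two operations rewrite records like this:

    #CHROM  POS  REF  ALT
    chr1    100  A    G,C    split into two bi-allelic records:  chr1 100 A G   and   chr1 100 A C
    chr1    103  TT   T      (one T removed from the poly-T run) left-aligned, minimal form:  chr1 100 AT A

Each rewritten record is tagged with its original representation, which is what the grid above lists.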
11 changes: 7 additions & 4 deletions upload/vcf/abstract_bulk_vcf_processor.py
@@ -1,5 +1,6 @@
import os

import cyvcf2
from django.conf import settings

from library.django_utils.django_file_utils import get_import_processing_filename
@@ -52,9 +53,11 @@ def get_max_variant_id(self):

return None # No variants, or only reference

def add_modified_imported_variant(self, variant, variant_hash, miv_hash_list=None, miv_list=None):
def add_modified_imported_variant(self, variant: cyvcf2.Variant, variant_hash, miv_hash_list=None, miv_list=None):
# This used to handle VT tags: OLD_MULTIALLELIC / OLD_VARIANT but now we handle BCFTOOLS only
if bcftools_old_variant := variant.INFO.get(ModifiedImportedVariant.BCFTOOLS_OLD_VARIANT_TAG):
svlen = variant.INFO.get("SVLEN")

if miv_hash_list is None:
miv_hash_list = self.modified_imported_variant_hashes
if miv_list is None:
@@ -66,12 +69,12 @@ def add_modified_imported_variant(self, variant, variant_hash, miv_hash_list=None, miv_list=None):
old_multiallelic = None

old_position = int(bcftools_old_variant.split("|")[1])
if old_position != variant.locus.position:
old_variant = old_position
if old_position != variant.POS:
old_variant = bcftools_old_variant
else:
old_variant = None # Wasn't normalized

for ov in ModifiedImportedVariant.bcftools_format_old_variant(bcftools_old_variant, self.genome_build):
for ov in ModifiedImportedVariant.bcftools_format_old_variant(bcftools_old_variant, svlen, self.genome_build):
# These 2 need to be in sync
miv_hash_list.append(variant_hash)
miv_list.append((old_multiallelic, old_variant, ov))
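The svlen now carried alongside the old-variant string matters only for symbolic structural-variant ALTs, whose size lives in INFO/SVLEN rather than in the REF/ALT strings; a hypothetical example:

    # Hypothetical symbolic-ALT record (a 500 bp deletion):
    #   chr1  10000  .  N  <DEL>  .  .  SVLEN=-500
    # REF/ALT alone cannot define the span here, so the normalised coordinate
    # keeps SVLEN next to the old-variant value.
    svlen = variant.INFO.get("SVLEN")   # None for ordinary SNVs and indels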
11 changes: 1 addition & 10 deletions upload/vcf/vcf_import.py
@@ -24,7 +24,7 @@
from snpdb.models.models_genome import GenomeBuild
from snpdb.tasks.cohort_genotype_tasks import create_cohort_genotype_collection
from upload.models import UploadedVCF, PipelineFailedJobTerminateEarlyException, \
BackendVCF, UploadStep, ModifiedImportedVariants, UploadStepTaskType, VCFPipelineStage
BackendVCF, UploadStep, UploadStepTaskType, VCFPipelineStage
from upload.tasks.vcf.import_sql_copy_task import ImportModifiedImportedVariantSQLCopyTask
from upload.vcf.bulk_genotype_vcf_processor import BulkGenotypeVCFProcessor
from upload.vcf.bulk_no_genotype_vcf_processor import BulkNoGenotypeVCFProcessor
@@ -267,15 +267,6 @@ def import_vcf_file(upload_step, bulk_inserter) -> int:
return bulk_inserter.rows_processed


def get_preprocess_vcf_import_info(upload_pipeline):
try:
preprocess_vcf_sub_step = upload_pipeline.uploadstep_set.get(name=UploadStep.PREPROCESS_VCF_NAME)
# ModifiedImportedVariants object was created in Preprocess upload step
return ModifiedImportedVariants.objects.get(upload_step=preprocess_vcf_sub_step)
except:
return None


def update_uploaded_vcf_max_variant(pk, max_inserted_variant_id):
""" This can be run in parallel """

10 changes: 4 additions & 6 deletions upload/vcf/vcf_preprocess.py
@@ -68,7 +68,6 @@ def preprocess_vcf(upload_step, remove_info=False, annotate_gnomad_af=False):
MAX_STDERR_OUTPUT = 5000 # How much stderr output per process to store in DB

VCF_CLEAN_AND_FILTER_SUB_STEP = "vcf_clean_and_filter"
NORMALIZE_SUB_STEP = "normalize"
REMOVE_HEADER_SUB_STEP = "remove_header"
SPLIT_VCF_SUB_STEP = "split_vcf"

@@ -111,13 +110,13 @@ def preprocess_vcf(upload_step, remove_info=False, annotate_gnomad_af=False):
pipe_commands[VCF_CLEAN_AND_FILTER_SUB_STEP] = read_variants_cmd
sub_steps[VCF_CLEAN_AND_FILTER_SUB_STEP] = create_sub_step(upload_step, VCF_CLEAN_AND_FILTER_SUB_STEP, read_variants_cmd)

pipe_commands[NORMALIZE_SUB_STEP] = [settings.BCFTOOLS_COMMAND, "norm",
pipe_commands[UploadStep.NORMALIZE_SUB_STEP] = [settings.BCFTOOLS_COMMAND, "norm",
"--multiallelics=-", "--rm-dup=exact",
"--check-ref=w",
f"--old-rec-tag={ModifiedImportedVariant.BCFTOOLS_OLD_VARIANT_TAG}",
f"--fasta-ref={genome_build.reference_fasta}", "-"]
pipe_commands[REMOVE_HEADER_SUB_STEP] = [settings.BCFTOOLS_COMMAND, "view", "--no-header", "-"]
norm_substep_names = [NORMALIZE_SUB_STEP]
norm_substep_names = [UploadStep.NORMALIZE_SUB_STEP]

# Split up the VCF
split_vcf_dir = upload_pipeline.get_pipeline_processing_subdir("split_vcf")
@@ -200,9 +199,8 @@ def preprocess_vcf(upload_step, remove_info=False, annotate_gnomad_af=False):
_store_vcf_skip_stats(skipped_records_stats_file, clean_sub_step, "records")
_store_vcf_skip_stats(skipped_filters_stats_file, clean_sub_step, "FILTER")

# Create this here so downstream tasks can add modified imported variant messages
normalize_sub_step = sub_steps[NORMALIZE_SUB_STEP]
import_info, _ = ModifiedImportedVariants.objects.get_or_create(upload_step=normalize_sub_step)
# Create this here so downstream tasks (running in parallel) can all link against the same one
ModifiedImportedVariants.get_for_pipeline(upload_pipeline)
vcf_import_annotate_dir = upload_pipeline.get_pipeline_processing_subdir("vcf_import_annotate")
sort_order = upload_pipeline.get_max_step_sort_order()
for split_vcf_filename in glob.glob(f"{split_vcf_dir}/*.vcf.gz"):
2 changes: 1 addition & 1 deletion variantopedia/templates/variantopedia/variant_details.html
@@ -479,7 +479,7 @@ <h3>Variant</h3>

{% if modified_normalised_variants.exists %}
<div class='alert alert-warning'>
<p>The following VCF variant records were <a target='_blank' href="https://genome.sph.umich.edu/wiki/Vt#Normalization">normalised by VT during import</a> and converted to this variant ({{ variant }}).
<p>The following VCF variant records were <a target='_blank' href="https://samtools.github.io/bcftools/bcftools.html#norm">normalised by BCFtools</a> during import and converted to this variant ({{ variant }}).
<ul>
{% for mnv in modified_normalised_variants %}
<li>{{ mnv }}
