Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 125 additions & 35 deletions src/mavedb/lib/score_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
from collections import Counter
from operator import attrgetter
from typing import TYPE_CHECKING, Any, BinaryIO, Iterable, List, Literal, Optional, Sequence
from typing import TYPE_CHECKING, Any, BinaryIO, Iterable, List, Optional, Sequence

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -37,6 +37,8 @@
from mavedb.models.experiment_controlled_keyword import ExperimentControlledKeywordAssociation
from mavedb.models.experiment_publication_identifier import ExperimentPublicationIdentifierAssociation
from mavedb.models.experiment_set import ExperimentSet
from mavedb.models.clinical_control import ClinicalControl
from mavedb.models.clinical_control_mapped_variant import mapped_variants_clinical_controls_association_table
from mavedb.models.gnomad_variant import GnomADVariant
from mavedb.models.mapped_variant import MappedVariant
from mavedb.models.publication_identifier import PublicationIdentifier
Expand All @@ -63,6 +65,10 @@

logger = logging.getLogger(__name__)

# Pattern for ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH",
# e.g. "clinvar.2024_01" for January 2024.
#
# Capture groups:
#   1 -> YEAR  (one or more ASCII digits; the digit count is not restricted)
#   2 -> MONTH (zero-padded, "01" through "12")
#
# The pattern is used to validate user-supplied namespace strings, so it is
# deliberately strict:
#   * ``[0-9]`` is used instead of ``\d`` because ``\d`` on a ``str`` pattern
#     also matches non-ASCII Unicode decimal digits.
#   * ``\Z`` is used instead of ``$`` because ``$`` also matches just before a
#     trailing newline, which would let "clinvar.2024_01\n" through.
CLINVAR_NS_PATTERN = re.compile(r"^clinvar\.([0-9]+)_(0[1-9]|1[0-2])\Z")


class HGVSColumns:
NUCLEOTIDE: str = "hgvs_nt" # dataset.constants.hgvs_nt_column
Expand Down Expand Up @@ -531,7 +537,7 @@ def find_publish_or_private_superseded_score_set_tail(
def get_score_set_variants_as_csv(
db: Session,
score_set: ScoreSet,
namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]],
namespaces: List[str],
namespaced: Optional[bool] = None,
start: Optional[int] = None,
limit: Optional[int] = None,
Expand All @@ -548,8 +554,10 @@ def get_score_set_variants_as_csv(
The database session to use.
score_set : ScoreSet
The score set to get the variants from.
namespaces : List[Literal["scores", "counts", "vep", "gnomad", "clingen"]]
The namespaces for data. Now there are only scores, counts, VEP, gnomAD, and ClinGen. ClinVar will be added in the future.
namespaces : List[str]
The namespaces for data: "scores", "counts", "vep", "gnomad", "clingen", and/or
ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH" (e.g. "clinvar.2024_01"
for January 2024, which joins on db_name="ClinVar" and db_version="01_2024").
namespaced: Optional[bool] = None
Whether to namespace the columns or not.
start : int, optional
Expand Down Expand Up @@ -600,11 +608,26 @@ def get_score_set_variants_as_csv(
namespaced_score_set_columns["gnomad"].append("gnomad_af")
if "clingen" in namespaced_score_set_columns:
namespaced_score_set_columns["clingen"].append("clingen_allele_id")

# Parse ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH".
# The corresponding db_version stored in clinical_controls is "MONTH_YEAR".
clinvar_namespaces: dict[str, str] = {} # namespace -> db_version (MONTH_YEAR)
for ns in namespaces:
m = CLINVAR_NS_PATTERN.match(ns)
if m:
year, month = m.group(1), m.group(2)
db_version = f"{month}_{year}"
clinvar_namespaces[ns] = db_version
namespaced_score_set_columns[ns] = ["clinical_significance", "clinical_review_status"]

variants: Sequence[Variant] = []
mappings: Optional[list[Optional[MappedVariant]]] = None
gnomad_data: Optional[list[Optional[GnomADVariant]]] = None

if "gnomad" in namespaces and include_post_mapped_hgvs:
# Mappings are needed whenever post-mapped HGVS or any ClinVar namespace is requested.
need_mappings = bool(include_post_mapped_hgvs or clinvar_namespaces)

if "gnomad" in namespaces and need_mappings:
variants_mappings_and_gnomad_query = (
select(Variant, MappedVariant, GnomADVariant)
.join(
Expand Down Expand Up @@ -640,7 +663,7 @@ def get_score_set_variants_as_csv(
variants.append(variant)
mappings.append(mapping)
gnomad_data.append(gnomad)
elif include_post_mapped_hgvs:
elif need_mappings:
variants_and_mappings_query = (
select(Variant, MappedVariant)
.join(
Expand Down Expand Up @@ -707,22 +730,68 @@ def get_score_set_variants_as_csv(
if limit:
variants_query = variants_query.limit(limit)
variants = db.scalars(variants_query).all()

# For each ClinVar namespace, fetch a mapping from mapped_variant_id to ClinicalControl.
clinvar_data_map: dict[str, dict[int, Optional[ClinicalControl]]] = {}
if clinvar_namespaces and mappings is not None:
mv_ids = [m.id for m in mappings if m is not None]
for ns, db_version in clinvar_namespaces.items():
mv_to_cc: dict[int, Optional[ClinicalControl]] = {}
if mv_ids:
aliased_cc = aliased(ClinicalControl)
cc_query = (
select(
mapped_variants_clinical_controls_association_table.c.mapped_variant_id,
aliased_cc,
)
.join(
aliased_cc,
mapped_variants_clinical_controls_association_table.c.clinical_control_id == aliased_cc.id,
)
.where(
and_(
mapped_variants_clinical_controls_association_table.c.mapped_variant_id.in_(mv_ids),
aliased_cc.db_name == "ClinVar",
aliased_cc.db_version == db_version,
)
)
)
for mv_id, cc in db.execute(cc_query).all():
mv_to_cc[mv_id] = cc
clinvar_data_map[ns] = mv_to_cc

# Build per-variant ClinVar lookup (list indexed in parallel with variants).
clinvar_per_variant: Optional[list[Optional[dict[str, Optional[ClinicalControl]]]]] = None
if clinvar_namespaces and mappings is not None:
clinvar_per_variant = []
for mapping in mappings:
row_clinvar: dict[str, Optional[ClinicalControl]] = {}
for ns, mv_to_cc in clinvar_data_map.items():
row_clinvar[ns] = mv_to_cc.get(mapping.id) if mapping is not None else None
clinvar_per_variant.append(row_clinvar)

rows_data = variants_to_csv_rows(
variants,
columns=namespaced_score_set_columns,
namespaced=namespaced,
mappings=mappings,
gnomad_data=gnomad_data,
clinvar_data_by_ns=clinvar_per_variant,
) # type: ignore
rows_columns = [
(
f"{namespace}.{col}"
if (namespaced and namespace not in ["core", "mavedb"])
else (f"mavedb.{col}" if namespaced and namespace == "mavedb" else col)
)
for namespace, cols in namespaced_score_set_columns.items()
for col in cols
]

rows_columns = []
for namespace, cols in namespaced_score_set_columns.items():
for col in cols:
if CLINVAR_NS_PATTERN.match(namespace):
# ClinVar versioned namespaces always include the full namespace prefix
# to avoid column-name collisions when multiple versions are requested.
rows_columns.append(f"{namespace}.{col}")
elif namespaced and namespace not in ["core", "mavedb"]:
rows_columns.append(f"{namespace}.{col}")
elif namespaced and namespace == "mavedb":
rows_columns.append(f"mavedb.{col}")
else:
rows_columns.append(col)

if drop_na_columns:
rows_data, rows_columns = drop_na_columns_from_csv_file_rows(rows_data, rows_columns)
Expand Down Expand Up @@ -769,6 +838,7 @@ def variant_to_csv_row(
columns: dict[str, list[str]],
mapping: Optional[MappedVariant] = None,
gnomad_data: Optional[GnomADVariant] = None,
clinvar_data_by_ns: Optional[dict[str, Optional[ClinicalControl]]] = None,
namespaced: Optional[bool] = None,
na_rep="NA",
) -> dict[str, Any]:
Expand All @@ -787,6 +857,8 @@ def variant_to_csv_row(
Mapped variant corresponding to the variant.
gnomad_data : variant.models.GnomADVariant, optional
gnomAD variant data corresponding to the variant.
clinvar_data_by_ns : dict[str, Optional[ClinicalControl]], optional
Per-variant ClinVar data keyed by namespace (e.g. "clinvar.2024_01").
na_rep : str
String to represent null values.

Expand Down Expand Up @@ -885,6 +957,23 @@ def variant_to_csv_row(
value = na_rep
key = f"clingen.{column_key}" if namespaced else column_key
row[key] = value
# Handle ClinVar-versioned namespaces (e.g. "clinvar.2024_01").
# These always use the full "namespace.column" key regardless of the namespaced flag
# to avoid collisions when multiple versions are requested.
for namespace_key, namespace_cols in columns.items():
if not CLINVAR_NS_PATTERN.match(namespace_key):
continue
clinvar_entry = (clinvar_data_by_ns or {}).get(namespace_key)
for column_key in namespace_cols:
if column_key == "clinical_significance":
value = str(clinvar_entry.clinical_significance) if clinvar_entry else na_rep
elif column_key == "clinical_review_status":
value = str(clinvar_entry.clinical_review_status) if clinvar_entry else na_rep
else:
value = na_rep
if is_null(value):
value = na_rep
row[f"{namespace_key}.{column_key}"] = value
return row


Expand All @@ -893,6 +982,7 @@ def variants_to_csv_rows(
columns: dict[str, list[str]],
mappings: Optional[Sequence[Optional[MappedVariant]]] = None,
gnomad_data: Optional[Sequence[Optional[GnomADVariant]]] = None,
clinvar_data_by_ns: Optional[Sequence[Optional[dict[str, Optional[ClinicalControl]]]]] = None,
namespaced: Optional[bool] = None,
na_rep="NA",
) -> Iterable[dict[str, Any]]:
Expand All @@ -911,33 +1001,33 @@ def variants_to_csv_rows(
List of mapped variants corresponding to the variants.
gnomad_data : list[Optional[variant.models.GnomADVariant]], optional
List of gnomAD variant data corresponding to the variants.
clinvar_data_by_ns : list[Optional[dict[str, Optional[ClinicalControl]]]], optional
Per-variant ClinVar data keyed by namespace (e.g. "clinvar.2024_01").
na_rep : str
String to represent null values.

Returns
-------
Iterable[dict[str, Any]]
"""
if mappings is not None and gnomad_data is not None:
return map(
lambda zipped: variant_to_csv_row(
zipped[0], columns, mapping=zipped[1], gnomad_data=zipped[2], namespaced=namespaced, na_rep=na_rep
),
zip(variants, mappings, gnomad_data),
)
elif mappings is not None:
return map(
lambda pair: variant_to_csv_row(pair[0], columns, mapping=pair[1], namespaced=namespaced, na_rep=na_rep),
zip(variants, mappings),
)
elif gnomad_data is not None:
return map(
lambda pair: variant_to_csv_row(
pair[0], columns, gnomad_data=pair[1], namespaced=namespaced, na_rep=na_rep
),
zip(variants, gnomad_data),
)
return map(lambda v: variant_to_csv_row(v, columns, namespaced=namespaced, na_rep=na_rep), variants)
n = len(variants)
_mappings: Sequence[Optional[MappedVariant]] = mappings if mappings is not None else [None] * n
_gnomad: Sequence[Optional[GnomADVariant]] = gnomad_data if gnomad_data is not None else [None] * n
_clinvar: Sequence[Optional[dict[str, Optional[ClinicalControl]]]] = (
clinvar_data_by_ns if clinvar_data_by_ns is not None else [None] * n
)
return map(
lambda t: variant_to_csv_row(
t[0],
columns,
mapping=t[1],
gnomad_data=t[2],
clinvar_data_by_ns=t[3],
namespaced=namespaced,
na_rep=na_rep,
),
zip(variants, _mappings, _gnomad, _clinvar),
)


def find_meta_analyses_for_score_sets(db: Session, urns: list[str]) -> list[ScoreSet]:
Expand Down
6 changes: 3 additions & 3 deletions src/mavedb/lib/validation/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ def transform_score_set_list_to_urn_list(
return [score_set.urn for score_set in score_sets]
else:
return [
score_set.urn
for score_set in score_sets
if score_set.superseding_score_set is None or score_set.superseding_score_set.published_date is None
score_set.urn
for score_set in score_sets
if score_set.superseding_score_set is None or score_set.superseding_score_set.published_date is None
]


Expand Down
36 changes: 28 additions & 8 deletions src/mavedb/routers/score_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import time
from datetime import date, datetime
from typing import Any, List, Literal, Optional, Sequence, TypedDict, Union
from typing import Any, List, Optional, Sequence, TypedDict, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -48,6 +48,7 @@
from mavedb.lib.permissions import Action, assert_permission, has_permission
from mavedb.lib.score_calibrations import create_score_calibration
from mavedb.lib.score_sets import (
CLINVAR_NS_PATTERN,
csv_data_to_df,
fetch_score_set_search_filter_options,
find_meta_analyses_for_experiment_sets,
Expand Down Expand Up @@ -714,8 +715,13 @@ def get_score_set_variants_csv(
urn: str,
start: int = Query(default=None, description="Start index for pagination"),
limit: int = Query(default=None, description="Maximum number of variants to return"),
namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] = Query(
default=["scores"], description="One or more data types to include: scores, counts, ClinGen, gnomAD, VEP"
namespaces: List[str] = Query(
default=["scores"],
description=(
'One or more data types to include: "scores", "counts", "vep", "gnomad", "clingen", '
'and/or ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH" '
'(e.g. "clinvar.2024_01" for January 2024).'
),
),
drop_na_columns: Optional[bool] = None,
include_custom_columns: Optional[bool] = None,
Expand All @@ -729,9 +735,6 @@ def get_score_set_variants_csv(
This differs from get_score_set_scores_csv() in that it returns only the HGVS columns, score column, and mapped HGVS
string.

TODO (https://github.com/VariantEffect/mavedb-api/issues/446) We may add another function for ClinVar and gnomAD.
export endpoint, with options governing which columns to include.

Parameters
__________
urn : str
Expand All @@ -740,9 +743,11 @@ def get_score_set_variants_csv(
The index to start from. If None, starts from the beginning.
limit : Optional[int]
The maximum number of variants to return. If None, returns all variants.
namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]]
namespaces: List[str]
The namespaces of all columns except for accession, hgvs_nt, hgvs_pro, and hgvs_splice.
We may add ClinVar in the future.
Supported values: "scores", "counts", "vep", "gnomad", "clingen", and ClinVar-versioned
namespaces of the form "clinvar.YEAR_MONTH" (e.g. "clinvar.2024_01" for January 2024).
Multiple ClinVar namespaces with different YEAR_MONTH values may be requested simultaneously.
drop_na_columns : bool, optional
Whether to drop columns that contain only NA values. Defaults to False.
db : Session
Expand Down Expand Up @@ -772,6 +777,21 @@ def get_score_set_variants_csv(
logger.info(msg="Could not fetch scores with non-positive limit.", extra=logging_context())
raise HTTPException(status_code=422, detail="Limit must be positive")

_VALID_STATIC_NAMESPACES = {"scores", "counts", "vep", "gnomad", "clingen"}
invalid_namespaces = [
ns for ns in namespaces if ns not in _VALID_STATIC_NAMESPACES and not CLINVAR_NS_PATTERN.match(ns)
]
if invalid_namespaces:
raise HTTPException(
status_code=422,
detail=(
f"Invalid namespace(s): {invalid_namespaces}. "
'Each namespace must be one of "scores", "counts", "vep", "gnomad", "clingen", '
'or a ClinVar-versioned namespace of the form "clinvar.YEAR_MM" '
'(e.g. "clinvar.2024_01" for January 2024).'
),
)

score_set = db.query(ScoreSet).filter(ScoreSet.urn == urn).first()
if not score_set:
logger.info(msg="Could not fetch the requested scores; No such score set exists.", extra=logging_context())
Expand Down
Loading
Loading