Skip to content

Commit

Permalink
reformatted with yapf
Browse files Browse the repository at this point in the history
  • Loading branch information
reece committed Apr 18, 2019
1 parent 30393b0 commit 5291f1a
Show file tree
Hide file tree
Showing 48 changed files with 783 additions and 407 deletions.
82 changes: 49 additions & 33 deletions hgvs/alignmentmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class AlignmentMapper(object):
:param str alt_aln_method: string representing the alignment method; valid values depend on data source
"""
__slots__ = ("tx_ac", "alt_ac", "alt_aln_method", "strand", "gc_offset", "cds_start_i", "cds_end_i", "tgt_len",
"cigar", "ref_pos", "tgt_pos", "cigar_op")
__slots__ = ("tx_ac", "alt_ac", "alt_aln_method", "strand", "gc_offset", "cds_start_i",
"cds_end_i", "tgt_len", "cigar", "ref_pos", "tgt_pos", "cigar_op")

def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method):
self.tx_ac = tx_ac
Expand All @@ -40,25 +40,27 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method):
if self.alt_aln_method != "transcript":
tx_info = hdp.get_tx_info(self.tx_ac, self.alt_ac, self.alt_aln_method)
if tx_info is None:
raise HGVSDataNotAvailableError("AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript info".format(self=self))
raise HGVSDataNotAvailableError(
"AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript info".format(self=self))

tx_exons = hdp.get_tx_exons(self.tx_ac, self.alt_ac, self.alt_aln_method)
if tx_exons is None:
raise HGVSDataNotAvailableError("AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript exons".format(self=self))
raise HGVSDataNotAvailableError(
"AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript exons".format(self=self))

# hgvs-386: An assumption when building the cigar string
# is that exons are adjacent. Assert that here.
sorted_tx_exons = sorted(tx_exons, key=lambda e: e["ord"])
for i in range(1, len(sorted_tx_exons)):
if sorted_tx_exons[i - 1]["tx_end_i"] != sorted_tx_exons[i]["tx_start_i"]:
raise HGVSDataNotAvailableError("AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"Exons {a} and {b} are not adjacent".format(
self=self, a=i, b=i + 1))
raise HGVSDataNotAvailableError(
"AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"Exons {a} and {b} are not adjacent".format(self=self, a=i, b=i + 1))

self.strand = tx_exons[0]["alt_strand"]
self.gc_offset = tx_exons[0]["alt_start_i"]
Expand All @@ -71,15 +73,17 @@ def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method):
# this covers the identity cases n <-> c
tx_identity_info = hdp.get_tx_identity_info(self.tx_ac)
if tx_identity_info is None:
raise HGVSDataNotAvailableError("AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript identity info".format(self=self))
raise HGVSDataNotAvailableError(
"AlignmentMapper(tx_ac={self.tx_ac}, "
"alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): "
"No transcript identity info".format(self=self))
self.cds_start_i = tx_identity_info["cds_start_i"]
self.cds_end_i = tx_identity_info["cds_end_i"]
self.tgt_len = sum(tx_identity_info["lengths"])

assert not ((self.cds_start_i is None) ^
(self.cds_end_i is None)), "CDS start and end must both be defined or neither defined"
assert not (
(self.cds_start_i is None) ^
(self.cds_end_i is None)), "CDS start and end must both be defined or neither defined"

def __str__(self):
return "{self.__class__.__name__}: {self.tx_ac} ~ {self.alt_ac} ~ {self.alt_aln_method}; " \
Expand Down Expand Up @@ -144,17 +148,21 @@ def g_to_n(self, g_interval):

grs, gre = g_interval.start.base - 1 - self.gc_offset, g_interval.end.base - 1 - self.gc_offset
# frs, fre = (f)orward (r)na (s)tart & (e)nd; forward w.r.t. genome
frs, frs_offset, frs_cigar = self._map(from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=grs, base="start")
fre, fre_offset, fre_cigar = self._map(from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=gre, base="end")
frs, frs_offset, frs_cigar = self._map(
from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=grs, base="start")
fre, fre_offset, fre_cigar = self._map(
from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=gre, base="end")

if self.strand == -1:
frs, fre = self.tgt_len - fre - 1, self.tgt_len - frs - 1
frs_offset, fre_offset = -fre_offset, -frs_offset

# The returned interval is flagged as uncertain when its start or end falls in an alignment gap (CIGAR op D or I)
return hgvs.location.BaseOffsetInterval(
start=hgvs.location.BaseOffsetPosition(base=frs + 1, offset=frs_offset, datum=Datum.SEQ_START),
end=hgvs.location.BaseOffsetPosition(base=fre + 1, offset=fre_offset, datum=Datum.SEQ_START),
start=hgvs.location.BaseOffsetPosition(
base=frs + 1, offset=frs_offset, datum=Datum.SEQ_START),
end=hgvs.location.BaseOffsetPosition(
base=fre + 1, offset=fre_offset, datum=Datum.SEQ_START),
uncertain=frs_cigar in 'DI' or fre_cigar in 'DI')

def n_to_g(self, n_interval):
Expand All @@ -168,8 +176,10 @@ def n_to_g(self, n_interval):
start_offset, end_offset = -end_offset, -start_offset

# returns the genomic range start (grs) and end (gre)
grs, _, grs_cigar = self._map(from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=frs, base="start")
gre, _, gre_cigar = self._map(from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=fre, base="end")
grs, _, grs_cigar = self._map(
from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=frs, base="start")
gre, _, gre_cigar = self._map(
from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=fre, base="end")
grs, gre = grs + self.gc_offset + 1, gre + self.gc_offset + 1
gs, ge = grs + start_offset, gre + end_offset

Expand All @@ -184,10 +194,11 @@ def n_to_c(self, n_interval):

if self.cds_start_i is None: # cds_start_i defined iff cds_end_i defined; see assertion above
raise HGVSUsageError(
"CDS is undefined for {self.tx_ac}; cannot map to c. coordinate (non-coding transcript?)".format(
self=self))
"CDS is undefined for {self.tx_ac}; cannot map to c. coordinate (non-coding transcript?)"
.format(self=self))
if n_interval.start.base <= 0 or n_interval.end.base > self.tgt_len:
raise HGVSInvalidIntervalError("The given coordinate is outside the bounds of the reference sequence.")
raise HGVSInvalidIntervalError(
"The given coordinate is outside the bounds of the reference sequence.")

# start
if n_interval.start.base <= self.cds_start_i:
Expand All @@ -211,8 +222,10 @@ def n_to_c(self, n_interval):
ce_datum = Datum.CDS_END

c_interval = hgvs.location.BaseOffsetInterval(
start=hgvs.location.BaseOffsetPosition(base=cs, offset=n_interval.start.offset, datum=cs_datum),
end=hgvs.location.BaseOffsetPosition(base=ce, offset=n_interval.end.offset, datum=ce_datum),
start=hgvs.location.BaseOffsetPosition(
base=cs, offset=n_interval.start.offset, datum=cs_datum),
end=hgvs.location.BaseOffsetPosition(
base=ce, offset=n_interval.end.offset, datum=ce_datum),
uncertain=n_interval.uncertain)
return c_interval

Expand All @@ -221,8 +234,8 @@ def c_to_n(self, c_interval):

if self.cds_start_i is None: # cds_start_i defined iff cds_end_i defined; see assertion above
raise HGVSUsageError(
"CDS is undefined for {self.tx_ac}; cannot map from c. coordinate (non-coding transcript?)".format(
self=self))
"CDS is undefined for {self.tx_ac}; cannot map from c. coordinate (non-coding transcript?)"
.format(self=self))

# start
if c_interval.start.datum == Datum.CDS_START and c_interval.start.base < 0:
Expand All @@ -240,11 +253,14 @@ def c_to_n(self, c_interval):
r_end = c_interval.end.base + self.cds_end_i

if r_start <= 0 or r_end > self.tgt_len:
raise HGVSInvalidIntervalError("The given coordinate is outside the bounds of the reference sequence.")
raise HGVSInvalidIntervalError(
"The given coordinate is outside the bounds of the reference sequence.")

n_interval = hgvs.location.BaseOffsetInterval(
start=hgvs.location.BaseOffsetPosition(base=r_start, offset=c_interval.start.offset, datum=Datum.SEQ_START),
end=hgvs.location.BaseOffsetPosition(base=r_end, offset=c_interval.end.offset, datum=Datum.SEQ_START),
start=hgvs.location.BaseOffsetPosition(
base=r_start, offset=c_interval.start.offset, datum=Datum.SEQ_START),
end=hgvs.location.BaseOffsetPosition(
base=r_end, offset=c_interval.end.offset, datum=Datum.SEQ_START),
uncertain=c_interval.uncertain)
return n_interval

Expand Down
47 changes: 33 additions & 14 deletions hgvs/assemblymapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,23 @@ def __init__(self,
"""

super(AssemblyMapper, self).__init__(
hdp=hdp, replace_reference=replace_reference, prevalidation_level=prevalidation_level, *args, **kwargs)
hdp=hdp,
replace_reference=replace_reference,
prevalidation_level=prevalidation_level,
*args,
**kwargs)
self.assembly_name = assembly_name
self.alt_aln_method = alt_aln_method
self.normalize = normalize
self.in_par_assume = in_par_assume
self._norm = None
if self.normalize:
self._norm = hgvs.normalizer.Normalizer(hdp, alt_aln_method=alt_aln_method, validate=False)
self._assembly_map = {k: v for k, v in hdp.get_assembly_map(self.assembly_name).items() if k.startswith("NC_")}
self._norm = hgvs.normalizer.Normalizer(
hdp, alt_aln_method=alt_aln_method, validate=False)
self._assembly_map = {
k: v
for k, v in hdp.get_assembly_map(self.assembly_name).items() if k.startswith("NC_")
}
self._assembly_accessions = set(self._assembly_map.keys())

def __repr__(self):
Expand All @@ -79,30 +87,36 @@ def __repr__(self):
"replace_reference={self.replace_reference})".format(self=self, t=type(self)))

def g_to_c(self, var_g, tx_ac):
var_out = super(AssemblyMapper, self).g_to_c(var_g, tx_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).g_to_c(
var_g, tx_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def g_to_n(self, var_g, tx_ac):
var_out = super(AssemblyMapper, self).g_to_n(var_g, tx_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).g_to_n(
var_g, tx_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def g_to_t(self, var_g, tx_ac):
var_out = super(AssemblyMapper, self).g_to_t(var_g, tx_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).g_to_t(
var_g, tx_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def c_to_g(self, var_c):
alt_ac = self._alt_ac_for_tx_ac(var_c.ac)
var_out = super(AssemblyMapper, self).c_to_g(var_c, alt_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).c_to_g(
var_c, alt_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def n_to_g(self, var_n):
alt_ac = self._alt_ac_for_tx_ac(var_n.ac)
var_out = super(AssemblyMapper, self).n_to_g(var_n, alt_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).n_to_g(
var_n, alt_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def t_to_g(self, var_t):
alt_ac = self._alt_ac_for_tx_ac(var_t.ac)
var_out = super(AssemblyMapper, self).t_to_g(var_t, alt_ac, alt_aln_method=self.alt_aln_method)
var_out = super(AssemblyMapper, self).t_to_g(
var_t, alt_ac, alt_aln_method=self.alt_aln_method)
return self._maybe_normalize(var_out)

def t_to_p(self, var_t):
Expand All @@ -120,7 +134,8 @@ def t_to_p(self, var_t):
return "non-coding"
if var_t.type == "c":
return self.c_to_p(var_t)
raise HGVSInvalidVariantError("Expected a coding (c.) or non-coding (n.) variant; got " + str(var_t))
raise HGVSInvalidVariantError("Expected a coding (c.) or non-coding (n.) variant; got " +
str(var_t))

def c_to_n(self, var_c):
var_out = super(AssemblyMapper, self).c_to_n(var_c)
Expand Down Expand Up @@ -148,8 +163,8 @@ def _alt_ac_for_tx_ac(self, tx_ac):
"""
alt_acs = [
e["alt_ac"] for e in self.hdp.get_tx_mapping_options(tx_ac)
if e["alt_aln_method"] == self.alt_aln_method and e["alt_ac"] in self._assembly_accessions
e["alt_ac"] for e in self.hdp.get_tx_mapping_options(tx_ac) if
e["alt_aln_method"] == self.alt_aln_method and e["alt_ac"] in self._assembly_accessions
]

if not alt_acs:
Expand All @@ -160,10 +175,14 @@ def _alt_ac_for_tx_ac(self, tx_ac):
if len(alt_acs) > 1:
names = set(self._assembly_map[ac] for ac in alt_acs)
if names != set("XY"):
alts = ", ".join(["{ac} ({n})".format(ac=ac, n=self._assembly_map[ac]) for ac in alt_acs])
alts = ", ".join(
["{ac} ({n})".format(ac=ac, n=self._assembly_map[ac]) for ac in alt_acs])
raise HGVSError("Multiple chromosomal alignments for {tx_ac} in {an}"
" using {am} (non-pseudoautosomal region) [{alts}]".format(
tx_ac=tx_ac, an=self.assembly_name, am=self.alt_aln_method, alts=alts))
tx_ac=tx_ac,
an=self.assembly_name,
am=self.alt_aln_method,
alts=alts))

# assume PAR
if self.in_par_assume is None:
Expand Down
41 changes: 28 additions & 13 deletions hgvs/dataproviders/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,44 @@ def __init__(self, mode=None, cache=None):
self.cache = PersistentDict(cache, flag='r')

self.data_version = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.data_version)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.data_version)
self.schema_version = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.schema_version)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.schema_version)
self.get_acs_for_protein_seq = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_acs_for_protein_seq)
self.get_gene_info = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_gene_info)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_gene_info)
self.get_pro_ac_for_tx_ac = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_pro_ac_for_tx_ac)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_pro_ac_for_tx_ac)
self.get_seq = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_seq)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_seq)
self.get_similar_transcripts = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_similar_transcripts)
self.get_tx_exons = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_exons)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_exons)
self.get_tx_for_gene = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_for_gene)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_for_gene)
self.get_tx_for_region = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_for_region)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_for_region)
self.get_tx_identity_info = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_identity_info)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_identity_info)
self.get_tx_info = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_info)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_info)
self.get_tx_mapping_options = lru_cache(
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode, cache=self.cache)(self.get_tx_mapping_options)
maxsize=hgvs.global_config.lru_cache.maxsize, mode=self.mode,
cache=self.cache)(self.get_tx_mapping_options)

def _split_version_string(v):
versions = list(map(int, v.split(".")))
Expand All @@ -108,8 +119,12 @@ def _split_version_string(v):
return

raise RuntimeError(
"Incompatible versions: {k} requires schema version {rv}, but {self.url} provides version {av}".format(
k=type(self).__name__, self=self, rv=self.required_version, av=self.schema_version()))
"Incompatible versions: {k} requires schema version {rv}, but {self.url} provides version {av}"
.format(
k=type(self).__name__,
self=self,
rv=self.required_version,
av=self.schema_version()))

# required_version: what version of the remote schema is required
# by the subclass? This value is compared to the result of
Expand Down

0 comments on commit 5291f1a

Please sign in to comment.