From 3bdceed372a4168b8f3187382e44402213970c87 Mon Sep 17 00:00:00 2001 From: Gavin Huttley Date: Mon, 1 Jan 2024 08:52:59 +1100 Subject: [PATCH] API: get_alignment() returns a generator [CHANGED] it's possible a set of coordinates can map to multiple alignment records. We yield these one at a time. [NEW] allow selecting just a subset of an alignment, using the GapPositions class to reduce the genome sequence query to only that required for the final result. [NEW] multiple extensive tests of selecting subsets of alignment. --- src/ensembl_lite/_aligndb.py | 113 ++++++++++++++++++----- tests/test_aligndb.py | 174 +++++++++++++++++++++++++++++------ 2 files changed, 237 insertions(+), 50 deletions(-) diff --git a/src/ensembl_lite/_aligndb.py b/src/ensembl_lite/_aligndb.py index 5075349..07c9cca 100644 --- a/src/ensembl_lite/_aligndb.py +++ b/src/ensembl_lite/_aligndb.py @@ -132,42 +132,109 @@ def get_alignment( genomes: dict, species: str, coord_name: str, - start=None, - end=None, -) -> Alignment: - """return a cogent3 Alignment""" + start: int | None = None, + end: int | None = None, +) -> typing.Generator[Alignment]: + """yields cogent3 Alignments""" from ensembl_lite.convert import gap_coords_to_seq - # todo connect to annotation db has been filtered for all records for - # each species that fall within - # the coordinates of the records + if species not in genomes: + raise ValueError(f"unknown species {species!r}") + + if start is not None: # deal with case where numpy scalars are input + start = int(start) + if end is not None: + end = int(end) + + # todo connect to annotation db that has been filtered for all records for + # each species that fall within the coordinates of the records align_records = align_db.get_records_matching( species=species, coord_name=coord_name, start=start, end=end ) + # sample the sequences - seqs = [] for block in align_records: - # todo we get the gaps corresponding to the reference sequence - # and convert them to a GapPosition instance. We then convert - # the start, end into align_start, align_end. Those values are - # used for all other species -- they are converted into sequence - # coordinates for a species -- selecting their sequence and, - # building the aligned instance, selecting the annotation subset. - for record in block: - species = record["species"] + seqs = [] + # we get the gaps corresponding to the reference sequence + # and convert them to a GapPosition instance. We then convert + # the start, end into align_start, align_end. Those values are + # used for all other species -- they are converted into sequence + # coordinates for each species -- selecting their sequence and, + # building the aligned instance, selecting the annotation subset. + for align_record in block: + if ( + align_record["species"] == species + and align_record["coord_name"] == coord_name + ): + # start, end are genomic positions and the align_record + # start / end are also genomic positions + genome_start = align_record["start"] + genome_end = align_record["end"] + gaps = GapPositions( + align_record["gap_spans"], seq_length=genome_end - genome_start + ) + + # We use the GapPosition object to identify the alignment + # positions the start / end correspond to. The alignment + # positions are used below for slicing each sequence in the + # alignment. + + # make sure the sequence start and end are within this + # aligned block + seq_start = max(start or genome_start, genome_start) + seq_end = min(end or genome_end, genome_end) + # make these coordinates relative to the aligned segment + if align_record["strand"] == "-": + # if record is on minus strand, then genome end is + # the alignment start + seq_start, seq_end = genome_end - seq_end, genome_end - seq_start + else: + seq_start = seq_start - genome_start + seq_end = seq_end - genome_start + + align_start = gaps.from_seq_to_align_index(seq_start) + align_end = gaps.from_seq_to_align_index(seq_end) + break + else: + raise ValueError(f"no matching alignment record for {species!r}") + + for align_record in block: + species = align_record["species"] genome = genomes[species] + coord_name = align_record["coord_name"] + # We need to convert the alignment coordinates into sequence + # coordinates for this species. + genome_start = align_record["start"] + genome_end = align_record["end"] + gaps = GapPositions( + align_record["gap_spans"], seq_length=genome_end - genome_start + ) + + # We use the alignment indices derived for the reference sequence + # above + seq_start = gaps.from_align_to_seq_index(align_start) + seq_end = gaps.from_align_to_seq_index(align_end) + seq_length = seq_end - seq_start + if align_record["strand"] == "-": + # if it's neg strand, the alignment start is the genome end + seq_start = gaps.seq_length - seq_end s = genome.get_seq( - coord_name=record["coord_name"], - start=record["start"], - end=record["end"], + coord_name=coord_name, + start=genome_start + seq_start, + end=genome_start + seq_start + seq_length, ) - s = make_seq(s, name=record["coord_name"], moltype="dna") - if record["strand"] == "-": + # we now trim the gaps for this sequence to the sub-alignment + gaps = gaps[align_start:align_end] + + s = make_seq(s, name=coord_name, moltype="dna") + if align_record["strand"] == "-": s = s.rc() - aligned = gap_coords_to_seq(record["gap_spans"], s) + + aligned = gap_coords_to_seq(gaps.gaps, s) seqs.append(aligned) - return Alignment(seqs) + # todo need to add annotation_db + yield Alignment(seqs) def _adjust_gap_starts(gaps: numpy.ndarray, new_start: int) -> numpy.ndarray: diff --git a/tests/test_aligndb.py b/tests/test_aligndb.py index 6a3252c..d03dc44 100644 --- a/tests/test_aligndb.py +++ b/tests/test_aligndb.py @@ -2,6 +2,7 @@ import pytest from cogent3 import make_seq +from cogent3.core.annotation_db import GffAnnotationDb from ensembl_lite._aligndb import ( AlignDb, @@ -21,15 +22,35 @@ def small_seqs(): "s2": "GTG------GTAGAAGTTCCAAATAATGAA", "s3": "GCTGAAGTAGTGGAAGTTGCAAAT---GAA", } - return make_aligned_seqs( + seqs = make_aligned_seqs( data=seqs, moltype="dna", array_align=False, info=dict(species=dict(s1="human", s2="mouse", s3="dog")), ) + annot_db = GffAnnotationDb(source=":memory:") + annot_db.add_feature( + seqid="s1", biotype="gene", name="not-on-s2", spans=[(4, 7)], on_alignment=False + ) + annot_db.add_feature( + seqid="s2", + biotype="gene", + name="includes-s2-gap", + spans=[(2, 6)], + on_alignment=False, + ) + annot_db.add_feature( + seqid="s3", + biotype="gene", + name="includes-s3-gap", + spans=[(22, 27)], + on_alignment=False, + ) + seqs.annotation_db = annot_db + return seqs -def make_records(start, end): +def make_records(start, end, block_id): aln = small_seqs()[start:end] records = [] species = aln.info.species @@ -39,7 +60,7 @@ def make_records(start, end): record = AlignRecordType( source="blah", species=species[seq.name], - block_id=0, + block_id=block_id, coord_name=seq.name, start=seq.map.start, end=seq.map.end, @@ -52,7 +73,7 @@ def make_records(start, end): @pytest.fixture def small_records(): - records = make_records(1, 5) + records = make_records(1, 5, 0) return records @@ -121,19 +142,19 @@ def genomedbs_aligndb(small_records): def test_building_alignment(genomedbs_aligndb): genomes, align_db = genomedbs_aligndb - got = get_alignment(align_db, genomes, species="mouse", coord_name="s2") + got = list(get_alignment(align_db, genomes, species="mouse", coord_name="s2"))[0] orig = small_seqs()[1:5] assert got.to_dict() == orig.to_dict() @pytest.mark.parametrize( "kwargs", - (dict(species="dodo", coord_name="s2"), dict(species="mouse", coord_name="s222")), + (dict(species="dodo", coord_name="s2"),), ) def test_building_alignment_invalid_details(genomedbs_aligndb, kwargs): genomes, align_db = genomedbs_aligndb with pytest.raises(ValueError): - get_alignment(align_db, genomes, **kwargs) + list(get_alignment(align_db, genomes, **kwargs)) @pytest.mark.parametrize( @@ -237,14 +258,14 @@ def test_variant_slices(data, slice): assert (orig == gaps.gaps).all() -def make_sample(): - seqs = small_seqs() +def make_sample(two_aligns=False): + aln = small_seqs() # we will reverse complement the s2 genome compared to the original # this means our coordinates for alignment records from that genome # also need to be rc'ed - species = seqs.info.species + species = aln.info.species genomes = {} - for seq in seqs.seqs: + for seq in aln.seqs: name = seq.name seq = seq.data.degap() if seq.name == "s2": @@ -254,13 +275,34 @@ def make_sample(): genome.add_records(records=[(name, str(seq))]) genomes[species[name]] = genome - # define a slice - start, end = 1, 12 - align_records = make_records(start, end) - # identify the rc segment for s2, which will be reverse complemented - # relative to "genome" - s2 = seqs.get_gapped_seq("s2") - selected = s2[start:end].rc().degap() + # make annotation db's + annot_dbs = {} + for name in aln.names: + feature_db = aln.annotation_db.subset(seqid=name) + annot_dbs[species[name]] = feature_db + + # define two alignment blocks that incorporate features + align_records = _update_records(s2_genome, aln, 0, 1, 12) + if two_aligns: + align_records += _update_records(s2_genome, aln, 1, 22, 30) + align_db = AlignDb(source=":memory:") + align_db.add_records(records=align_records) + + return genomes, align_db + + +def _update_records(s2_genome, aln, block_id, start, end): + # start, end are the coordinates used to slice the alignment + align_records = make_records(start, end, block_id) + # in the alignment, s2 is in reverse complement relative to its genome + # In order to be sure what "genome" coordinates are for s2, we first slice + # the alignment + aln = aln[start:end] + # then get the ungapped sequence + s2 = aln.get_seq("s2") + # and reverse complement it ... + selected = s2.rc() + # so we can get the genome coordinates for this segment on the s2 genome start = s2_genome.find(str(selected)) end = start + len(selected) for record in align_records: @@ -269,20 +311,98 @@ def make_sample(): record["end"] = end record["strand"] = "-" break - align_db = AlignDb(source=":memory:") - align_db.add_records(records=align_records) - - return genomes, align_db + return align_records -def test_select_alignment_rc(): - expect = small_seqs()[1:12] +@pytest.mark.parametrize( + "start_end", + ( + (None, None), + (None, 11), + (3, None), + (3, 13), + ), +) +@pytest.mark.parametrize( + "species_coord", + ( + ("human", "s1"), + ("dog", "s3"), + ), +) +def test_select_alignment_plus_strand(species_coord, start_end): + species, coord_name = species_coord + start, end = start_end + aln = small_seqs() + expect = aln[max(1, start or 1) : min(end or 12, 12)] # one sequence is stored in reverse complement genomes, align_db = make_sample() - got = get_alignment( - align_db=align_db, genomes=genomes, species="human", coord_name="s1" + got = list( + get_alignment( + align_db=align_db, + genomes=genomes, + species=species, + coord_name=coord_name, + start=start, + end=end, + ) ) - assert got.to_dict() == expect.to_dict() + assert len(got) == 1 + assert got[0].to_dict() == expect.to_dict() + + +@pytest.mark.parametrize( + "start_end", + ( + (None, None), + (None, 5), + (2, None), + (2, 7), + ), +) +def test_select_alignment_minus_strand(start_end): + species, coord_name = "mouse", "s2" + start, end = start_end + aln = small_seqs() + ft = aln.add_feature( + biotype="custom", + name="selected", + seqid="s2", + on_alignment=False, + spans=[(max(1, start or 0), min(end or 12, 12))], + ) + expect = aln[ft.map.start : min(ft.map.end, 12)] + + # mouse sequence is on minus strand, so need to adjust + # coordinates for query + s2 = aln.get_seq("s2") + s2_ft = list(s2.get_features(name="selected"))[0] + if not any([start is None, end is None]): + start = len(s2) - s2_ft.map.end + end = len(s2) - s2_ft.map.start + elif start == None != end: # noqa E711 + start = len(s2) - s2_ft.map.end + end = None + elif start != None == end: # noqa E711 + end = len(s2) - s2_ft.map.start + start = None + + # mouse sequence is on minus strand, so need to adjust + # coordinates for query + + genomes, align_db = make_sample(two_aligns=False) + got = list( + get_alignment( + align_db=align_db, + genomes=genomes, + species=species, + coord_name=coord_name, + start=start, + end=end, + ) + ) + assert len(got) == 1 + assert got[0].to_dict() == expect.to_dict() @pytest.mark.parametrize(