Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix release folder assignation and add table to store aggregate per taxonomy and assembly #458

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sqlalchemy.orm import Session

from gather_clustering_counts.release_count_models import RSCountCategory, RSCount, get_sql_alchemy_engine, \
RSCountPerTaxonomy, RSCountPerAssembly
RSCountPerTaxonomy, RSCountPerAssembly, RSCountPerTaxonomyAssembly

logger = logging_config.get_logger(__name__)

Expand Down Expand Up @@ -275,6 +275,7 @@ def _write_per_taxonomy_counts(self, session):
result = session.execute(query).fetchone()
if result:
taxonomy_row = result.RSCountPerTaxonomy
self.info(f"Update counts for aggregate per taxonomy {taxonomy_id}")
else:
self.info(f"Create persistence for aggregate per taxonomy {taxonomy_id}")
taxonomy_row = RSCountPerTaxonomy(
Expand All @@ -294,11 +295,11 @@ def _write_per_taxonomy_counts(self, session):
else:
prev_count_for_taxonomy = None
for rs_type in taxonomy_counts.get(taxonomy_id):
count_prev = 0
if prev_count_for_taxonomy:
count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - getattr(prev_count_for_taxonomy,
self._type_to_column(rs_type))
else:
count_new = 0
count_prev = getattr(prev_count_for_taxonomy, self._type_to_column(rs_type))

count_new = taxonomy_counts.get(taxonomy_id).get(rs_type) - count_prev
setattr(taxonomy_row, self._type_to_column(rs_type), taxonomy_counts.get(taxonomy_id).get(rs_type))
setattr(taxonomy_row, self._type_to_column(rs_type, is_new=True), count_new)
session.add(taxonomy_row)
Expand All @@ -314,6 +315,7 @@ def _write_per_assembly_counts(self, session):
result = session.execute(query).fetchone()
if result:
assembly_row = result.RSCountPerAssembly
self.info(f"Update counts for aggregate per assembly {assembly}")
else:
self.info(f"Create persistence for aggregate per assembly {assembly}")
assembly_row = RSCountPerAssembly(
Expand All @@ -333,15 +335,57 @@ def _write_per_assembly_counts(self, session):
else:
prev_count_for_assembly = None
for rs_type in assembly_counts.get(assembly):
count_prev = 0
if prev_count_for_assembly:
count_new = assembly_counts.get(assembly).get(rs_type) - getattr(prev_count_for_assembly,
self._type_to_column(rs_type))
else:
count_new = 0
count_prev = getattr(prev_count_for_assembly, self._type_to_column(rs_type))
count_new = assembly_counts.get(assembly).get(rs_type) -count_prev
setattr(assembly_row, self._type_to_column(rs_type), assembly_counts.get(assembly).get(rs_type))
setattr(assembly_row, self._type_to_column(rs_type, is_new=True), count_new)
session.add(assembly_row)

def _write_per_taxonomy_and_assembly_counts(self, session):
"""Load the aggregated count per assembly (assume previous version of the release was loaded already)"""
species_assembly_counts, species_assembly_annotations = self.generate_per_taxonomy_and_assembly_counts()
for taxonomy, assembly in species_assembly_counts:
query = select(RSCountPerTaxonomyAssembly).where(
RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy,
RSCountPerTaxonomyAssembly.assembly_accession == assembly,
RSCountPerTaxonomyAssembly.release_version == self.release_version
)
result = session.execute(query).fetchone()
if result:
taxonomy_assembly_row = result.RSCountPerTaxonomyAssembly
self.info(f"Update counts for aggregate per taxonomy {taxonomy} and assembly {assembly}")
else:
self.info(f"Create persistence for aggregate per taxonomy {taxonomy} and assembly {assembly}")
taxonomy_assembly_row = RSCountPerTaxonomyAssembly(
taxonomy_id=taxonomy,
assembly_accession=assembly,
release_folder=species_assembly_annotations.get((taxonomy, assembly)).get('release_folder'),
release_version=self.release_version,
)
# Retrieve the count for the previous release
query = select(RSCountPerTaxonomyAssembly).where(
RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy,
RSCountPerTaxonomyAssembly.assembly_accession == assembly,
RSCountPerTaxonomyAssembly.release_version == self.release_version - 1
)
result = session.execute(query).fetchone()
if result:
prev_count_for_taxonomy_assembly = result.RSCountPerTaxonomyAssembly
else:
prev_count_for_taxonomy_assembly = None
for rs_type in species_assembly_counts.get((taxonomy, assembly)):
count_prev = 0
if prev_count_for_taxonomy_assembly:
count_prev = getattr(prev_count_for_taxonomy_assembly, self._type_to_column(rs_type))

count_new = species_assembly_counts.get((taxonomy, assembly)).get(rs_type) - count_prev
setattr(taxonomy_assembly_row, self._type_to_column(rs_type),
species_assembly_counts.get((taxonomy, assembly)).get(rs_type))
setattr(taxonomy_assembly_row, self._type_to_column(rs_type, is_new=True), count_new)
session.add(taxonomy_assembly_row)

def write_counts_to_db(self):
"""
For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already.
Expand All @@ -352,6 +396,7 @@ def write_counts_to_db(self):
self._write_exploded_counts(session)
self._write_per_taxonomy_counts(session)
self._write_per_assembly_counts(session)
self._write_per_taxonomy_and_assembly_counts(session)

def get_assembly_counts_from_database(self):
"""
Expand Down Expand Up @@ -400,6 +445,7 @@ def generate_per_taxonomy_counts(self):
species_annotations = defaultdict(dict)
for count_groups in self.all_counts_grouped:
taxonomy_and_types = set([(count_dict['taxonomy'], count_dict['idtype']) for count_dict in count_groups])
release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder']) for count_dict in count_groups)
for taxonomy, rstype in taxonomy_and_types:
if taxonomy not in species_annotations:
species_annotations[taxonomy] = {'assemblies': set(), 'release_folder': None}
Expand All @@ -412,7 +458,7 @@ def generate_per_taxonomy_counts(self):
if count_dict['taxonomy'] is taxonomy and count_dict['idtype'] is rstype
])
)
species_annotations[taxonomy]['release_folder'] = count_groups[0]['release_folder']
species_annotations[taxonomy]['release_folder'] = release_folder_map[taxonomy]
return species_counts, species_annotations

def generate_per_assembly_counts(self):
Expand All @@ -431,16 +477,26 @@ def generate_per_assembly_counts(self):
for count_dict in count_groups
if count_dict['assembly'] is assembly_accession and count_dict['idtype'] is rstype
]))

assembly_annotations[assembly_accession]['release_folder'] = assembly_accession
return assembly_counts, assembly_annotations

# def generate_per_species_assembly_counts(self):
# species_assembly_counts = defaultdict(Counter)
# for count_groups in self.all_counts_grouped:
# for count_dict in count_groups:
# species_assembly_counts[count_dict['assembly'] + '\t' + count_dict['assembly']][count_dict['idtype'] + '_rs'] += count_dict['count']
# return species_assembly_counts
def generate_per_taxonomy_and_assembly_counts(self):
species_assembly_counts = defaultdict(Counter)
species_assembly_annotations = defaultdict(dict)
for count_groups in self.all_counts_grouped:
taxonomy_assembly_and_types = set([(count_dict['taxonomy'], count_dict['assembly'], count_dict['idtype']) for count_dict in count_groups])
release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder']) for count_dict in count_groups)
for taxonomy, assembly, rstype in taxonomy_assembly_and_types:
if (taxonomy, assembly) not in species_assembly_annotations:
species_assembly_annotations[(taxonomy, assembly)] = {'release_folder': None}
# All count_dict have the same count in a group
species_assembly_counts[(taxonomy, assembly)][rstype] += count_groups[0]['count']
if assembly != 'Unmapped':
species_assembly_annotations[(taxonomy, assembly)]['release_folder'] = \
release_folder_map[taxonomy] + '/' + assembly
else:
species_assembly_annotations[(taxonomy, assembly)]['release_folder'] = release_folder_map[taxonomy]
return species_assembly_counts, species_assembly_annotations

def detect_inconsistent_types(self):
inconsistent_types = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ class RSCountPerAssembly(Base):
schema = 'eva_stats'


class RSCountPerTaxonomyAssembly(Base):
"""
Table that provide the aggregated count per taxonomy and assembly
"""
__tablename__ = 'release_rs_count_per_taxonomy_assembly'

taxonomy_id = Column(Integer, primary_key=True)
assembly_accession = Column(String, primary_key=True)
release_version = Column(Integer, primary_key=True)
release_folder = Column(String)
current_rs = Column(BigInteger, default=0)
multimap_rs = Column(BigInteger, default=0)
merged_rs = Column(BigInteger, default=0)
deprecated_rs = Column(BigInteger, default=0)
merged_deprecated_rs = Column(BigInteger, default=0)
unmapped_rs = Column(BigInteger, default=0)
new_current_rs = Column(BigInteger, default=0)
new_multimap_rs = Column(BigInteger, default=0)
new_merged_rs = Column(BigInteger, default=0)
new_deprecated_rs = Column(BigInteger, default=0)
new_merged_deprecated_rs = Column(BigInteger, default=0)
new_unmapped_rs = Column(BigInteger, default=0)
schema = 'eva_stats'


def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port):
engine = create_engine(URL.create(
dbtype + '+psycopg2',
Expand All @@ -104,6 +129,7 @@ def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port)
RSCountCategory.__table__.create(bind=engine, checkfirst=True)
RSCountPerAssembly.__table__.create(bind=engine, checkfirst=True)
RSCountPerTaxonomy.__table__.create(bind=engine, checkfirst=True)
RSCountPerTaxonomyAssembly.__table__.create(bind=engine, checkfirst=True)
return engine


Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import os
from itertools import cycle
from unittest import TestCase
from unittest.mock import patch

from ebi_eva_common_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_common_pyutils.pg_utils import execute_query
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_internal_pyutils.pg_utils import execute_query
from sqlalchemy import select
from sqlalchemy.orm import Session

from gather_clustering_counts.gather_release_counts import find_link, ReleaseCounter
from gather_clustering_counts.release_count_models import RSCountPerTaxonomy, RSCountPerAssembly, RSCountCategory, \
RSCount
RSCount, RSCountPerTaxonomyAssembly


def test_find_links():
Expand All @@ -35,18 +34,22 @@ def test_find_links():
assert find_link({'E'}, d1, d2) == (frozenset({'E'}), frozenset({}))



class TestReleaseCounter(TestCase):

resource_folder = os.path.dirname(__file__)

def setUp(self):
self.private_config_xml_file = os.path.join(self.resource_folder, 'config_xml_file.xml')
self.config_profile = "localhost"

def tearDown(self):
with get_metadata_connection_handle(self.config_profile, self.private_config_xml_file) as db_conn:
for sqlalchemy_class in [RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly]:
query = f'DROP TABLE {sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}'
execute_query(db_conn, query)
table_names = ','.join(f'{sqlalchemy_class.schema}.{sqlalchemy_class.__tablename__}' for sqlalchemy_class in
[RSCountCategory, RSCount, RSCountPerTaxonomy, RSCountPerAssembly,
RSCountPerTaxonomyAssembly])
query = f'DROP TABLE IF EXISTS {table_names};'
print(query)
execute_query(db_conn, query)

def test_write_counts_to_db(self):
"""This test require a postgres database running on localhost. See config_xml_file.xml for detail."""
Expand Down Expand Up @@ -93,7 +96,19 @@ def test_write_counts_to_db(self):
assert rs_assembly_count.new_current_rs == 0
assert rs_assembly_count.release_folder == 'GCA_000003205.6'

query = select(RSCountPerTaxonomyAssembly).where(
RSCountPerTaxonomyAssembly.assembly_accession == 'GCA_000003205.6',
RSCountPerTaxonomyAssembly.taxonomy_id == 9913,
RSCountPerTaxonomyAssembly.release_version == 1
)
result = session.execute(query).fetchone()
rs_count_per_taxonomy_assembly = result.RSCountPerTaxonomyAssembly
assert rs_count_per_taxonomy_assembly.current_rs == 61038394
assert rs_count_per_taxonomy_assembly.new_current_rs == 0
assert rs_count_per_taxonomy_assembly.release_folder == 'Cow_9913/GCA_000003205.6'

def test_write_counts_to_db2(self):
"""This test require a postgres database running on localhost. See config_xml_file.xml for detail."""
log_files_release = [os.path.join(self.resource_folder, 'count_for_haplochromini_oreochromis_niloticus.log')]
folder_to_taxonomy = {'oreochromis_niloticus': 8128, 'haplochromini': 319058}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ebi_eva_common_pyutils.logger import logging_config
from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle
from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query

from run_release_in_embassy.release_common_utils import get_release_folder_name
from run_release_in_embassy.run_release_for_species import load_config, get_release_folder
from run_release_in_embassy.release_metadata import release_vcf_file_categories, release_text_file_categories
Expand Down
Loading