From 370c08a2324d0f39b31324b5614cd8c432e123c2 Mon Sep 17 00:00:00 2001 From: tcezard Date: Tue, 2 Jul 2024 16:52:11 +0100 Subject: [PATCH] New table to store aggregate per taxonomy and assembly for the release --- .../gather_release_counts.py | 68 ++++++++++++++++--- .../release_count_models.py | 25 +++++++ .../publish_release_files_to_ftp.py | 1 + 3 files changed, 86 insertions(+), 8 deletions(-) diff --git a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py index 30ffe11f8..11cb6662a 100644 --- a/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py +++ b/eva-accession-release-automation/gather_clustering_counts/gather_release_counts.py @@ -17,7 +17,7 @@ from sqlalchemy.orm import Session from gather_clustering_counts.release_count_models import RSCountCategory, RSCount, get_sql_alchemy_engine, \ - RSCountPerTaxonomy, RSCountPerAssembly + RSCountPerTaxonomy, RSCountPerAssembly, RSCountPerTaxonomyAssembly logger = logging_config.get_logger(__name__) @@ -342,6 +342,48 @@ def _write_per_assembly_counts(self, session): setattr(assembly_row, self._type_to_column(rs_type, is_new=True), count_new) session.add(assembly_row) + def _write_per_taxonomy_and_assembly_counts(self, session): + """Load the aggregated count per assembly (assume previous version of the release was loaded already)""" + species_assembly_counts, species_assembly_annotations = self.generate_per_taxonomy_and_assembly_counts() + for taxonomy, assembly in species_assembly_counts: + query = select(RSCountPerTaxonomyAssembly).where( + RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy, + RSCountPerTaxonomyAssembly.assembly_accession == assembly, + RSCountPerTaxonomyAssembly.release_version == self.release_version + ) + result = session.execute(query).fetchone() + if result: + taxonomy_assembly_row = result.RSCountPerAssembly + else: + self.info(f"Create persistence for aggregate per assembly {assembly}") + taxonomy_assembly_row = RSCountPerAssembly( + taxonomy_id=taxonomy, + assembly_accession=assembly, + release_folder=species_assembly_annotations.get((taxonomy, assembly)).get('release_folder'), + release_version=self.release_version, + ) + # Retrieve the count for the previous release + query = select(RSCountPerTaxonomyAssembly).where( + RSCountPerTaxonomyAssembly.taxonomy_id == taxonomy, + RSCountPerTaxonomyAssembly.assembly_accession == assembly, + RSCountPerTaxonomyAssembly.release_version == self.release_version - 1 + ) + result = session.execute(query).fetchone() + if result: + prev_count_for_taxonomy_assembly = result.RSCountPerTaxonomyAssembly + else: + prev_count_for_taxonomy_assembly = None + for rs_type in species_assembly_counts.get((taxonomy, assembly)): + if prev_count_for_taxonomy_assembly: + count_new = species_assembly_counts.get((taxonomy, assembly)).get(rs_type) - \ + getattr(prev_count_for_taxonomy_assembly, self._type_to_column(rs_type)) + else: + count_new = 0 + setattr(taxonomy_assembly_row, self._type_to_column(rs_type), + species_assembly_counts.get((taxonomy, assembly)).get(rs_type)) + setattr(taxonomy_assembly_row, self._type_to_column(rs_type, is_new=True), count_new) + session.add(taxonomy_assembly_row) + def write_counts_to_db(self): """ For all the counts gathered in this self.all_counts_grouped, write them to the db if they do not exist already. @@ -352,6 +394,7 @@ def write_counts_to_db(self): self._write_exploded_counts(session) self._write_per_taxonomy_counts(session) self._write_per_assembly_counts(session) + self._write_per_taxonomy_and_assembly_counts(session) def get_assembly_counts_from_database(self): """ @@ -400,6 +443,7 @@ def generate_per_taxonomy_counts(self): species_annotations = defaultdict(dict) for count_groups in self.all_counts_grouped: taxonomy_and_types = set([(count_dict['taxonomy'], count_dict['idtype']) for count_dict in count_groups]) + release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder'] for count_dict in count_groups)) for taxonomy, rstype in taxonomy_and_types: if taxonomy not in species_annotations: species_annotations[taxonomy] = {'assemblies': set(), 'release_folder': None} @@ -412,7 +456,7 @@ def generate_per_taxonomy_counts(self): if count_dict['taxonomy'] is taxonomy and count_dict['idtype'] is rstype ]) ) - species_annotations[taxonomy]['release_folder'] = count_groups[0]['release_folder'] + species_annotations[taxonomy]['release_folder'] = release_folder_map[taxonomy] return species_counts, species_annotations def generate_per_assembly_counts(self): @@ -420,6 +464,7 @@ def generate_per_assembly_counts(self): assembly_annotations = {} for count_groups in self.all_counts_grouped: assembly_and_types = set([(count_dict['assembly'], count_dict['idtype']) for count_dict in count_groups]) + release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder'] for count_dict in count_groups)) for assembly_accession, rstype in assembly_and_types: if assembly_accession not in assembly_annotations: assembly_annotations[assembly_accession] = {'taxonomies': set(), 'release_folder': None} @@ -435,12 +480,19 @@ def generate_per_assembly_counts(self): assembly_annotations[assembly_accession]['release_folder'] = assembly_accession return assembly_counts, assembly_annotations - # def generate_per_species_assembly_counts(self): - # species_assembly_counts = defaultdict(Counter) - # for count_groups in self.all_counts_grouped: - # for count_dict in count_groups: - # species_assembly_counts[count_dict['assembly'] + '\t' + count_dict['assembly']][count_dict['idtype'] + '_rs'] += count_dict['count'] - # return species_assembly_counts + def generate_per_taxonomy_and_assembly_counts(self): + species_assembly_counts = defaultdict(Counter) + species_assembly_annotations = defaultdict(dict) + for count_groups in self.all_counts_grouped: + taxonomy_assembly_and_types = set([(count_dict['taxonomy'], count_dict['assembly'], count_dict['idtype']) for count_dict in count_groups]) + release_folder_map = dict((count_dict['taxonomy'], count_dict['release_folder'] for count_dict in count_groups)) + for taxonomy, assembly, rstype in taxonomy_assembly_and_types: + if (taxonomy, assembly) not in species_assembly_annotations: + species_assembly_annotations[(taxonomy, assembly)] = {'release_folder': None} + # All count_dict have the same count in a group + species_assembly_counts[(taxonomy, assembly)][rstype] += count_groups[0]['count'] + species_assembly_annotations[(taxonomy, assembly)]['release_folder'] = release_folder_map[taxonomy] + '/' + assembly + return species_assembly_counts, species_assembly_annotations def detect_inconsistent_types(self): inconsistent_types = [] diff --git a/eva-accession-release-automation/gather_clustering_counts/release_count_models.py b/eva-accession-release-automation/gather_clustering_counts/release_count_models.py index b38f242f2..45eed2e62 100644 --- a/eva-accession-release-automation/gather_clustering_counts/release_count_models.py +++ b/eva-accession-release-automation/gather_clustering_counts/release_count_models.py @@ -89,6 +89,31 @@ class RSCountPerAssembly(Base): schema = 'eva_stats' +class RSCountPerTaxonomyAssembly(Base): + """ + Table that provide the aggregated count per taxonomy and assembly + """ + __tablename__ = 'release_rs_count_per_taxonomy_assembly' + + taxonomy_id = Column(Integer, primary_key=True) + assembly_accession = Column(String, primary_key=True) + release_version = Column(Integer, primary_key=True) + release_folder = Column(String) + current_rs = Column(BigInteger, default=0) + multimap_rs = Column(BigInteger, default=0) + merged_rs = Column(BigInteger, default=0) + deprecated_rs = Column(BigInteger, default=0) + merged_deprecated_rs = Column(BigInteger, default=0) + unmapped_rs = Column(BigInteger, default=0) + new_current_rs = Column(BigInteger, default=0) + new_multimap_rs = Column(BigInteger, default=0) + new_merged_rs = Column(BigInteger, default=0) + new_deprecated_rs = Column(BigInteger, default=0) + new_merged_deprecated_rs = Column(BigInteger, default=0) + new_unmapped_rs = Column(BigInteger, default=0) + schema = 'eva_stats' + + def get_sql_alchemy_engine(dbtype, username, password, host_url, database, port): engine = create_engine(URL.create( dbtype + '+psycopg2', diff --git a/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py b/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py index 63b224380..22ee45839 100644 --- a/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py +++ b/eva-accession-release-automation/publish_release_to_ftp/publish_release_files_to_ftp.py @@ -27,6 +27,7 @@ from ebi_eva_common_pyutils.logger import logging_config from ebi_eva_internal_pyutils.metadata_utils import get_metadata_connection_handle from ebi_eva_internal_pyutils.pg_utils import get_all_results_for_query + from run_release_in_embassy.release_common_utils import get_release_folder_name from run_release_in_embassy.run_release_for_species import load_config, get_release_folder from run_release_in_embassy.release_metadata import release_vcf_file_categories, release_text_file_categories