Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/ensembl/production/metadata/grpc/adaptors/genome.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,15 @@ def fetch_genomes(self, genome_id=None, genome_uuid=None, genome_tag=None, organ

if allow_unreleased:
# fetch everything (released + unreleased)
pass
logger.info("ALLOW_UNRELEASED is set to True...")
elif unreleased_only:
# fetch unreleased only
# this filter will get all Genome entries where there's no associated GenomeRelease
# the tilde (~) symbol is used for negation.
genome_select = genome_select.filter(~Genome.genome_releases.any())
logger.info("Fetching only unreleased data...")
else:
# fetch released only
logger.info("Fetching released data only...")
# Check if genome is released
# TODO: why did I add this check?! -> removing this breaks the test_update tests
with self.metadata_db.session_scope() as session:
Expand All @@ -225,6 +226,7 @@ def fetch_genomes(self, genome_id=None, genome_uuid=None, genome_tag=None, organ
is_genome_released = session.execute(prep_query).first()

if is_genome_released:
logger.info(f"Genome UUID '{genome_uuid}' is released")
# Include release related info if released_only is True
genome_select = genome_select.add_columns(GenomeRelease, EnsemblRelease, EnsemblSite) \
.join(GenomeRelease, Genome.genome_id == GenomeRelease.genome_id) \
Expand All @@ -245,6 +247,10 @@ def fetch_genomes(self, genome_id=None, genome_uuid=None, genome_tag=None, organ
if release_type is not None:
genome_select = genome_select.filter(EnsemblRelease.release_type == release_type)

else:
logger.info(f"Genome UUID '{genome_uuid}' doesn't exist or it's not released yet.")
return []

logger.debug(genome_select)
with self.metadata_db.session_scope() as session:
session.expire_on_commit = False
Expand Down
38 changes: 37 additions & 1 deletion src/ensembl/production/metadata/grpc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,41 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import warnings


def parse_boolean_var(var):
"""
Parse an input variable into a boolean value.

This function interprets the input `var` and attempts to convert it into a boolean value (`True` or `False`).
It handles booleans and strings specifically, and defaults to `False` for other types with a warning.

Args:
var (bool|str|any): The variable to parse. This can be a boolean, a string, or any other type.
- If it's a boolean, it's returned as-is.
- If it's a string, it's considered `True` unless it's 'f', 'false', 'no', 'none', '0'
or 'n' (case-insensitive), or it's an empty string.
- For other types, a warning is issued, and `False` is returned.

Returns:
bool: The parsed boolean value. Returns `True` or `False` based on the input:
- `True` if `var` is `True`, a non-falsy string not equal to 'f', 'false', 'no', 'none', '0', or 'n'.
- `False` if `var` is `False`, a string equal to 'f', 'false', 'no', 'none', '0', or 'n', any non-string and
non-boolean input, or an empty string.

Raises:
Warning: If `var` is not a boolean or a string, a warning is raised indicating the input
couldn't be parsed to a boolean.
"""
if isinstance(var, bool):
return var
elif isinstance(var, str):
return not ((var.lower() in ("f", "false", "no", "none", "0", "n")) or (not var))
else:
# default to false, something is wrong.
warnings.warn(f"Var {var} couldn't be parsed to boolean")
return False


class MetadataConfig:
Expand All @@ -18,4 +53,5 @@ class MetadataConfig:
pool_size = os.environ.get("POOL_SIZE", 20)
max_overflow = os.environ.get("MAX_OVERFLOW", 0)
pool_recycle = os.environ.get("POOL_RECYCLE", 50)
allow_unreleased = os.environ.get("ALLOW_UNRELEASED", False)
allow_unreleased = parse_boolean_var(os.environ.get("ALLOW_UNRELEASED", False))
debug_mode = parse_boolean_var(os.environ.get("DEBUG", False))
21 changes: 19 additions & 2 deletions src/ensembl/production/metadata/grpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@
import grpc
import logging

from ensembl.production.metadata.grpc.config import MetadataConfig as cfg
from ensembl.production.metadata.grpc import ensembl_metadata_pb2_grpc
from ensembl.production.metadata.grpc.servicer import EnsemblMetadataServicer

logger = logging.getLogger(__name__)

# Determine the logging level based on the value of cfg.debug_mode
log_level = logging.DEBUG if cfg.debug_mode else logging.WARNING

logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Expand All @@ -24,9 +35,15 @@ def serve():
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
try:
server.wait_for_termination()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt caught, stopping the server...")
server.stop(grace=0) # Immediately stop the server
logger.info("gRPC server has shut down gracefully")


if __name__ == "__main__":
logging.basicConfig()
logger.info("gRPC server starting on port 50051...")
logger.info(f"DEBUG: {cfg.debug_mode}")
serve()
24 changes: 24 additions & 0 deletions src/ensembl/production/metadata/grpc/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,96 +9,120 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from ensembl.production.metadata.grpc import ensembl_metadata_pb2_grpc

import ensembl.production.metadata.grpc.utils as utils

logger = logging.getLogger(__name__)


class EnsemblMetadataServicer(ensembl_metadata_pb2_grpc.EnsemblMetadataServicer):
def __init__(self):
self.db = utils.connect_to_db()

def GetSpeciesInformation(self, request, context):
logger.debug(f"Received RPC for GetSpeciesInformation with request: {request}")
return utils.get_species_information(self.db, request.genome_uuid)

def GetAssemblyInformation(self, request, context):
logger.debug(f"Received RPC for GetAssemblyInformation with request: {request}")
return utils.get_assembly_information(self.db, request.assembly_uuid)

def GetGenomesByAssemblyAccessionID(self, request, context):
logger.debug(f"Received RPC for GetGenomesByAssemblyAccessionID with request: {request}")
return utils.get_genomes_from_assembly_accession_iterator(
self.db, request.assembly_accession, request.release_version
)

def GetSubSpeciesInformation(self, request, context):
logger.debug(f"Received RPC for GetSubSpeciesInformation with request: {request}")
return utils.get_sub_species_info(self.db, request.organism_uuid, request.group)

def GetTopLevelStatistics(self, request, context):
logger.debug(f"Received RPC for GetTopLevelStatistics with request: {request}")
return utils.get_top_level_statistics(self.db, request.organism_uuid, request.group)

def GetTopLevelStatisticsByUUID(self, request, context):
logger.debug(f"Received RPC for GetTopLevelStatisticsByUUID with request: {request}")
return utils.get_top_level_statistics_by_uuid(self.db, request.genome_uuid)

def GetGenomeUUID(self, request, context):
logger.debug(f"Received RPC for GetGenomeUUID with request: {request}")
return utils.get_genome_uuid(self.db, request.production_name, request.assembly_name, request.use_default)

def GetGenomeByUUID(self, request, context):
logger.debug(f"Received RPC for GetGenomeByUUID with request: {request}")
return utils.get_genome_by_uuid(self.db, request.genome_uuid, request.release_version)

def GetGenomesByKeyword(self, request, context):
logger.debug(f"Received RPC for GetGenomesByKeyword with request: {request}")
return utils.get_genomes_by_keyword_iterator(
self.db, request.keyword, request.release_version
)

def GetGenomeByName(self, request, context):
logger.debug(f"Received RPC for GetGenomeByName with request: {request}")
return utils.get_genome_by_name(
self.db, request.ensembl_name, request.site_name, request.release_version
)

def GetRelease(self, request, context):
logger.debug(f"Received RPC for GetRelease with request: {request}")
return utils.release_iterator(
self.db, request.site_name, request.release_version, request.current_only
)

def GetReleaseByUUID(self, request, context):
logger.debug(f"Received RPC for GetReleaseByUUID with request: {request}")
return utils.release_by_uuid_iterator(self.db, request.genome_uuid)

def GetGenomeSequence(self, request, context):
logger.debug(f"Received RPC for GetGenomeSequence with request: {request}")
return utils.genome_sequence_iterator(
self.db, request.genome_uuid, request.chromosomal_only
)

def GetAssemblyRegion(self, request, context):
logger.debug(f"Received RPC for GetAssemblyRegion with request: {request}")
return utils.assembly_region_iterator(
self.db, request.genome_uuid, request.chromosomal_only
)

def GetGenomeAssemblySequenceRegion(self, request, context):
logger.debug(f"Received RPC for GetGenomeAssemblySequenceRegion with request: {request}")
return utils.genome_assembly_sequence_region(
self.db, request.genome_uuid, request.sequence_region_name
)

def GetDatasetsListByUUID(self, request, context):
logger.debug(f"Received RPC for GetDatasetsListByUUID with request: {request}")
return utils.get_datasets_list_by_uuid(
self.db, request.genome_uuid, request.release_version
)

def GetDatasetInformation(self, request, context):
logger.debug(f"Received RPC for GetDatasetInformation with request: {request}")
return utils.get_dataset_by_genome_and_dataset_type(
self.db, request.genome_uuid, request.dataset_type
)

def GetOrganismsGroupCount(self, request, context):
logger.debug(f"Received RPC for GetOrganismsGroupCount with request: {request}")
return utils.get_organisms_group_count(
self.db, request.release_version
)

def GetGenomeUUIDByTag(self, request, context):
logger.debug(f"Received RPC for GetGenomeUUIDByTag with request: {request}")
return utils.get_genome_uuid_by_tag(self.db, request.genome_tag)

def GetFTPLinks(self, request, context):
return utils.get_ftp_links(self.db, request.genome_uuid, request.dataset_type, request.release_version)

def GetReleaseVersionByUUID(self, request, context):
logger.debug(f"Received RPC for GetReleaseVersionByUUID with request: {request}")
return utils.get_release_version_by_uuid(
self.db, request.genome_uuid, request.dataset_type, request.release_version
)
Loading