Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions src/ensembl/production/metadata/api/factories/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from ensembl.production.metadata.api.exceptions import *
from ensembl.production.metadata.api.models import Dataset, Genome, GenomeDataset, \
DatasetType
DatasetType, DatasetStatus
from ensembl.production.metadata.updater.updater_utils import update_attributes


Expand All @@ -33,6 +33,9 @@ def create_all_child_datasets(self, session, dataset_uuid):
def create_dataset(self, session, genome_input, dataset_source, dataset_type, dataset_attributes, name, label,
version, status="Submitted"):
# Check if genome_input is a UUID (string) or a Genome object
if isinstance(status, str):
status = DatasetStatus(status)

if isinstance(genome_input, str):
genome = session.query(Genome).filter(Genome.genome_uuid == genome_input).one()
elif isinstance(genome_input, Genome):
Expand Down Expand Up @@ -82,6 +85,8 @@ def get_parent_datasets(self, dataset_uuid, **kwargs):
raise DatasetFactoryException("session or metadata_uri are required")

def update_dataset_status(self, dataset_uuid, status, **kwargs):
if isinstance(status, str):
status = DatasetStatus(status)
updated_datasets = [(dataset_uuid, status)]
session = kwargs.get('session')
metadata_uri = kwargs.get('metadata_uri')
Expand All @@ -101,7 +106,7 @@ def update_dataset_status(self, dataset_uuid, status, **kwargs):
return updated_datasets

def update_dataset_attributes(self, dataset_uuid, attribute_dict, **kwargs):
#TODO ADD DELETE opiton to kwargs to redo dataset_attributes.
# TODO ADD DELETE opiton to kwargs to redo dataset_attributes.
session = kwargs.get('session')
metadata_uri = kwargs.get('metadata_uri')
if not isinstance(attribute_dict, dict):
Expand All @@ -118,6 +123,8 @@ def update_dataset_attributes(self, dataset_uuid, attribute_dict, **kwargs):
return dataset_attributes

def get_genomes_by_status_and_type(self, status, dataset_type, **kwargs):
if isinstance(status, str):
status = DatasetStatus(status)
session = kwargs.get('session')
metadata_uri = kwargs.get('metadata_uri')
if session:
Expand Down Expand Up @@ -170,7 +177,6 @@ def __create_child_datasets_recursive(self, session, parent_dataset):
child_dataset = self.__get_dataset(session, child_dataset_uuid)
self.__create_child_datasets_recursive(session, child_dataset)


def __query_parent_datasets(self, session, dataset_uuid):
dataset = self.__get_dataset(session, dataset_uuid)
dataset_type = session.query(DatasetType).filter(
Expand Down Expand Up @@ -266,49 +272,51 @@ def __update_status(self, session, dataset_uuid, status):
# Then convert all to released. #Add a blocker and warning in here.
current_dataset = session.query(Dataset).filter(Dataset.dataset_uuid == dataset_uuid).one()
updated_datasets = (dataset_uuid, current_dataset.status)
#if released
if status == "Submitted":
# if released
if isinstance(status, str):
status = DatasetStatus(status)
if status == DatasetStatus.SUBMITTED: #"Submitted":
# Update to SUBMITTED and all parents.
# Do not touch the children.
# This should only be called in times of strife and error.
current_dataset.status = "Submitted"
current_dataset.status = DatasetStatus.SUBMITTED # "Submitted"
parent_uuid, parent_status = self.__query_parent_datasets(session, dataset_uuid)
if parent_uuid is not None:
self.__update_status(session, parent_uuid, "Submitted")
self.__update_status(session, parent_uuid, DatasetStatus.SUBMITTED) # "Submitted")

elif status == "Processing":
elif status == DatasetStatus.PROCESSING: #"Processing":
# Update to PROCESSING and all parents.
# Do not touch the children.
if current_dataset.status == "Released": # and it is not top level.
if current_dataset.status == DatasetStatus.RELEASED: #"Released": # and it is not top level.
return updated_datasets
# Check the dependents
dependents = self.__query_depends_on(session, dataset_uuid)
for uuid, dep_status in dependents:
if dep_status not in ("Processed", "Released"):
if dep_status not in (DatasetStatus.PROCESSED, DatasetStatus.RELEASED): #("Processed", "Released"):
return updated_datasets
current_dataset.status = "Processing"
current_dataset.status = DatasetStatus.PROCESSING # "Processing"
parent_uuid, parent_status = self.__query_parent_datasets(session, dataset_uuid)
if parent_uuid is not None:
self.__update_status(session, parent_uuid, "Processing")
self.__update_status(session, parent_uuid, DatasetStatus.PROCESSING) #"Processing")

elif status == "Processed":
if current_dataset.status == "Released": #and it is not top level.
elif status == DatasetStatus.PROCESSED: #"Processed":
if current_dataset.status == DatasetStatus.RELEASED: # "Released": # and it is not top level.
return updated_datasets
# Get children
children_uuid = self.__query_child_datasets(session, dataset_uuid)
# Check to see if any are still processing or submitted
for child, child_status in children_uuid:
if child_status in ("Processing", "Submitted"):
if child_status in (DatasetStatus.PROCESSING, DatasetStatus.SUBMITTED): #("Processing", "Submitted"):
return updated_datasets
# Update current dataset if all the children are updated.
current_dataset.status = "Processed"
current_dataset.status = DatasetStatus.PROCESSED#"Processed"
# Check if parent needs to be updated
parent_uuid, parent_status = self.__query_parent_datasets(session, dataset_uuid)
if parent_uuid is not None:
self.__update_status(session, parent_uuid, "Processed")
self.__update_status(session, parent_uuid, DatasetStatus.PROCESSED) #"Processed")

elif status == "Released":
#TODO: Check that you are top level. Then check all children are ready to release.
elif status == DatasetStatus.RELEASED: #"Released":
# TODO: Check that you are top level. Then check all children are ready to release.
# Get current datasets chain top level.
top_level_uuid = self.__query_top_level_parent(session, dataset_uuid)
# Check that all children and sub children etc
Expand All @@ -320,15 +328,16 @@ def __update_status(self, session, dataset_uuid, status):

# Update if all datasets in it's chain are processed, all genebuild and assembly are processed. Else return error.
for child_uuid, child_status in top_level_children:
if child_status != "Released" and child_status != "Processed":
# if child_status != "Released" and child_status != "Processed":
if child_status not in (DatasetStatus.RELEASED, DatasetStatus.PROCESSED): #
child_dataset = session.query(Dataset).filter(Dataset.dataset_uuid == child_uuid).one()
raise DatasetFactoryException(
f"Dataset {child_uuid} is not released or processed. It is {child_status}")
top_level_children = self.__query_all_child_datasets(session, top_level_uuid)
for child_uuid, child_status in top_level_children:
child_dataset = session.query(Dataset).filter(Dataset.dataset_uuid == child_uuid).one()
child_dataset.status = "Released"
current_dataset.status = "Released"
child_dataset.status = DatasetStatus.RELEASED # "Released"
current_dataset.status = DatasetStatus.RELEASED # "Released"
else:
raise DatasetFactoryException(f"Dataset status: {status} is not a vallid status")
updated_datasets = (current_dataset.dataset_uuid, current_dataset.status)
Expand All @@ -342,6 +351,8 @@ def __query_genomes_by_status_and_type(self, session, status, dataset_type):
raise ValueError("Session is not provided")

# Filter by Dataset status and DatasetType name
if isinstance(status, str):
status = DatasetStatus(status)
query = session.query(
Genome.genome_uuid,
Genome.production_name,
Expand Down
20 changes: 18 additions & 2 deletions src/ensembl/production/metadata/api/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,35 @@
import datetime
import logging
import uuid
import enum

import sqlalchemy
from sqlalchemy import Column, Integer, String, Enum, text, ForeignKey, Index, JSON
from sqlalchemy.dialects.mysql import DATETIME
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy.types import Enum

from ensembl.production.metadata.api.exceptions import MissingMetaException
from ensembl.production.metadata.api.models.base import Base, LoadAble

logger = logging.getLogger(__name__)


class DatasetStatus(enum.Enum):
    """Lifecycle states of a :class:`Dataset`.

    Member values are the exact strings stored in the database
    (``DatasetStatusType`` maps to them via ``values_callable``), so the
    enum is interchangeable with the legacy string statuses, e.g.
    ``DatasetStatus("Submitted") is DatasetStatus.SUBMITTED``.
    """
    SUBMITTED = "Submitted"
    PROCESSING = "Processing"
    PROCESSED = "Processed"
    RELEASED = "Released"


# SQLAlchemy column type for Dataset.status backed by DatasetStatus.
# values_callable makes the DB-side enum use the member *values*
# ("Submitted", "Processing", ...) rather than the member names
# ("SUBMITTED", ...), keeping the schema compatible with rows written
# before the Python enum existed.
DatasetStatusType = sqlalchemy.types.Enum(
    DatasetStatus,
    name='dataset_status',
    values_callable=lambda obj: [e.value for e in obj]
)


class Attribute(LoadAble, Base):
__tablename__ = 'attribute'

Expand All @@ -36,7 +53,6 @@ class Attribute(LoadAble, Base):
# attribute_id within dataset attribute
dataset_attributes = relationship("DatasetAttribute", back_populates='attribute')
# many to one relationships
# none


class Dataset(LoadAble, Base):
Expand All @@ -50,7 +66,7 @@ class Dataset(LoadAble, Base):
created = Column(DATETIME(fsp=6), server_default=func.now(), default=datetime.datetime.utcnow)
dataset_source_id = Column(ForeignKey('dataset_source.dataset_source_id'), nullable=False, index=True)
label = Column(String(128), nullable=False)
status = Column(Enum('Submitted', 'Processing', 'Processed', 'Released'), server_default=text('Submitted'))
status = Column(DatasetStatusType, server_default=text('Submitted'))

# One to many relationships
# dataset_id to dataset attribute and genome dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,4 +887,6 @@
2518 679d6452-799c-4a2f-8906-0db6c639e498 regulatory_features 1.0 2023-11-15 15:07:12.410801 Regulatory Annotation 670 7 Submitted \N
2519 bee4dca6-894e-4017-8376-0b13675de93a regulatory_features 1.0 2023-11-15 15:07:13.156918 Regulatory Annotation 671 7 Submitted \N
2520 fc5d3e13-340c-4e2a-9f49-256fc319331e regulatory_features 1.0 2023-11-15 15:07:13.825117 Regulatory Annotation 672 7 Submitted \N
\N
2522 747e14d7-4b82-4190-9899-d196e489ee18 xrefs \N 2024-03-12 09:44:26.000000 Child of genebuild 1 8 Processed \N
2523 fd45b1cf-a948-4556-9ce6-40c65371e4ad protein_features \N 2024-03-12 09:44:26.000000 Child of genebuild 1 9 Submitted \N
2524 747e14d7-4b82-4190-9899-d196e489ee20 xrefs \N 2024-03-12 09:44:26.000000 Child of genebuild 1 8 Submitted \N
Original file line number Diff line number Diff line change
Expand Up @@ -887,3 +887,6 @@
2459 1 2518 86 1
2460 1 2519 167 \N
2461 1 2520 211 \N
2462 1 2524 2 \N
2463 1 2522 3 \N
2464 1 2523 3 \N
60 changes: 41 additions & 19 deletions src/tests/test_dataset_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from ensembl.production.metadata.api.factories.datasets import DatasetFactory
from ensembl.production.metadata.api.models import (Dataset, DatasetAttribute, Attribute, DatasetSource, DatasetType,
GenomeDataset, Genome)
GenomeDataset, Genome, DatasetStatus)

db_directory = Path(__file__).parent / 'databases'
db_directory = db_directory.resolve()
Expand Down Expand Up @@ -110,7 +110,7 @@ def test_create_genebuild_children(self, multi_dbs):
session.commit()
data = session.query(Dataset).join(DatasetType).filter(
DatasetType.name == 'genome_browser_track').one()
assert data.status == "Submitted"
assert data.status == DatasetStatus.SUBMITTED # "Submitted"
# test get parent
test_parent, test_status = dataset_factory.get_parent_datasets(data.dataset_uuid, session=session)
assert test_parent == genebuild_uuid
Expand Down Expand Up @@ -140,30 +140,31 @@ def test_update_dataset_status(self, multi_dbs):
xref_uuid = xref_uuid[0]
# Processing
# Fail to update protein_features
temp, failed_status = dataset_factory.update_dataset_status(protfeat_uuid, "Processing",
temp, failed_status = dataset_factory.update_dataset_status(protfeat_uuid, DatasetStatus.PROCESSING,
session=session)
session.commit()
failed_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == protfeat_uuid).one()
assert failed_status == "Submitted"
assert failed_status_check[0] == "Submitted"
assert failed_status == DatasetStatus.SUBMITTED # "Submitted"
assert failed_status_check[0] == DatasetStatus.SUBMITTED # "Submitted"
# succeed on xref
temp, succeed_status = dataset_factory.update_dataset_status(xref_uuid, "Processing",
temp, succeed_status = dataset_factory.update_dataset_status(xref_uuid, DatasetStatus.PROCESSING,
session=session)
session.commit()
succeed_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == xref_uuid).one()
genebuild_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == genebuild_uuid).one()
assert succeed_status == "Processing"
assert succeed_status_check[0] == "Processing"
assert genebuild_status_check[0] == "Processing"
assert succeed_status == DatasetStatus.PROCESSING # "Processing"
assert succeed_status_check[0] == DatasetStatus.PROCESSING # "Processing"
assert genebuild_status_check[0] == DatasetStatus.PROCESSING # "Processing"

# Processed
# Fail to update genebuild
temp, failed_status = dataset_factory.update_dataset_status(genebuild_uuid, "Processed",
temp, failed_status = dataset_factory.update_dataset_status(genebuild_uuid, DatasetStatus.PROCESSED,
# "Processed",
session=session)
session.commit()
genebuild_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == genebuild_uuid).one()
assert failed_status == "Processing"
assert genebuild_status_check[0] == "Processing"
assert failed_status == DatasetStatus.PROCESSING # "Processing"
assert genebuild_status_check[0] == DatasetStatus.PROCESSING # "Processing"
# Change all the children
child_dataset_uuids = session.query(Dataset.dataset_uuid) \
.join(GenomeDataset, GenomeDataset.dataset_id == Dataset.dataset_id) \
Expand All @@ -173,21 +174,42 @@ def test_update_dataset_status(self, multi_dbs):
.filter(DatasetType.name != "genebuild").all()
for temp_uuid in child_dataset_uuids:
temp_uuid = temp_uuid[0]
dataset_factory.update_dataset_status(temp_uuid, "Processed", session=session)
dataset_factory.update_dataset_status(temp_uuid, DatasetStatus.PROCESSED,
session=session) # "Processed", session=session)
session.commit()
genebuild_status_check = session.query(Dataset.status).filter(
Dataset.dataset_uuid == genebuild_uuid).one()
assert genebuild_status_check[0] == "Processed"
dataset_factory.update_dataset_status(genebuild_uuid, "Released", session=session)
assert genebuild_status_check[0] == DatasetStatus.PROCESSED # "Processed"
dataset_factory.update_dataset_status(genebuild_uuid, DatasetStatus.RELEASED,
session=session) # "Released", session=session)
session.commit()
genebuild_status_check = session.query(Dataset.status).filter(
Dataset.dataset_uuid == genebuild_uuid).one()
assert genebuild_status_check[0] == "Released"
assert genebuild_status_check[0] == DatasetStatus.RELEASED # "Released"
protfeat_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == protfeat_uuid).one()
assert protfeat_status_check[0] == "Released"
assert protfeat_status_check[0] == DatasetStatus.RELEASED # "Released"

# Check for submitted change
dataset_factory.update_dataset_status(protfeat_uuid, "Submitted", session=session)
dataset_factory.update_dataset_status(protfeat_uuid, DatasetStatus.SUBMITTED,
session=session) # "Submitted", session=session)
session.commit()
submitted_status = session.query(Dataset.status).filter(Dataset.dataset_uuid == protfeat_uuid).one()
assert submitted_status[0] == "Submitted"
assert submitted_status[0] == DatasetStatus.SUBMITTED # "Submitted"

def test_str_status(self, multi_dbs):
    """Plain-string statuses must behave identically to DatasetStatus members.

    Exercises the str -> DatasetStatus coercion added to the factory API:
    the same query issued with an enum member and with its string value has
    to return the same rows, and a status update given as a string has to
    persist the corresponding enum value.
    """
    metadata_db = DBConnection(multi_dbs['ensembl_genome_metadata'].dbc.url)
    with metadata_db.session_scope() as session:
        ds_factory = DatasetFactory()

        # Enum member vs. equivalent string: results must match exactly.
        datasets = ds_factory.get_genomes_by_status_and_type(DatasetStatus.SUBMITTED, dataset_type='genebuild',
                                                             session=session)
        datasets_2 = ds_factory.get_genomes_by_status_and_type('Submitted', 'genebuild', session=session)
        assert datasets == datasets_2
        # Seeded xrefs dataset (see test fixture data) — update with a plain
        # string and confirm the stored status equals the returned one.
        xref_uuid = '747e14d7-4b82-4190-9899-d196e489ee20'
        temp, succeed_status = ds_factory.update_dataset_status(xref_uuid, 'Processing',
                                                                session=session)
        session.commit()
        succeed_status_check = session.query(Dataset.status).filter(Dataset.dataset_uuid == xref_uuid).one()
        assert succeed_status == succeed_status_check[0]


Loading