diff --git a/src/main/scala/com/convergencelabs/convergence/server/backend/BackendServices.scala b/src/main/scala/com/convergencelabs/convergence/server/backend/BackendServices.scala index ad8e1169..b5bcaa9b 100644 --- a/src/main/scala/com/convergencelabs/convergence/server/backend/BackendServices.scala +++ b/src/main/scala/com/convergencelabs/convergence/server/backend/BackendServices.scala @@ -139,7 +139,9 @@ private[server] final class BackendServices(context: ActorContext[_], * database. * @return Success if the actors are started; a failure otherwise. */ - private[this] def startActors(persistenceConfig: Config, convergenceDbProvider: DatabaseProvider): Try[Unit] = Try { + private[this] def startActors(persistenceConfig: Config, + convergenceDbProvider: DatabaseProvider, + ): Try[Unit] = Try { createUserSessionTokenReaperActor(convergenceDbProvider) createDatabaseManager(persistenceConfig, convergenceDbProvider) createStoreActors(persistenceConfig, convergenceDbProvider) @@ -211,7 +213,8 @@ private[server] final class BackendServices(context: ActorContext[_], context.spawn(UserFavoriteDomainStoreActor(favoriteDomainStore), "FavoriteDomains") } - private[this] def createDatabaseManager(persistenceConfig: Config, convergenceDbProvider: DatabaseProvider): ActorRef[DatabaseManagerActor.Message] = { + private[this] def createDatabaseManager(persistenceConfig: Config, + convergenceDbProvider: DatabaseProvider): ActorRef[DatabaseManagerActor.Message] = { val dbServerConfig = persistenceConfig.getConfig("server") val convergenceRepo = new SchemaMetaDataRepository(ConvergenceSchemaManager.BasePath) val domainRepo = new SchemaMetaDataRepository(DomainSchemaManager.BasePath) @@ -230,7 +233,8 @@ private[server] final class BackendServices(context: ActorContext[_], convergenceDbProvider, convergenceSchemaVersion, domainSchemaVersion) - context.spawn(DatabaseManagerActor(databaseManager), name = "DatabaseManager") + val domainStore = new DomainStore(convergenceDbProvider) + context.spawn(DatabaseManagerActor(databaseManager, domainStore, domainLifecycleTopic), name = "DatabaseManager") }).fold({ err => throw new IllegalStateException(err.toString) }, actor => actor) diff --git a/src/main/scala/com/convergencelabs/convergence/server/backend/db/schema/DatabaseManager.scala b/src/main/scala/com/convergencelabs/convergence/server/backend/db/schema/DatabaseManager.scala index 04a83e52..32fc2352 100644 --- a/src/main/scala/com/convergencelabs/convergence/server/backend/db/schema/DatabaseManager.scala +++ b/src/main/scala/com/convergencelabs/convergence/server/backend/db/schema/DatabaseManager.scala @@ -16,6 +16,7 @@ import com.convergencelabs.convergence.server.backend.db.schema.SchemaManager.Sc import com.convergencelabs.convergence.server.backend.db.{DatabaseProvider, DomainDatabaseFactory} import com.convergencelabs.convergence.server.backend.services.domain.DomainPersistenceManagerActor.DomainNotFoundException import com.convergencelabs.convergence.server.model.DomainId +import com.convergencelabs.convergence.server.model.server.domain.DomainStatus import grizzled.slf4j.Logging import scala.util.{Failure, Success, Try} diff --git a/src/main/scala/com/convergencelabs/convergence/server/backend/services/server/DatabaseManagerActor.scala b/src/main/scala/com/convergencelabs/convergence/server/backend/services/server/DatabaseManagerActor.scala index 28979356..26d5f100 100644 --- a/src/main/scala/com/convergencelabs/convergence/server/backend/services/server/DatabaseManagerActor.scala +++ b/src/main/scala/com/convergencelabs/convergence/server/backend/services/server/DatabaseManagerActor.scala @@ -11,20 +11,25 @@ package com.convergencelabs.convergence.server.backend.services.server +import akka.actor.typed.pubsub.Topic.Publish import akka.actor.typed.receptionist.{Receptionist, ServiceKey} import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} import akka.actor.typed.{ActorRef, Behavior} import com.convergencelabs.convergence.common.Ok import com.convergencelabs.convergence.server.backend.datastore.EntityNotFoundException -import com.convergencelabs.convergence.server.backend.datastore.convergence.{ConvergenceSchemaDeltaLogEntry, ConvergenceSchemaVersionLogEntry, DomainSchemaDeltaLogEntry, DomainSchemaVersionLogEntry} +import com.convergencelabs.convergence.server.backend.datastore.convergence.{ConvergenceSchemaDeltaLogEntry, ConvergenceSchemaVersionLogEntry, DomainSchemaDeltaLogEntry, DomainSchemaVersionLogEntry, DomainStore} import com.convergencelabs.convergence.server.backend.db.schema.{DatabaseManager, DatabaseSchemaStatus} +import com.convergencelabs.convergence.server.backend.services.server.DomainLifecycleTopic.DomainStatusChanged import com.convergencelabs.convergence.server.model.DomainId +import com.convergencelabs.convergence.server.model.server.domain.DomainStatus import com.convergencelabs.convergence.server.util.serialization.akka.CborSerializable import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import grizzled.slf4j.Logging private final class DatabaseManagerActor(context: ActorContext[DatabaseManagerActor.Message], - databaseManager: DatabaseManager) + databaseManager: DatabaseManager, + domainStore: DomainStore, + domainLifecycleTopic: ActorRef[DomainLifecycleTopic.TopicMessage]) extends AbstractBehavior[DatabaseManagerActor.Message](context) with Logging { import DatabaseManagerActor._ @@ -100,12 +105,20 @@ private final class DatabaseManagerActor(context: ActorContext[DatabaseManagerAc databaseManager.upgradeConvergence() case UpgradeDomainRequest(domainId, replyTo) => + domainLifecycleTopic ! Publish(DomainStatusChanged(domainId, DomainStatus.SchemaUpgrading)) + replyTo ! UpgradeDomainResponse(Right(Ok())) + databaseManager.upgradeDomain(domainId) - case UpgradeDomainsRequest(replyTo) => - replyTo ! UpgradeDomainsResponse(Right(Ok())) - databaseManager.upgradeAllDomains() + // No matter what happened, we want to broadcast the current status. + domainStore.getDomain(domainId) + .map { domain => + domainLifecycleTopic ! Publish(DomainStatusChanged(domainId, domain.status)) + } + .recover { e => + error(s"Could not get the current domain status after a domain upgrade for domain: ${domainId.namespace}/${domainId.domainId}", e) + } } Behaviors.same @@ -116,8 +129,10 @@ object DatabaseManagerActor { val Key: ServiceKey[Message] = ServiceKey[Message]("DatabaseManagerActor") - def apply(schemaManager: DatabaseManager): Behavior[Message] = - Behaviors.setup(context => new DatabaseManagerActor(context, schemaManager)) + def apply(schemaManager: DatabaseManager, + domainStore: DomainStore, + domainLifecycleTopic: ActorRef[DomainLifecycleTopic.TopicMessage]): Behavior[Message] = + Behaviors.setup(context => new DatabaseManagerActor(context, schemaManager, domainStore, domainLifecycleTopic)) ///////////////////////////////////////////////////////////////////////////// // Message Protocol @@ -199,13 +214,6 @@ object DatabaseManagerActor { final case class UpgradeDomainResponse(response: Either[UnknownError, Ok]) extends CborSerializable - // - // UpgradeDomainsRequest - // - final case class UpgradeDomainsRequest(replyTo: ActorRef[UpgradeDomainsResponse]) extends Message - - final case class UpgradeDomainsResponse(response: Either[UnknownError, Ok]) extends CborSerializable - final case class UnknownError() extends AnyRef with GetDomainSchemaStatusError with GetConvergenceSchemaStatusError