Skip to content

Commit

Permalink
Broadcast domain status during upgrade process
Browse files Browse the repository at this point in the history
  • Loading branch information
mmacfadden committed Jul 11, 2021
1 parent 8ae7205 commit bd00b69
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
Expand Up @@ -139,7 +139,9 @@ private[server] final class BackendServices(context: ActorContext[_],
* database.
* @return Success if the actors are started; a failure otherwise.
*/
private[this] def startActors(persistenceConfig: Config, convergenceDbProvider: DatabaseProvider): Try[Unit] = Try {
private[this] def startActors(persistenceConfig: Config,
convergenceDbProvider: DatabaseProvider,
): Try[Unit] = Try {
createUserSessionTokenReaperActor(convergenceDbProvider)
createDatabaseManager(persistenceConfig, convergenceDbProvider)
createStoreActors(persistenceConfig, convergenceDbProvider)
Expand Down Expand Up @@ -211,7 +213,8 @@ private[server] final class BackendServices(context: ActorContext[_],
context.spawn(UserFavoriteDomainStoreActor(favoriteDomainStore), "FavoriteDomains")
}

private[this] def createDatabaseManager(persistenceConfig: Config, convergenceDbProvider: DatabaseProvider): ActorRef[DatabaseManagerActor.Message] = {
private[this] def createDatabaseManager(persistenceConfig: Config,
convergenceDbProvider: DatabaseProvider): ActorRef[DatabaseManagerActor.Message] = {
val dbServerConfig = persistenceConfig.getConfig("server")
val convergenceRepo = new SchemaMetaDataRepository(ConvergenceSchemaManager.BasePath)
val domainRepo = new SchemaMetaDataRepository(DomainSchemaManager.BasePath)
Expand All @@ -230,7 +233,8 @@ private[server] final class BackendServices(context: ActorContext[_],
convergenceDbProvider,
convergenceSchemaVersion,
domainSchemaVersion)
context.spawn(DatabaseManagerActor(databaseManager), name = "DatabaseManager")
val domainStore = new DomainStore(convergenceDbProvider)
context.spawn(DatabaseManagerActor(databaseManager, domainStore, domainLifecycleTopic), name = "DatabaseManager")
}).fold({ err =>
throw new IllegalStateException(err.toString)
}, actor => actor)
Expand Down
Expand Up @@ -16,6 +16,7 @@ import com.convergencelabs.convergence.server.backend.db.schema.SchemaManager.Sc
import com.convergencelabs.convergence.server.backend.db.{DatabaseProvider, DomainDatabaseFactory}
import com.convergencelabs.convergence.server.backend.services.domain.DomainPersistenceManagerActor.DomainNotFoundException
import com.convergencelabs.convergence.server.model.DomainId
import com.convergencelabs.convergence.server.model.server.domain.DomainStatus
import grizzled.slf4j.Logging

import scala.util.{Failure, Success, Try}
Expand Down
Expand Up @@ -11,20 +11,25 @@

package com.convergencelabs.convergence.server.backend.services.server

import akka.actor.typed.pubsub.Topic.Publish
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import com.convergencelabs.convergence.common.Ok
import com.convergencelabs.convergence.server.backend.datastore.EntityNotFoundException
import com.convergencelabs.convergence.server.backend.datastore.convergence.{ConvergenceSchemaDeltaLogEntry, ConvergenceSchemaVersionLogEntry, DomainSchemaDeltaLogEntry, DomainSchemaVersionLogEntry}
import com.convergencelabs.convergence.server.backend.datastore.convergence.{ConvergenceSchemaDeltaLogEntry, ConvergenceSchemaVersionLogEntry, DomainSchemaDeltaLogEntry, DomainSchemaVersionLogEntry, DomainStore}
import com.convergencelabs.convergence.server.backend.db.schema.{DatabaseManager, DatabaseSchemaStatus}
import com.convergencelabs.convergence.server.backend.services.server.DomainLifecycleTopic.DomainStatusChanged
import com.convergencelabs.convergence.server.model.DomainId
import com.convergencelabs.convergence.server.model.server.domain.DomainStatus
import com.convergencelabs.convergence.server.util.serialization.akka.CborSerializable
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import grizzled.slf4j.Logging

private final class DatabaseManagerActor(context: ActorContext[DatabaseManagerActor.Message],
databaseManager: DatabaseManager)
databaseManager: DatabaseManager,
domainStore: DomainStore,
domainLifecycleTopic: ActorRef[DomainLifecycleTopic.TopicMessage])
extends AbstractBehavior[DatabaseManagerActor.Message](context) with Logging {

import DatabaseManagerActor._
Expand Down Expand Up @@ -100,12 +105,20 @@ private final class DatabaseManagerActor(context: ActorContext[DatabaseManagerAc
databaseManager.upgradeConvergence()

case UpgradeDomainRequest(domainId, replyTo) =>
domainLifecycleTopic ! Publish(DomainStatusChanged(domainId, DomainStatus.SchemaUpgrading))

replyTo ! UpgradeDomainResponse(Right(Ok()))

databaseManager.upgradeDomain(domainId)

case UpgradeDomainsRequest(replyTo) =>
replyTo ! UpgradeDomainsResponse(Right(Ok()))
databaseManager.upgradeAllDomains()
// No matter what happened, we want to broadcast the current status.
domainStore.getDomain(domainId)
.map { domain =>
domainLifecycleTopic ! Publish(DomainStatusChanged(domainId, domain.status))
}
.recover { e =>
error(s"Could not get the current domain status after a domain upgrade for domain: ${domainId.namespace}/${domainId.domainId}", e)
}
}

Behaviors.same
Expand All @@ -116,8 +129,10 @@ object DatabaseManagerActor {

val Key: ServiceKey[Message] = ServiceKey[Message]("DatabaseManagerActor")

def apply(schemaManager: DatabaseManager): Behavior[Message] =
Behaviors.setup(context => new DatabaseManagerActor(context, schemaManager))
def apply(schemaManager: DatabaseManager,
domainStore: DomainStore,
domainLifecycleTopic: ActorRef[DomainLifecycleTopic.TopicMessage]): Behavior[Message] =
Behaviors.setup(context => new DatabaseManagerActor(context, schemaManager, domainStore, domainLifecycleTopic))

/////////////////////////////////////////////////////////////////////////////
// Message Protocol
Expand Down Expand Up @@ -199,13 +214,6 @@ object DatabaseManagerActor {

final case class UpgradeDomainResponse(response: Either[UnknownError, Ok]) extends CborSerializable

//
// UpgradeDomainsRequest
//
final case class UpgradeDomainsRequest(replyTo: ActorRef[UpgradeDomainsResponse]) extends Message

final case class UpgradeDomainsResponse(response: Either[UnknownError, Ok]) extends CborSerializable

final case class UnknownError() extends AnyRef
with GetDomainSchemaStatusError
with GetConvergenceSchemaStatusError
Expand Down

0 comments on commit bd00b69

Please sign in to comment.