Skip to content

Commit

Permalink
Implemented the ability to get a models version at a specified time.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmacfadden committed Jul 11, 2021
1 parent c83e42f commit e0d2c8f
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 51 deletions.
Expand Up @@ -58,6 +58,8 @@ private final class HistoricModelClientActor(context: ActorContext[HistoricModel
onDataRequest(dataRequest, replyCallback)
case operationRequest: HistoricalOperationRequestMessage =>
onOperationRequest(operationRequest, replyCallback)
case getVersion: ModelGetVersionAtTimeRequestMessage =>
onGetVersionAtTimeRequestMessage(getVersion, replyCallback)
}
}

Expand Down Expand Up @@ -102,6 +104,31 @@ private final class HistoricModelClientActor(context: ActorContext[HistoricModel
}))
.recoverWith(handleAskFailure(_, cb))
}

private[this] def onGetVersionAtTimeRequestMessage(request: ModelGetVersionAtTimeRequestMessage, cb: ReplyCallback): Unit = {
val ModelGetVersionAtTimeRequestMessage(modelId, targetTime, _) = request
targetTime match {
case Some(time) =>
operationStoreActor.ask[ModelOperationServiceActor.GetVersionAtTimeResponse](
ModelOperationServiceActor.GetVersionAtTimeRequest(domain, request.modelId, timestampToInstant(time), _))
.map(_.version.fold(
{
case ModelOperationServiceActor.ModelNotFoundError() =>
cb.expectedError(ErrorCodes.ModelNotFound, s"A model with id '$modelId' does not exist.")
case ModelOperationServiceActor.UnknownError() =>
cb.unexpectedError("Unexpected error getting historical model operations.")
case ModelOperationServiceActor.InvalidModelTime(msg) =>
cb.unexpectedError(msg)
},
{ version =>
val response = ModelGetVersionAtTimeResponseMessage(version.version)
cb.reply(response)
}))
.recoverWith(handleAskFailure(_, cb))
case None =>
}

}
}

object HistoricModelClientActor {
Expand Down
Expand Up @@ -90,6 +90,8 @@ private[realtime] object ConvergenceMessageBodyUtils {
case Body.HistoricalDataResponse(message) => message
case Body.HistoricalOperationsRequest(message) => message
case Body.HistoricalOperationsResponse(message) => message
case Body.ModelGetVersionAtTimeRequest(message) => message
case Body.ModelGetVersionAtTimeResponse(message) => message

// Identity
case Body.UsersGetRequest(message) => message
Expand Down Expand Up @@ -232,10 +234,14 @@ private[realtime] object ConvergenceMessageBodyUtils {
case message: RemoteReferenceUnsharedMessage => Body.ReferenceUnshared(message)
case message: ModelsQueryRequestMessage => Body.ModelsQueryRequest(message)
case message: ModelsQueryResponseMessage => Body.ModelsQueryResponse(message)

case message: HistoricalDataRequestMessage => Body.HistoricalDataRequest(message)
case message: HistoricalDataResponseMessage => Body.HistoricalDataResponse(message)
case message: HistoricalOperationRequestMessage => Body.HistoricalOperationsRequest(message)
case message: HistoricalOperationsResponseMessage => Body.HistoricalOperationsResponse(message)
case message: ModelGetVersionAtTimeRequestMessage => Body.ModelGetVersionAtTimeRequest(message)
case message: ModelGetVersionAtTimeResponseMessage => Body.ModelGetVersionAtTimeResponse(message)

case message: GetModelPermissionsRequestMessage => Body.GetModelPermissionsRequest(message)
case message: GetModelPermissionsResponseMessage => Body.GetModelPermissionsResponse(message)
case message: SetModelPermissionsRequestMessage => Body.SetModelPermissionsRequest(message)
Expand Down
Expand Up @@ -31,42 +31,63 @@ class ModelOperationStore private[domain](dbProvider: DatabaseProvider)
import ModelOperationStore._
import schema.ModelOperationClass._

private[this] val GetMaxVersionQuery = "SELECT max(version) as max FROM ModelOperation WHERE model.id = :modelId"

def getMaxVersion(id: String): Try[Option[Long]] = withDb { db =>
val params = Map(Constants.ModelId -> id)
/**
* Gets the maximum version for a model.
*
* @param modelId The id of the model.
* @return The max version or non if the model does not have any operations.
*/
def getMaxOperationVersion(modelId: String): Try[Option[Long]] = withDb { db =>
val params = Map(Constants.ModelId -> modelId)
OrientDBUtil
.findDocument(db, GetMaxVersionQuery, params)
.map(_.flatMap(doc => Option(doc.getProperty("max"))))
}

private[this] val GetVersionAtOrBeforeTime = "SELECT max(version) as max FROM ModelOperation WHERE model.id = :modelId AND timestamp <= :time"
private[this] val GetMaxVersionQuery = "SELECT max(version) as max FROM ModelOperation WHERE model.id = :modelId"

def getVersionAtOrBeforeTime(id: String, time: Instant): Try[Option[Long]] = withDb { db =>
val params = Map(Constants.ModelId -> id, "time" -> new java.util.Date(time.toEpochMilli))
/**
* Gets the version the model was at, at a specific time.
*
* @param modelId The id of the model to get the operations for.
* @param time The time to get the model version for
* @return The version the model was at, at the given time.
*/
def getVersionAtOrBeforeTime(modelId: String, time: Instant): Try[Option[Long]] = withDb { db =>
val params = Map(Constants.ModelId -> modelId, "time" -> new java.util.Date(time.toEpochMilli))
OrientDBUtil
.findDocument(db, GetVersionAtOrBeforeTime, params)
.map(_.flatMap(doc => Option(doc.getProperty("max"))))
.findDocument(db, GetModelVersionAtTimeQuery, params)
.map(_.flatMap(doc => Option(doc.getProperty("version").asInstanceOf[Long])))
}

private[this] val GetModelOperationQuery = "SELECT FROM ModelOperation WHERE model.id = :modelId AND version = :version"
private[this] val GetModelVersionAtTimeQuery =
s"""SELECT
| ${Fields.Version}
|FROM
| ModelOperation
|WHERE
| model.id = :modelId AND
| ${Fields.Timestamp} <= :time
|ORDER BY ${Fields.Version} DESC
|LIMIT 1""".stripMargin


def getModelOperation(id: String, version: Long): Try[Option[ModelOperation]] = withDb { db =>
val params = Map(Constants.ModelId -> id, "version" -> version)
/**
* Gets a specific operation for a model.
*
* @param modelId The id of the model.
* @param version The version of the operation to get.
* @return Some operation if the model has an operation with that version or None otherwise.
*/
def getModelOperation(modelId: String, version: Long): Try[Option[ModelOperation]] = withDb { db =>
val params = Map(Constants.ModelId -> modelId, "version" -> version)
OrientDBUtil
.findDocument(db, GetModelOperationQuery, params)
.map(_.map(docToModelOperation))
}

private[this] val GetMaxOperationForSessionAfterVersionQuery =
"""SELECT
| version
|FROM ModelOperation
|WHERE
| model.id = :modelId AND
| version >= :version AND
| session.id = :sessionId
|ORDER BY version DESC LIMIT 1""".stripMargin
private[this] val GetModelOperationQuery = "SELECT FROM ModelOperation WHERE model.id = :modelId AND version = :version"


def getMaxOperationForSessionAfterVersion(id: String, sessionId: String, version: Long): Try[Option[Long]] = withDb { db =>
val params = Map(Constants.ModelId -> id, Fields.Version -> version, "sessionId" -> sessionId)
Expand All @@ -76,31 +97,42 @@ class ModelOperationStore private[domain](dbProvider: DatabaseProvider)
}
}

private[this] val GetOperationsAfterVersionQuery =
"""SELECT *
private[this] val GetMaxOperationForSessionAfterVersionQuery =
"""SELECT
| version
|FROM ModelOperation
|WHERE
| model.id = :modelId AND
| version >= :version
|ORDER BY version ASC""".stripMargin
| version >= :version AND
| session.id = :sessionId
|ORDER BY version DESC LIMIT 1""".stripMargin


def getOperationsAfterVersion(id: String, version: Long, limit: Option[Long] = None): Try[List[ModelOperation]] = withDb { db =>
/**
* Get all operations after a version for a specific model.
*
* @param modelId The id of the model.
* @param version The version to get operations after.
* @param limit The maximum number of operations to return.
* @return The requested list of operations.
*/
def getOperationsAfterVersion(modelId: String, version: Long, limit: Option[Long] = None): Try[List[ModelOperation]] = withDb { db =>
val query = OrientDBUtil.buildPagedQuery(GetOperationsAfterVersionQuery, QueryLimit(limit), QueryOffset())
val params = Map(Constants.ModelId -> id, Fields.Version -> version)
val params = Map(Constants.ModelId -> modelId, Fields.Version -> version)
OrientDBUtil
.query(db, query, params)
.map(_.map(docToModelOperation))
}

private[this] val GetOperationsInVersionRangeQuery =
private[this] val GetOperationsAfterVersionQuery =
"""SELECT *
|FROM ModelOperation
|WHERE
| model.id = :modelId AND
| version >= :firstVersion AND
| version <= :lastVersion
| version >= :version
|ORDER BY version ASC""".stripMargin


/**
* Gets operations in an inclusive version range.
*
Expand All @@ -116,25 +148,36 @@ class ModelOperationStore private[domain](dbProvider: DatabaseProvider)
.map(_.map(docToModelOperation))
}

private[this] val DeleteAllOperationsForModelCommand = "DELETE FROM ModelOperation WHERE model.id = :modelId"
private[this] val GetOperationsInVersionRangeQuery =
"""SELECT *
|FROM ModelOperation
|WHERE
| model.id = :modelId AND
| version >= :firstVersion AND
| version <= :lastVersion
|ORDER BY version ASC""".stripMargin

/**
* Deletes all the operations for a given model.
*
* @param modelId The id of the model to delete all operations for.
* @param db The optional database instance to use.
* @return Success if the operation succeeds; a Failure otherwise.
*/
def deleteAllOperationsForModel(modelId: String, db: Option[ODatabaseDocument] = None): Try[Unit] = withDb(db) { db =>
val params = Map(Constants.ModelId -> modelId)
OrientDBUtil.commandReturningCount(db, DeleteAllOperationsForModelCommand, params).map(_ => ())
}

private[this] val CreateModelOperationCommand =
"""
|INSERT INTO
| ModelOperation
|SET
| model = (SELECT FROM Model WHERE id = :modelId),
| version = :version,
| timestamp = :timestamp,
| session = (SELECT FROM DomainSession WHERE id = :sessionId),
| operation = :operation
|""".stripMargin
private[this] val DeleteAllOperationsForModelCommand = "DELETE FROM ModelOperation WHERE model.id = :modelId"

/**
* Creates and stores a new model operations
*
* @param modelOperation The operation to store.
* @param db The optional database instance to use.
* @return Success if the operation succeeds; a Failure otherwise.
*/
def createModelOperation(modelOperation: NewModelOperation, db: Option[ODatabaseDocument] = None): Try[Unit] = withDb(db) { db =>
val opDoc = OrientDBOperationMapper.operationToODocument(modelOperation.op)
opDoc.save()
Expand All @@ -149,6 +192,18 @@ class ModelOperationStore private[domain](dbProvider: DatabaseProvider)

OrientDBUtil.command(db, CreateModelOperationCommand, params).map(_ => ())
}

private[this] val CreateModelOperationCommand =
"""
|INSERT INTO
| ModelOperation
|SET
| model = (SELECT FROM Model WHERE id = :modelId),
| version = :version,
| timestamp = :timestamp,
| session = (SELECT FROM DomainSession WHERE id = :sessionId),
| operation = :operation
|""".stripMargin
}

object ModelOperationStore {
Expand Down
Expand Up @@ -15,12 +15,12 @@ import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import com.convergencelabs.convergence.server.backend.datastore.EntityNotFoundException
import com.convergencelabs.convergence.server.backend.services.domain.identity.IdentityServiceActor.ReceiveTimeout
import com.convergencelabs.convergence.server.backend.services.domain.{DomainPersistenceManager, BaseDomainShardedActor}
import com.convergencelabs.convergence.server.backend.services.domain.{BaseDomainShardedActor, DomainPersistenceManager}
import com.convergencelabs.convergence.server.model.DomainId
import com.convergencelabs.convergence.server.util.serialization.akka.CborSerializable
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

import java.time.Instant
import scala.concurrent.duration.FiniteDuration

final class ModelOperationServiceActor private(domainId: DomainId,
Expand All @@ -36,13 +36,15 @@ final class ModelOperationServiceActor private(domainId: DomainId,
override def receiveInitialized(msg: Message): Behavior[Message] = {
msg match {
case msg: GetOperationsRequest =>
handleGetOperations(msg)
onGetOperations(msg)
case msg: GetVersionAtTimeRequest =>
onGetVersionAtTimeRequest(msg)
case ReceiveTimeout(_) =>
this.passivate()
}
}

def handleGetOperations(msg: GetOperationsRequest): Behavior[Message] = {
def onGetOperations(msg: GetOperationsRequest): Behavior[Message] = {
val GetOperationsRequest(this.domainId, modelId, first, last, replyTo) = msg
this.persistenceProvider.modelOperationStore.getOperationsInVersionRange(modelId, first, last)
.map(ops => Right(ops))
Expand All @@ -58,6 +60,27 @@ final class ModelOperationServiceActor private(domainId: DomainId,
Behaviors.same
}

def onGetVersionAtTimeRequest(msg: GetVersionAtTimeRequest): Behavior[Message] = {
val GetVersionAtTimeRequest(this.domainId, modelId, time, replyTo) = msg
this.persistenceProvider.modelOperationStore.getVersionAtOrBeforeTime(modelId, time)
.map {
case Some(version) =>
Right(Version(version))
case None =>
Left(InvalidModelTime("The requested model did not exist at the specified time"))
}
.recover {
case _: EntityNotFoundException =>
Left(ModelNotFoundError())
case cause =>
context.log.error("Unexpected error getting model operations", cause)
Left(UnknownError())
}
.foreach(replyTo ! GetVersionAtTimeResponse(_))

Behaviors.same
}

override protected def getDomainId(msg: Message): DomainId = msg.domainId

override protected def getReceiveTimeoutMessage(): Message = ReceiveTimeout(this.domainId)
Expand Down Expand Up @@ -111,13 +134,37 @@ object ModelOperationServiceActor {

final case class GetOperationsResponse(operations: Either[GetOperationsError, List[ModelOperation]]) extends CborSerializable

//
// GetVersionAtTime
//
final case class GetVersionAtTimeRequest(domainId: DomainId,
modelId: String,
time: Instant,
replyTo: ActorRef[GetVersionAtTimeResponse]) extends Message

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(Array(
new JsonSubTypes.Type(value = classOf[ModelNotFoundError], name = "model_not_found"),
new JsonSubTypes.Type(value = classOf[ModelNotFoundError], name = "invalid_version"),
new JsonSubTypes.Type(value = classOf[UnknownError], name = "unknown")
))
sealed trait GetVersionAtTimeError

final case class InvalidModelTime(message: String) extends GetVersionAtTimeError

final case class Version(version: Long)

final case class GetVersionAtTimeResponse(version: Either[GetVersionAtTimeError, Version]) extends CborSerializable

//
// Common Errors
//
final case class ModelNotFoundError() extends AnyRef
with GetOperationsError
with GetVersionAtTimeError

final case class UnknownError() extends AnyRef
with GetOperationsError
with GetVersionAtTimeError

}

0 comments on commit e0d2c8f

Please sign in to comment.