From f4d69094dea223d99d7747c7c92b67cb6b6514dd Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 1 Nov 2018 19:01:09 +0100 Subject: [PATCH 01/14] Did all link-related API changes. Introduced the /assignInstance endpoint. Extended /matchingInstance and /matchingResult with parameter 'Id' to pass the id of the calling instance. Changed specification, server and request handler accordingly. No db changes yet. --- OpenAPISpecification.yaml | 49 ++++++++++- .../instanceregistry/RequestHandler.scala | 86 ++++++++++++++++--- .../instanceregistry/connection/Server.scala | 52 +++++++++-- 3 files changed, 164 insertions(+), 23 deletions(-) diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index bdf268d..4832fa8 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -92,6 +92,12 @@ paths: on the server. operationId: matchingInstance parameters: + - in: query + name: Id + description: Id of the instance that is requesting a dependency + required: true + type: integer + format: int64 - name: ComponentType in: query description: Type of the instance to be retrieved @@ -185,7 +191,13 @@ paths: operationId: matchInstance parameters: - in: query - name: Id + name: callerId + description: The ID of the instance that is calling this endpoint + required: true + type: integer + format: int64 + - in: query + name: matchedInstanceId description: The ID of the instance that the sender was matched to. required: true type: integer @@ -208,7 +220,8 @@ paths: - Basic Operations summary: Gets the list of events associated to the specified instance description: >- - This command retrieves a list of events that are associated to the instance with the specified id. + This command retrieves a list of events that are associated to the + instance with the specified id. operationId: eventList parameters: - name: Id @@ -495,6 +508,38 @@ paths: description: ID not found on server '500': description: Internal server error + /assignInstance: + post: + tags: + - Docker Operations + summary: Assignes a new dependency to the specified instance + description: >- + This command assignes a new dependency to the instance with the specified id. Internally, this will + stop the instance, assign the new dependency and start the instance again. This is why this is only + applicable to docker instances. + operationId: assignInstance + parameters: + - in: query + name: Id + description: The ID of the instance whichs dependency should be updated + required: true + type: integer + format: int64 + - in: query + name: assignedInstanceId + description: The ID of the instance that should be assigned as dependency + required: true + type: integer + format: int64 + responses: + '202' : + description: 'Accepted, the operation will be completed in the future.' + '400' : + description: 'Bad request, the instance with the specified ID is no running inside a docker container or the assigned instance is of the wrong component type.' + '404': + description: 'One of the ids was not found on the server' + '500': + description: Internal server error definitions: Event: type: object diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 6173dc1..2e88ddf 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -105,6 +105,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } def getMatchingInstanceOfType(compType: ComponentType): Try[Instance] = { + //TODO: Check for links in state 'Assigned' log.info(s"Trying to match to instance of type $compType ...") getNumberOfInstances(compType) match { case 0 => @@ -154,23 +155,70 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } } + } + def handleInstanceLinkCreated(instanceIdFrom: Long, instanceIdTo: Long): OperationResult.Value = { + if(!instanceDao.hasInstance(instanceIdFrom) || !instanceDao.hasInstance(instanceIdTo)){ + OperationResult.IdUnknown + } else { + //TODO: Verify there is no link present that is not failed! + //TODO: Insert Link into database, state should be 'Assigned' + OperationResult.Ok + } } - def handleMatchingResult(id: Long, result: Boolean): OperationResult.Value = { - if (!instanceDao.hasInstance(id)) { + def handleInstanceAssignment(instanceId: Long, newDependencyId: Long): OperationResult.Value = { + if(!instanceDao.hasInstance(instanceId) || !instanceDao.hasInstance(newDependencyId)){ OperationResult.IdUnknown + } else if(!isInstanceDockerContainer(instanceId)){ + OperationResult.NoDockerContainer } else { - val instance = instanceDao.getInstance(id).get - instanceDao.addMatchingResult(id, result) - if (result && instance.instanceState == InstanceState.NotReachable) { - instanceDao.setStateFor(instance.id.get, InstanceState.Running) - fireStateChangedEvent(instanceDao.getInstance(id).get) - } else if (!result && instance.instanceState == InstanceState.Running) { - instanceDao.setStateFor(instance.id.get, InstanceState.NotReachable) - fireStateChangedEvent(instanceDao.getInstance(id).get) + val instance = instanceDao.getInstance(instanceId).get + val dependency = instanceDao.getInstance(instanceId).get + + if(assignmentAllowed(instance.componentType) && compatibleTypes(instance.componentType, dependency.componentType)){ + //TODO: Update database with assignment. Verify there is only one link present, remove it and replace it with new one + implicit val timeout : Timeout = Timeout(10 seconds) + + (dockerActor ? restart(instance.dockerId.get)).map{ + _ => log.info(s"Instance $instanceId restarted.") + instanceDao.setStateFor(instance.id.get, InstanceState.Stopped) //Set to stopped, will report start automatically + fireStateChangedEvent(instanceDao.getInstance(instanceId).get) + }.recover { + case ex: Exception => + log.warning(s"Failed to restart container with id $instanceId. Message is: ${ex.getMessage}") + fireDockerOperationErrorEvent(Some(instance), s"Pause failed with message: ${ex.getMessage}") + } + + OperationResult.Ok + } else { + OperationResult.InvalidTypeForOperation + } + + + } + } + + def handleMatchingResult(callerId: Long, matchedInstanceId: Long, result: Boolean): OperationResult.Value = { + if (!instanceDao.hasInstance(callerId) || !instanceDao.hasInstance(matchedInstanceId)) { + OperationResult.IdUnknown + } else { + val callingInstance = instanceDao.getInstance(callerId).get + val matchedInstance = instanceDao.getInstance(matchedInstanceId).get + + //Update list of matching results + instanceDao.addMatchingResult(matchedInstanceId, result) + //Update state of matchedInstance accordingly + if (result && matchedInstance.instanceState == InstanceState.NotReachable) { + instanceDao.setStateFor(matchedInstanceId, InstanceState.Running) + fireStateChangedEvent(instanceDao.getInstance(matchedInstanceId).get) //Re-retrieve instance bc reference was invalidated by 'setStateFor' + } else if (!result && matchedInstance.instanceState == InstanceState.Running) { + instanceDao.setStateFor(matchedInstanceId, InstanceState.NotReachable) + fireStateChangedEvent(instanceDao.getInstance(matchedInstanceId).get)//Re-retrieve instance bc reference was invalidated by 'setStateFor' } - log.info(s"Applied matching result $result to instance with id $id.") + log.info(s"Applied matching result $result to instance with id $matchedInstanceId.") + + //TODO: Handle Link state, set it to failed if needed. OperationResult.Ok } } @@ -565,11 +613,25 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + private def assignmentAllowed(instanceType: ComponentType) : Boolean = { + instanceType == ComponentType.Crawler || instanceType == ComponentType.WebApi || instanceType == ComponentType.WebApp + } + + private def compatibleTypes(instanceType: ComponentType, dependencyType: ComponentType) : Boolean = { + instanceType match { + case ComponentType.Crawler => dependencyType == ComponentType.ElasticSearch + case ComponentType.WebApi => dependencyType == ComponentType.ElasticSearch + case ComponentType.WebApp => dependencyType == ComponentType.WebApi + case _ => false + } + } + object OperationResult extends Enumeration { val IdUnknown: Value = Value("IdUnknown") val NoDockerContainer: Value = Value("NoDockerContainer") val IsDockerContainer: Value = Value("IsDockerContainer") - val InvalidStateForOperation: Value = Value("InvalidState") + val InvalidStateForOperation: Value = Value("InvalidStateForOperation") + val InvalidTypeForOperation: Value = Value("InvalidTypeForOperation") val Ok: Value = Value("Ok") val InternalError: Value = Value("InternalError") } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 292e4d5..47eb984 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -148,9 +148,9 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit * be passed as an query argument named 'ComponentType' (so the call is /matchingInstance?ComponentType=Crawler). * @return Server route that either maps to 200 OK response containing the instance, or the resp. error codes. */ - def matchingInstance() : server.Route = parameters('ComponentType.as[String]){ compTypeString => + def matchingInstance() : server.Route = parameters('Id.as[Long], 'ComponentType.as[String]){ (id, compTypeString) => get{ - log.debug(s"GET /matchingInstance?ComponentType=$compTypeString has been called") + log.debug(s"GET /matchingInstance?Id=$id&ComponentType=$compTypeString has been called") val compType : ComponentType = ComponentType.values.find(v => v.toString == compTypeString).orNull log.info(s"Looking for instance of type $compType ...") @@ -158,8 +158,14 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit if(compType != null){ handler.getMatchingInstanceOfType(compType) match { case Success(matchedInstance) => - log.info(s"Matched to $matchedInstance.") - complete(matchedInstance.toJson(instanceFormat)) + log.info(s"Matched request from $id to $matchedInstance.") + handler.handleInstanceLinkCreated(id, matchedInstance.id.get) match { + case handler.OperationResult.IdUnknown => + log.warning(s"Could not handle the creation of instance link, the id $id seems to be invalid.") + complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find instance with id $id.")) + case handler.OperationResult.Ok => + complete(matchedInstance.toJson(instanceFormat)) + } case Failure(x) => log.warning(s"Could not find matching instance for type $compType, message was ${x.getMessage}.") complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find matching instance for type $compType")) @@ -176,14 +182,14 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit * parameters named 'Id' and 'MatchingSuccessful' (so the call is /matchingResult?Id=42&MatchingSuccessful=True). * @return Server route that either maps to 200 OK or to the respective error codes */ - def matchInstance() : server.Route = parameters('Id.as[Long], 'MatchingSuccessful.as[Boolean]){ (id, matchingResult) => + def matchInstance() : server.Route = parameters('callerId.as[Long], 'matchedInstanceId.as[Long], 'MatchingSuccessful.as[Boolean]){ (callerId, matchedInstanceId, matchingResult) => post { - log.debug(s"POST /matchingResult?Id=$id&MatchingSuccessful=$matchingResult has been called") + log.debug(s"POST /matchingResult?callerId=$callerId&matchedInstanceId=$matchedInstanceId&MatchingSuccessful=$matchingResult has been called") - handler.handleMatchingResult(id, matchingResult) match { + handler.handleMatchingResult(callerId, matchedInstanceId, matchingResult) match { case handler.OperationResult.IdUnknown => - log.warning(s"Cannot apply matching result for id $id, that id was not found.") - complete{HttpResponse(StatusCodes.NotFound, entity = s"Id $id not found.")} + log.warning(s"Cannot apply matching result for id $callerId to id $matchedInstanceId, at least one id could not be found") + complete{HttpResponse(StatusCodes.NotFound, entity = s"One of the ids $callerId and $matchedInstanceId was not found.")} case handler.OperationResult.Ok => complete{s"Matching result $matchingResult processed."} } @@ -437,6 +443,34 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit } } + /** + * Called to assign a new instance dependency to the instance with the specified id. Both the ids of the instance and + * the specified dependency are passed as query arguments named 'Id' and 'assignedInstanceId' resp. (so the resulting + * call is /assignInstance?Id=42&assignedInstanceId=43). Will update the dependency in DB and than restart the container. + * @return Server route that either maps to 202 ACCEPTED or the respective error codes + */ + def assignInstance() : server.Route = parameters('Id.as[Long], 'assignedInstanceId.as[Long]) { (id, assignedInstanceId) => + post { + log.debug(s"POST /assignInstance?Id=$id&assignedInstanceId=$assignedInstanceId has been called") + + handler.handleInstanceAssignment(id, assignedInstanceId) match { + case handler.OperationResult.IdUnknown => + log.warning(s"Cannot assign $assignedInstanceId to $id, one or more ids not found.") + complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot assign instance, at least one of the ids $id / $assignedInstanceId was not found.")} + case handler.OperationResult.NoDockerContainer => + log.warning(s"Cannot assign $assignedInstanceId to $id, $id is no docker container.") + complete{HttpResponse(StatusCodes.BadRequest,entity = s"Cannot assign instance, $id is no docker container.")} + case handler.OperationResult.InvalidTypeForOperation => + log.warning(s"Cannot assign $assignedInstanceId to $id, incompatible types.") + complete{HttpResponse(StatusCodes.BadRequest,entity = s"Cannot assign $assignedInstanceId to $id, incompatible types.")} + case handler.OperationResult.Ok => + complete{HttpResponse(StatusCodes.Accepted, entity = "Operation accepted.")} + case x => + complete{HttpResponse(StatusCodes.InternalServerError, entity = s"Unexpected operation result $x")} + } + } + } + /** * Creates a WebSocketConnection that streams events that are issued by the registry to all connected clients. * @return Server route that maps to the WebSocketConnection From c597558eec5856b0e12ca7321a165a5968db933e Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 1 Nov 2018 20:03:25 +0100 Subject: [PATCH 02/14] Implemented links in core model. Implemented methods needed in the dao. Connected them to the RequestHandler. Still need to evaluate links in matching algorithm. --- .../instanceregistry/RequestHandler.scala | 60 +++++++++++------- .../instanceregistry/connection/Server.scala | 2 + .../daos/DynamicInstanceDAO.scala | 63 ++++++++++++++++++- .../instanceregistry/daos/InstanceDAO.scala | 16 ++++- .../swagger/client/model/InstanceLink.scala | 16 +++++ 5 files changed, 130 insertions(+), 27 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 2e88ddf..e2e34f0 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -9,6 +9,7 @@ import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy} import de.upb.cs.swt.delphi.instanceregistry.daos.{DynamicInstanceDAO, InstanceDAO} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model._ import scala.concurrent.duration._ @@ -161,9 +162,10 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) if(!instanceDao.hasInstance(instanceIdFrom) || !instanceDao.hasInstance(instanceIdTo)){ OperationResult.IdUnknown } else { - //TODO: Verify there is no link present that is not failed! - //TODO: Insert Link into database, state should be 'Assigned' - OperationResult.Ok + instanceDao.addLink(InstanceLink(instanceIdFrom, instanceIdTo, LinkState.Assigned)) match { + case Success(_) => OperationResult.Ok + case Failure(_) => OperationResult.InternalError //Should not happen, as ids are being verified above! + } } } @@ -177,20 +179,23 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) val dependency = instanceDao.getInstance(instanceId).get if(assignmentAllowed(instance.componentType) && compatibleTypes(instance.componentType, dependency.componentType)){ - //TODO: Update database with assignment. Verify there is only one link present, remove it and replace it with new one - implicit val timeout : Timeout = Timeout(10 seconds) - - (dockerActor ? restart(instance.dockerId.get)).map{ - _ => log.info(s"Instance $instanceId restarted.") - instanceDao.setStateFor(instance.id.get, InstanceState.Stopped) //Set to stopped, will report start automatically - fireStateChangedEvent(instanceDao.getInstance(instanceId).get) - }.recover { - case ex: Exception => - log.warning(s"Failed to restart container with id $instanceId. Message is: ${ex.getMessage}") - fireDockerOperationErrorEvent(Some(instance), s"Pause failed with message: ${ex.getMessage}") + if(instanceDao.addLink(InstanceLink(instanceId, newDependencyId, LinkState.Assigned)).isFailure){ + //This should not happen, as ids are being verified above! + OperationResult.InternalError + } else { + implicit val timeout : Timeout = Timeout(10 seconds) + + (dockerActor ? restart(instance.dockerId.get)).map{ + _ => log.info(s"Instance $instanceId restarted.") + instanceDao.setStateFor(instance.id.get, InstanceState.Stopped) //Set to stopped, will report start automatically + fireStateChangedEvent(instanceDao.getInstance(instanceId).get) + }.recover { + case ex: Exception => + log.warning(s"Failed to restart container with id $instanceId. Message is: ${ex.getMessage}") + fireDockerOperationErrorEvent(Some(instance), s"Pause failed with message: ${ex.getMessage}") + } + OperationResult.Ok } - - OperationResult.Ok } else { OperationResult.InvalidTypeForOperation } @@ -199,27 +204,34 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } - def handleMatchingResult(callerId: Long, matchedInstanceId: Long, result: Boolean): OperationResult.Value = { + def handleMatchingResult(callerId: Long, matchedInstanceId: Long, matchingSuccess: Boolean): OperationResult.Value = { if (!instanceDao.hasInstance(callerId) || !instanceDao.hasInstance(matchedInstanceId)) { OperationResult.IdUnknown } else { - val callingInstance = instanceDao.getInstance(callerId).get val matchedInstance = instanceDao.getInstance(matchedInstanceId).get //Update list of matching results - instanceDao.addMatchingResult(matchedInstanceId, result) + instanceDao.addMatchingResult(matchedInstanceId, matchingSuccess) //Update state of matchedInstance accordingly - if (result && matchedInstance.instanceState == InstanceState.NotReachable) { + if (matchingSuccess && matchedInstance.instanceState == InstanceState.NotReachable) { instanceDao.setStateFor(matchedInstanceId, InstanceState.Running) fireStateChangedEvent(instanceDao.getInstance(matchedInstanceId).get) //Re-retrieve instance bc reference was invalidated by 'setStateFor' - } else if (!result && matchedInstance.instanceState == InstanceState.Running) { + } else if (!matchingSuccess && matchedInstance.instanceState == InstanceState.Running) { instanceDao.setStateFor(matchedInstanceId, InstanceState.NotReachable) fireStateChangedEvent(instanceDao.getInstance(matchedInstanceId).get)//Re-retrieve instance bc reference was invalidated by 'setStateFor' } - log.info(s"Applied matching result $result to instance with id $matchedInstanceId.") + log.info(s"Applied matching result $matchingSuccess to instance with id $matchedInstanceId.") + + //Update link state + if(!matchingSuccess){ + instanceDao.updateLink(InstanceLink(callerId, matchedInstanceId, LinkState.Failed)) match { + case Success(_) => OperationResult.Ok + case Failure(_) => OperationResult.InternalError //Should not happen + } + } else { + OperationResult.Ok + } - //TODO: Handle Link state, set it to failed if needed. - OperationResult.Ok } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 47eb984..f46e542 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -165,6 +165,8 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find instance with id $id.")) case handler.OperationResult.Ok => complete(matchedInstance.toJson(instanceFormat)) + case handler.OperationResult.InternalError => + complete{HttpResponse(StatusCodes.InternalServerError, entity = s"An internal error occurred")} } case Failure(x) => log.warning(s"Could not find matching instance for type $compType, message was ${x.getMessage}.") diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala index cef9f27..370dc88 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala @@ -5,8 +5,9 @@ import java.io.{File, IOException, PrintWriter} import akka.actor.ActorSystem import akka.stream.ActorMaterializer import de.upb.cs.swt.delphi.instanceregistry.{AppLogging, Configuration, Registry} -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceJsonSupport, RegistryEvent} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceJsonSupport, InstanceLink, RegistryEvent} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState import scala.collection.mutable import scala.concurrent.ExecutionContext @@ -24,6 +25,7 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit private val instances : mutable.Set[Instance] = new mutable.HashSet[Instance]() private val instanceMatchingResults : mutable.Map[Long, mutable.MutableList[Boolean]] = new mutable.HashMap[Long,mutable.MutableList[Boolean]]() private val instanceEvents : mutable.Map[Long, mutable.MutableList[RegistryEvent]] = new mutable.HashMap[Long, mutable.MutableList[RegistryEvent]]() + private val instanceLinks: mutable.Set[InstanceLink] = new mutable.HashSet[InstanceLink]() implicit val system : ActorSystem = Registry.system implicit val materializer : ActorMaterializer = ActorMaterializer() @@ -95,6 +97,7 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit instances.clear() instanceMatchingResults.clear() instanceEvents.clear() + instanceLinks.clear() dumpToRecoveryFile() } @@ -177,10 +180,67 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } + override def addLink(link: InstanceLink) : Try[Unit] = { + if(hasInstance(link.idFrom) && hasInstance(link.idTo)){ + + if(getLinksFrom(link.idFrom).exists(l => l.idTo == link.idTo)){ + //There already is a link between the two instances. Update it instead of adding a new one + updateLink(link) + } else { + //If new link is in state 'Assigned': Set any link that previously was assigned to 'outdated' + //IMPORTANT: Only works bc every component has exactly one dependency! + if(link.linkState == LinkState.Assigned){ + for (prevLink <- getLinksFrom(link.idFrom, Some(LinkState.Assigned))){ + updateLink(InstanceLink(prevLink.idFrom, prevLink.idTo, LinkState.Outdated)) + } + } + instanceLinks.add(link) + } + Success() + } else { + Failure(new RuntimeException("Cannot add link, ids not known.")) + } + } + + override def updateLink(link: InstanceLink) : Try[Unit] = { + val linksMatching = instanceLinks.filter(l => l.idFrom == link.idFrom && l.idTo == link.idTo) + + if(linksMatching.nonEmpty){ + for(l <- linksMatching){ + instanceLinks.remove(l) + instanceLinks.add(link) + } + Success() + } else { + Failure(new RuntimeException(s"Cannot update link $link, this link is not present in the dao.")) + } + } + + def getLinksFrom(id: Long, state: Option[LinkState] = None) : List[InstanceLink] = { + val links = instanceLinks.filter(link => link.idFrom == id) + + if(state.isDefined){ + List() ++ links.filter(link => link.linkState == state.get) + } else { + List() ++ links + } + } + + def getLinksTo(id:Long, state: Option[LinkState] = None) : List[InstanceLink] = { + val links = instanceLinks.filter(link => link.idTo == id) + + if(state.isDefined){ + List() ++ links.filter(link => link.linkState == state.get) + } else { + List() ++ links + } + } + private[daos] def clearData() : Unit = { instances.clear() instanceMatchingResults.clear() instanceEvents.clear() + instanceLinks.clear() } private[daos] def dumpToRecoveryFile() : Unit = { @@ -232,5 +292,4 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } - } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala index 8db4dff..5038b05 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala @@ -1,6 +1,6 @@ package de.upb.cs.swt.delphi.instanceregistry.daos -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, RegistryEvent} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink, RegistryEvent} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} import scala.util.Try @@ -108,4 +108,18 @@ trait InstanceDAO { */ def getEventsFor(id: Long) : Try[List[RegistryEvent]] + /** + * Adds a new instance link to the dao. Will fail if the ids referenced in the link object are not present. + * @param link Link to add + * @return Success if both ids are present, Failure otherwise + */ + def addLink(link: InstanceLink) : Try[Unit] + + /** + * Update the link between the two instances specified by the parameter. + * @param link Link to update + * @return Success if link is present, Failure otherwise + */ + def updateLink(link: InstanceLink) : Try[Unit] + } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala new file mode 100644 index 0000000..d7aaa8a --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala @@ -0,0 +1,16 @@ +package de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model + +import LinkEnums.LinkState + +final case class InstanceLink(idFrom: Long, idTo:Long, linkState: LinkState) + +object LinkEnums { + type LinkState = LinkState.Value + + object LinkState extends Enumeration { + val Assigned: Value = Value("Assigned") + val Failed: Value = Value("Failed") + val Outdated: Value = Value("Outdated") + } +} + From 917fe4e40a0ebfbb7bca965349ec620b11f0acbc Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 2 Nov 2018 12:26:03 +0100 Subject: [PATCH 03/14] Integrated link-based matching. Still need to implement fallback algorithm and update the tests. --- .../instanceregistry/RequestHandler.scala | 90 +++++++++++++++++-- .../instanceregistry/connection/Server.scala | 7 +- .../daos/DynamicInstanceDAO.scala | 2 +- .../instanceregistry/daos/InstanceDAO.scala | 8 ++ 4 files changed, 98 insertions(+), 9 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index e2e34f0..4601d24 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -105,8 +105,22 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) instanceDao.getEventsFor(id) } - def getMatchingInstanceOfType(compType: ComponentType): Try[Instance] = { - //TODO: Check for links in state 'Assigned' + def getMatchingInstanceOfType(callerId: Long, compType: ComponentType): Try[Instance] = { + log.info(s"Started matching: Instance with id $callerId is looking for instance of type $compType.") + if(!instanceDao.hasInstance(callerId)){ + log.warning(s"Matching failed: No instance with id $callerId was found.") + Failure(new RuntimeException(s"Id $callerId not present.")) + } else { + tryLinkMatching(callerId, compType) match { + case Success(instance) => + log.info(s"Matching finished: First try yielded result $instance.") + Success(instance) + case Failure(ex) => + log.warning(s"Matching pending: First try failed, message was ${ex.getMessage}") + Failure(ex) //TODO: Integrate below code here + } + } + /* log.info(s"Trying to match to instance of type $compType ...") getNumberOfInstances(compType) match { case 0 => @@ -155,16 +169,21 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) Success(instanceToMatch) } } - } + }*/ } def handleInstanceLinkCreated(instanceIdFrom: Long, instanceIdTo: Long): OperationResult.Value = { if(!instanceDao.hasInstance(instanceIdFrom) || !instanceDao.hasInstance(instanceIdTo)){ OperationResult.IdUnknown } else { - instanceDao.addLink(InstanceLink(instanceIdFrom, instanceIdTo, LinkState.Assigned)) match { - case Success(_) => OperationResult.Ok - case Failure(_) => OperationResult.InternalError //Should not happen, as ids are being verified above! + val (instanceFrom, instanceTo) = (instanceDao.getInstance(instanceIdFrom).get, instanceDao.getInstance(instanceIdTo).get) + if(compatibleTypes(instanceFrom.componentType, instanceTo.componentType)){ + instanceDao.addLink(InstanceLink(instanceIdFrom, instanceIdTo, LinkState.Assigned)) match { + case Success(_) => OperationResult.Ok + case Failure(_) => OperationResult.InternalError //Should not happen, as ids are being verified above! + } + } else { + OperationResult.InvalidTypeForOperation } } } @@ -548,6 +567,65 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + /** + * Tries to match caller to specified component type based on links stored in the dao. If one link is present, it will + * be selected regardless of its state. If multiple links are present, the assigned link will be returned. If none of + * the links is assigned, matching will fail. If the component types stored in the links do not match the required + * component type, matching will fail. + * @param callerId Id of the calling instance + * @param componentType ComponentType to look for + * @return Try[Instance], Success if matching was successful, Failure otherwise + */ + private def tryLinkMatching(callerId: Long, componentType: ComponentType) : Try[Instance] = { + log.info(s"Matching first try: Analyzing links for $callerId...") + + val links = instanceDao.getLinksFrom(callerId) + + links.size match { + case 0 => + log.info(s"Matching first try failed: No links present.") + Failure(new RuntimeException("No links for instance.")) + case 1 => + val instanceAssigned = instanceDao.getInstance(links.head.idTo) + + if(instanceAssigned.isDefined && instanceAssigned.get.componentType == componentType){ + log.info(s"Finished matching first try: Successfully matched based on 1 link found. Target is ${instanceAssigned.get}.") + Success(instanceAssigned.get) + } else if(instanceAssigned.isDefined && instanceAssigned.get.componentType != componentType){ + log.error(s"Matching first try failed: There was one link present, but the target type ${instanceAssigned.get.componentType} did not match expected type $componentType") + instanceDao.updateLink(InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated)) + Failure(new RuntimeException("Invalid target type.")) + } else { + log.error(s"Matching first try failed: There was one link present, but the target id ${links.head.idTo} was not found.") + instanceDao.updateLink(InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated)) + Failure(new RuntimeException("Invalid link for instance.")) + } + case x => + //Multiple links. Try to match to the one assigned link + links.find(link => link.linkState == LinkState.Assigned) match { + case Some(instanceLink) => + val instanceAssigned = instanceDao.getInstance(instanceLink.idTo) + + if(instanceAssigned.isDefined && instanceAssigned.get.componentType == componentType){ + log.info(s"Finished matching first try: Successfully matched based on one assigned link found out of $x total links. Target is ${instanceAssigned.get}.") + Success(instanceAssigned.get) + } else if(instanceAssigned.isDefined && instanceAssigned.get.componentType != componentType){ + log.error(s"Matching first try failed: There was one assigned link present, but the target type ${instanceAssigned.get.componentType} did not match expected type $componentType") + instanceDao.updateLink(InstanceLink(instanceLink.idFrom, instanceLink.idTo, LinkState.Outdated)) + Failure(new RuntimeException("Invalid target type.")) + } else { + log.error(s"Matching first try failed: There was one assigned link present, but the target id ${instanceLink.idTo} was not found.") + instanceDao.updateLink(InstanceLink(instanceLink.idFrom, instanceLink.idTo, LinkState.Outdated)) + Failure(new RuntimeException("Invalid link for instance.")) + } + case None => + log.error(s"Matching first try failed: There were multiple links present, but none of them was assigned.") + Failure(new RuntimeException("No links assigned.")) + + } + } + } + def isInstanceIdPresent(id: Long): Boolean = { instanceDao.hasInstance(id) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index f46e542..8d0bedc 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -156,13 +156,16 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit log.info(s"Looking for instance of type $compType ...") if(compType != null){ - handler.getMatchingInstanceOfType(compType) match { + handler.getMatchingInstanceOfType(id, compType) match { case Success(matchedInstance) => log.info(s"Matched request from $id to $matchedInstance.") handler.handleInstanceLinkCreated(id, matchedInstance.id.get) match { case handler.OperationResult.IdUnknown => log.warning(s"Could not handle the creation of instance link, the id $id seems to be invalid.") complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find instance with id $id.")) + case handler.OperationResult.InvalidTypeForOperation => + log.warning(s"Could not handle the creation of instance link, incompatible types found.") + complete{HttpResponse(StatusCodes.BadRequest, entity = s"Invalid dependency type $compType")} case handler.OperationResult.Ok => complete(matchedInstance.toJson(instanceFormat)) case handler.OperationResult.InternalError => @@ -170,7 +173,7 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit } case Failure(x) => log.warning(s"Could not find matching instance for type $compType, message was ${x.getMessage}.") - complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find matching instance for type $compType")) + complete(HttpResponse(StatusCodes.NotFound, entity = s"Could not find matching instance of type $compType for instance with id $id.")) } } else { log.error(s"Failed to deserialize parameter string $compTypeString to ComponentType.") diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala index 370dc88..4c60b55 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala @@ -216,7 +216,7 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } - def getLinksFrom(id: Long, state: Option[LinkState] = None) : List[InstanceLink] = { + override def getLinksFrom(id: Long, state: Option[LinkState] = None) : List[InstanceLink] = { val links = instanceLinks.filter(link => link.idFrom == id) if(state.isDefined){ diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala index 5038b05..1e633d0 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala @@ -2,6 +2,7 @@ package de.upb.cs.swt.delphi.instanceregistry.daos import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink, RegistryEvent} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState import scala.util.Try @@ -122,4 +123,11 @@ trait InstanceDAO { */ def updateLink(link: InstanceLink) : Try[Unit] + /** + * Get all outgoing links from the specified instance. Optionally a LinkState can be specified as filter + * @param idFrom Id of the instance + * @param state Option[LinkState] to filter for certain LinkStates. If None, no filter will be applied. + * @return List of matching InstanceLinks + */ + def getLinksFrom(idFrom: Long, state: Option[LinkState] = None) : List[InstanceLink] } \ No newline at end of file From 634d814306453ea15e4634ebae4f35480d7fd45d Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Fri, 2 Nov 2018 16:10:12 +0100 Subject: [PATCH 04/14] Matching algorithm now correctly uses fallback-solution if link-based matching does not yield result. Stop is now able to call /stop on instances that are not running inside a docker container. --- OpenAPISpecification.yaml | 11 +- .../instanceregistry/RequestHandler.scala | 127 +++++++++++------- .../instanceregistry/connection/Server.scala | 6 +- 3 files changed, 84 insertions(+), 60 deletions(-) diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index 4832fa8..6667d70 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -430,9 +430,10 @@ paths: - Docker Operations summary: Stops the specified instances' docker container description: >- - This command stops the docker container of the instance with the - specified ID. The instance will be properly shut down by calling its - /stop command first. Will change the instance state to 'Stopped'. + This command stops the specified instance. If the instance is running inside a docker container, + the container will be stopped. If not, the instance will be gracefully shut down by calling its + /stop endpoint. Will change the instance state to 'Stopped' for docker containers, will remove the + instance for non-docker instances. operationId: stop parameters: - in: query @@ -446,8 +447,8 @@ paths: description: 'Accepted, the operation will be completed in the future.' '400': description: >- - Bad request, the instance with the specified ID is either already - stopped or not deployed as a docker container at all. + Bad request, the instance with the specified ID is already + stopped. '404': description: ID not found on server '500': diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 4601d24..a3020cd 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -1,12 +1,14 @@ package de.upb.cs.swt.delphi.instanceregistry import akka.actor._ +import akka.http.scaladsl.model.StatusCodes import akka.pattern.ask import akka.util.Timeout import de.upb.cs.swt.delphi.instanceregistry.Docker.DockerActor._ import de.upb.cs.swt.delphi.instanceregistry.Docker.{DockerActor, DockerConnection} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy} +import de.upb.cs.swt.delphi.instanceregistry.connection.RestClient import de.upb.cs.swt.delphi.instanceregistry.daos.{DynamicInstanceDAO, InstanceDAO} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState @@ -117,59 +119,16 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) Success(instance) case Failure(ex) => log.warning(s"Matching pending: First try failed, message was ${ex.getMessage}") - Failure(ex) //TODO: Integrate below code here + tryDefaultMatching(compType) match { + case Success(instance) => + log.info(s"Matching finished: Default matching yielded result $instance.") + Success(instance) + case Failure(ex2) => + log.warning(s"Matching failed: Default matching did not yield result, message was ${ex2.getMessage}.") + Failure(ex2) + } } } - /* - log.info(s"Trying to match to instance of type $compType ...") - getNumberOfInstances(compType) match { - case 0 => - log.error(s"Cannot match to any instance of type $compType, no such instance present.") - Failure(new RuntimeException(s"Cannot match to any instance of type $compType, no instance present.")) - case 1 => - val instance: Instance = instanceDao.getInstancesOfType(compType).head - log.info(s"Only one instance of that type present, matching to instance with id ${instance.id.get}.") - Success(instance) - case x => - log.info(s"Found $x instances of type $compType.") - - //First try: Match to instance with most consecutive positive matching results - var maxConsecutivePositiveResults = 0 - var instanceToMatch: Instance = null - - for (instance <- instanceDao.getInstancesOfType(compType)) { - if (countConsecutivePositiveMatchingResults(instance.id.get) > maxConsecutivePositiveResults) { - maxConsecutivePositiveResults = countConsecutivePositiveMatchingResults(instance.id.get) - instanceToMatch = instance - } - } - - if (instanceToMatch != null) { - log.info(s"Matching to instance with id ${instanceToMatch.id}, as it has $maxConsecutivePositiveResults positive results in a row.") - Success(instanceToMatch) - } else { - //Second try: Match to instance with most positive matching results - var maxPositiveResults = 0 - - for (instance <- instanceDao.getInstancesOfType(compType)) { - val noOfPositiveResults: Int = instanceDao.getMatchingResultsFor(instance.id.get).get.count(i => i) - if (noOfPositiveResults > maxPositiveResults) { - maxPositiveResults = noOfPositiveResults - instanceToMatch = instance - } - } - - if (instanceToMatch != null) { - log.info(s"Matching to instance with id ${instanceToMatch.id}, as it has $maxPositiveResults positive results.") - Success(instanceToMatch) - } else { - //All instances are equally good (or bad), match to any of them - instanceToMatch = instanceDao.getInstancesOfType(compType).head - log.info(s"Matching to instance with id ${instanceToMatch.id}, no differences between instances have been found.") - Success(instanceToMatch) - } - } - }*/ } def handleInstanceLinkCreated(instanceIdFrom: Long, instanceIdTo: Long): OperationResult.Value = { @@ -467,7 +426,28 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) if (!instanceDao.hasInstance(id)) { OperationResult.IdUnknown } else if (!isInstanceDockerContainer(id)) { - OperationResult.NoDockerContainer + val instance = instanceDao.getInstance(id).get + + if(instance.componentType == ComponentType.ElasticSearch || instance.componentType == ComponentType.DelphiManagement){ + log.warning(s"Cannot stop instance of type ${instance.componentType}.") + OperationResult.InvalidTypeForOperation + } else { + log.info(s"Calling /stop on non-docker instance $instance..") + RestClient.executePost(RestClient.getUri(instance) + "/stop").map{ + response => + log.info(s"Request to /stop returned $response") + if (response.status == StatusCodes.OK){ + log.info(s"Instance with id $id has been shut down successfully.") + } else { + log.warning(s"Failed to shut down instance with id $id. Status code was: ${response.status}") + } + }.recover{ + case ex: Exception => + log.warning(s"Failed to shut down instance with id $id. Message is: ${ex.getMessage}") + } + handleDeregister(id) + OperationResult.Ok + } } else { log.info(s"Handling /stop for instance with id $id...") @@ -626,6 +606,49 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + private def tryDefaultMatching(componentType: ComponentType) : Try[Instance] = { + log.info(s"Matching fallback: Searching for instances of type $componentType ...") + getNumberOfInstances(componentType) match { + case 0 => + log.error(s"Matching failed: Cannot match to any instance of type $componentType, no such instance present.") + Failure(new RuntimeException(s"Cannot match to any instance of type $componentType, no instance present.")) + case 1 => + val instance: Instance = instanceDao.getInstancesOfType(componentType).head + log.info(s"Finished fallback matching: Only one instance of that type present, matching to instance with id ${instance.id.get}.") + Success(instance) + case x => + log.info(s"Matching fallback: Found $x instances of type $componentType.") + + instanceDao.getInstancesOfType(componentType).find(instance => instance.instanceState == InstanceState.Running) match { + case Some(instance) => + log.info(s"Finished fallback matching: A running instance of type $componentType was found. Matching to $instance") + Success(instance) + case None => + log.info(s"Matching fallback: Found $x instance of type $componentType, but none of them is running.") + + //Match to instance with maxmum number of consecutive positive matching results + var maxConsecutivePositiveResults = 0 + var instanceToMatch: Instance = null + + for (instance <- instanceDao.getInstancesOfType(componentType)) { + if (countConsecutivePositiveMatchingResults(instance.id.get) > maxConsecutivePositiveResults) { + maxConsecutivePositiveResults = countConsecutivePositiveMatchingResults(instance.id.get) + instanceToMatch = instance + } + } + + if (instanceToMatch != null) { + log.info(s"Finished fallback matching: Matching to instance with id ${instanceToMatch.id}, as it has $maxConsecutivePositiveResults positive results in a row.") + Success(instanceToMatch) + } else { + instanceToMatch = instanceDao.getInstancesOfType(componentType).head + log.info(s"Finished fallback matching: No difference in available instances found, matching to $instanceToMatch") + Success(instanceToMatch) + } + } + } + } + def isInstanceIdPresent(id: Long): Boolean = { instanceDao.hasInstance(id) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 8d0bedc..b33e3de 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -385,9 +385,9 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit case handler.OperationResult.IdUnknown => log.warning(s"Cannot stop id $id, that id was not found.") complete{HttpResponse(StatusCodes.NotFound, entity = s"Id $id not found.")} - case handler.OperationResult.NoDockerContainer => - log.warning(s"Cannot stop id $id, that instance is not running in a docker container.") - complete{HttpResponse(StatusCodes.BadRequest, entity = s"Id $id is not running in a docker container.")} + case handler.OperationResult.InvalidTypeForOperation => + log.warning(s"Cannot stop id $id, this component type cannot be stopped.") + complete{HttpResponse(StatusCodes.BadRequest, entity = s"Cannot stop instance of this type.")} case handler.OperationResult.Ok => complete{HttpResponse(StatusCodes.Accepted, entity = "Operation accepted.")} case r => From 373058b292ebdf11701e8d2ef0310546a637bdd2 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sat, 3 Nov 2018 15:06:35 +0100 Subject: [PATCH 05/14] Implemeted the "Matching-Events". LinkAddedEvent and LinkStateChangedEvent added. Fixed tests not compiling due to api change. --- .../instanceregistry/RequestHandler.scala | 53 ++++++++++++--- .../daos/DynamicInstanceDAO.scala | 1 + .../io/swagger/client/model/Event.scala | 37 ++++++++++- .../swagger/client/model/InstanceLink.scala | 23 +++++++ .../instanceregistry/RequestHandlerTest.scala | 65 +++++++++---------- 5 files changed, 133 insertions(+), 46 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index a3020cd..39513b7 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -137,8 +137,12 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } else { val (instanceFrom, instanceTo) = (instanceDao.getInstance(instanceIdFrom).get, instanceDao.getInstance(instanceIdTo).get) if(compatibleTypes(instanceFrom.componentType, instanceTo.componentType)){ - instanceDao.addLink(InstanceLink(instanceIdFrom, instanceIdTo, LinkState.Assigned)) match { - case Success(_) => OperationResult.Ok + val link = InstanceLink(instanceIdFrom, instanceIdTo, LinkState.Assigned) + + instanceDao.addLink(link) match { + case Success(_) => + fireLinkAddedEvent(link) + OperationResult.Ok case Failure(_) => OperationResult.InternalError //Should not happen, as ids are being verified above! } } else { @@ -157,10 +161,14 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) val dependency = instanceDao.getInstance(instanceId).get if(assignmentAllowed(instance.componentType) && compatibleTypes(instance.componentType, dependency.componentType)){ - if(instanceDao.addLink(InstanceLink(instanceId, newDependencyId, LinkState.Assigned)).isFailure){ + val link = InstanceLink(instanceId, newDependencyId, LinkState.Assigned) + if(instanceDao.addLink(link).isFailure){ //This should not happen, as ids are being verified above! OperationResult.InternalError } else { + + fireLinkAddedEvent(link) + implicit val timeout : Timeout = Timeout(10 seconds) (dockerActor ? restart(instance.dockerId.get)).map{ @@ -202,8 +210,11 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) //Update link state if(!matchingSuccess){ - instanceDao.updateLink(InstanceLink(callerId, matchedInstanceId, LinkState.Failed)) match { - case Success(_) => OperationResult.Ok + val link = InstanceLink(callerId, matchedInstanceId, LinkState.Failed) + instanceDao.updateLink(link) match { + case Success(_) => + fireLinkStateChangedEvent(link) + OperationResult.Ok case Failure(_) => OperationResult.InternalError //Should not happen } } else { @@ -573,11 +584,15 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) Success(instanceAssigned.get) } else if(instanceAssigned.isDefined && instanceAssigned.get.componentType != componentType){ log.error(s"Matching first try failed: There was one link present, but the target type ${instanceAssigned.get.componentType} did not match expected type $componentType") - instanceDao.updateLink(InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated)) + val link = InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated) + instanceDao.updateLink(link) + fireLinkStateChangedEvent(link) Failure(new RuntimeException("Invalid target type.")) } else { log.error(s"Matching first try failed: There was one link present, but the target id ${links.head.idTo} was not found.") - instanceDao.updateLink(InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated)) + val link = InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated) + instanceDao.updateLink(link) + fireLinkStateChangedEvent(link) Failure(new RuntimeException("Invalid link for instance.")) } case x => @@ -591,11 +606,15 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) Success(instanceAssigned.get) } else if(instanceAssigned.isDefined && instanceAssigned.get.componentType != componentType){ log.error(s"Matching first try failed: There was one assigned link present, but the target type ${instanceAssigned.get.componentType} did not match expected type $componentType") - instanceDao.updateLink(InstanceLink(instanceLink.idFrom, instanceLink.idTo, LinkState.Outdated)) + val link = InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated) + instanceDao.updateLink(link) + fireLinkStateChangedEvent(link) Failure(new RuntimeException("Invalid target type.")) } else { log.error(s"Matching first try failed: There was one assigned link present, but the target id ${instanceLink.idTo} was not found.") - instanceDao.updateLink(InstanceLink(instanceLink.idFrom, instanceLink.idTo, LinkState.Outdated)) + val link = InstanceLink(links.head.idFrom, links.head.idTo, LinkState.Outdated) + instanceDao.updateLink(link) + fireLinkStateChangedEvent(link) Failure(new RuntimeException("Invalid link for instance.")) } case None => @@ -699,6 +718,22 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + private def fireLinkAddedEvent(link: InstanceLink): Unit = { + val event = RegistryEventFactory.createLinkAddedEvent(link) + eventActor ! event + + instanceDao.addEventFor(link.idFrom, event) + instanceDao.addEventFor(link.idTo, event) + } + + private def fireLinkStateChangedEvent(link: InstanceLink): Unit = { + val event = RegistryEventFactory.createLinkStateChangedEvent(link) + eventActor ! event + + instanceDao.addEventFor(link.idFrom, event) + instanceDao.addEventFor(link.idTo, event) + } + private def countConsecutivePositiveMatchingResults(id: Long): Int = { if (!instanceDao.hasInstance(id) || instanceDao.getMatchingResultsFor(id).get.isEmpty) { 0 diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala index 4c60b55..19c8290 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala @@ -66,6 +66,7 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit instances.remove(instances.find(i => i.id.get == id).get) instanceMatchingResults.remove(id) instanceEvents.remove(id) + instanceLinks.retain(link => link.idFrom != id && link.idTo != id) dumpToRecoveryFile() Success(log.info(s"Successfully removed instance with id $id.")) } else { diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Event.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Event.scala index 5431154..a69a613 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Event.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Event.scala @@ -8,7 +8,7 @@ import spray.json.{DefaultJsonProtocol, DeserializationException, JsObject, JsSt /** * Trait defining the implicit JSON formats needed to work with RegistryEvents */ -trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with InstanceJsonSupport { +trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with InstanceJsonSupport with InstanceLinkJsonSupport{ //Custom JSON format for an EventType implicit val eventTypeFormat : JsonFormat[EventType] = new JsonFormat[EventType] { @@ -33,6 +33,8 @@ trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with In case "InstanceRemovedEvent" => EventType.InstanceRemovedEvent case "NumbersChangedEvent" => EventType.NumbersChangedEvent case "DockerOperationErrorEvent" => EventType.DockerOperationErrorEvent + case "LinkAddedEvent" => EventType.LinkStateChangedEvent + case "LinkStateChangedEvent" => EventType.LinkStateChangedEvent case x => throw DeserializationException(s"Unexpected string value $x for event type.") } case y => throw DeserializationException(s"Unexpected type $y during deserialization event type.") @@ -51,6 +53,7 @@ trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with In case ncp: NumbersChangedPayload => numbersChangedPayloadFormat.write(ncp) case ip: InstancePayload => instancePayloadFormat.write(ip) case doep: DockerOperationErrorPayload => dockerOperationErrorPayloadFormat.write(doep) + case ilp: InstanceLinkPayload => instanceLinkPayloadFormat.write(ilp) case _ => throw new RuntimeException("Unsupported type of payload!") } @@ -67,7 +70,9 @@ trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with In numbersChangedPayloadFormat.read(jso) } else if(jso.fields.isDefinedAt("errorMessage")) { dockerOperationErrorPayloadFormat.read(jso) - } else { + } else if(jso.fields.isDefinedAt("link")){ + instanceLinkPayloadFormat.read(jso) + } else { throw DeserializationException("Unexpected type for event payload!") } case _ => throw DeserializationException("Unexpected type for event payload!") @@ -88,6 +93,10 @@ trait EventJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with In implicit val dockerOperationErrorPayloadFormat: JsonFormat[DockerOperationErrorPayload] = jsonFormat2(DockerOperationErrorPayload) + //JSON format for an InstanceLinkPayload + implicit val instanceLinkPayloadFormat: JsonFormat[InstanceLinkPayload] = + jsonFormat1(InstanceLinkPayload) + } /** @@ -147,6 +156,21 @@ object RegistryEventFactory { def createDockerOperationErrorEvent(affectedInstance: Option[Instance], message: String) : RegistryEvent = RegistryEvent(EventType.DockerOperationErrorEvent, DockerOperationErrorPayload(affectedInstance, message)) + /** + * Creates a new LinkAddedEvent. Sets EventType and payload accordingly + * @param link Link that was added + * @return RegistryEvent with the respective type and payload + */ + def createLinkAddedEvent(link: InstanceLink) : RegistryEvent = + RegistryEvent(EventType.LinkAddedEvent, InstanceLinkPayload(link)) + + /** + * Creates a new LinkStateChangedEvent. Sets EventType and payload accordingly. + * @param link Link whichs state has been changed + * @return RegistryEvent with the respective type and payload + */ + def createLinkStateChangedEvent(link: InstanceLink) : RegistryEvent = + RegistryEvent(EventType.LinkStateChangedEvent, InstanceLinkPayload(link)) } /** @@ -179,6 +203,13 @@ final case class InstancePayload(instance: Instance) extends RegistryEventPayloa final case class DockerOperationErrorPayload(affectedInstance: Option[Instance], errorMessage: String) extends RegistryEventPayload +/** + * This InstanceLinkPayload is sent with event of type LinkAddedEvent & LinkStateChangedEvent. It contains the respective + * link that was added / changed. + * @param link Link that caused the event + */ +final case class InstanceLinkPayload(link: InstanceLink) extends RegistryEventPayload + /** * Enumerations concerning Events @@ -197,5 +228,7 @@ object EventEnums { val InstanceRemovedEvent: Value = Value("InstanceRemovedEvent") val NumbersChangedEvent: Value = Value("NumbersChangedEvent") val DockerOperationErrorEvent: Value = Value("DockerOperationErrorEvent") + val LinkAddedEvent: Value = Value("LinkAddedEvent") + val LinkStateChangedEvent: Value = Value("LinkStateChangedEvent") } } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala index d7aaa8a..36a1d51 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceLink.scala @@ -1,6 +1,29 @@ package de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model import LinkEnums.LinkState +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.{DefaultJsonProtocol, DeserializationException, JsString, JsValue, JsonFormat} + +trait InstanceLinkJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { + + implicit val linkStateFormat: JsonFormat[LinkState] = new JsonFormat[LinkState] { + override def read(value: JsValue): LinkState = value match { + case JsString(s) => s match { + case "Assigned" => LinkState.Assigned + case "Outdated" => LinkState.Outdated + case "Failed" => LinkState.Failed + case x => throw DeserializationException(s"Unexpected string value $x for LinkState.") + } + case y => throw DeserializationException(s"Unexpected type $y during deserialization of LinkState") + } + + override def write(linkState: LinkState): JsValue = JsString(linkState.toString) + } + + implicit val instanceLinkFormat: JsonFormat[InstanceLink] = + jsonFormat3(InstanceLink) +} + final case class InstanceLink(idFrom: Long, idTo:Long, linkState: LinkState) diff --git a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala index 5cf36e1..8a92b21 100644 --- a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala +++ b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala @@ -5,8 +5,9 @@ import java.io.File import akka.actor.ActorSystem import akka.stream.ActorMaterializer import de.upb.cs.swt.delphi.instanceregistry.Docker._ -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.Instance +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} import scala.concurrent.ExecutionContext @@ -78,7 +79,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "validate the id before applying a matching result" in { - assert(handler.handleMatchingResult(42, result = false) == handler.OperationResult.IdUnknown) + assert(handler.handleMatchingResult(callerId = 41, matchedInstanceId = 42, matchingSuccess = false) == handler.OperationResult.IdUnknown) } it must "change the instance state when matching results are applied" in { @@ -87,17 +88,23 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach assert(register1.isSuccess) assert(register2.isSuccess) - assert(handler.handleMatchingResult(42, result = true) == handler.OperationResult.Ok) + //Add Link to prevent internal error later + assert(handler.instanceDao.addLink(InstanceLink(42,43, LinkState.Assigned)).isSuccess) + + assert(handler.handleMatchingResult(callerId = 43, matchedInstanceId = 42, matchingSuccess = true) == handler.OperationResult.Ok) assert(handler.getInstance(42).get.instanceState == InstanceState.Running) - assert(handler.handleMatchingResult(43, result = false) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 42, matchedInstanceId = 43, matchingSuccess = false) == handler.OperationResult.Ok) assert(handler.getInstance(43).get.instanceState == InstanceState.NotReachable) } it must "not change the instance state on invalid state transitions" in { val register = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.Failed)) + val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("RandomDockerId2"), InstanceState.Running)) + assert(register.isSuccess) + assert(register2.isSuccess) - assert(handler.handleMatchingResult(42, result = true) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 43, matchedInstanceId = 42, matchingSuccess = true) == handler.OperationResult.Ok) assert(handler.getInstance(42).get.instanceState == InstanceState.Failed) } @@ -193,11 +200,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach }*/ it must "validate preconditions on handleStop" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, None)) - assert(register1.isSuccess) - assert(handler.handleStop(Int.MaxValue) == handler.OperationResult.IdUnknown) - assert(handler.handleStop(1) == handler.OperationResult.NoDockerContainer) } //Below test is not applicable anymore, state change is managed in futures! @@ -253,46 +256,38 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach assert(handler.getNumberOfInstances(ComponentType.ElasticSearch) == 1) } - it must "match to instance with most consecutive positive matching results" in { + it must "match to instance with most consecutive positive matching results if no links are present" in { val esInstance = handler.handleRegister(buildInstance(2)) - assert(esInstance.isSuccess) - assert(esInstance.get == 1) - - assert(handler.handleMatchingResult(0, result = false) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(0, result = true) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(0, result = true) == handler.OperationResult.Ok) + val crawlerId = handler.handleRegister(Instance(Some(2), "foo", 42, "bar", ComponentType.Crawler, None, InstanceState.Running)) - assert(handler.handleMatchingResult(1, result = true) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(1, result = false) == handler.OperationResult.Ok) - - val matchingInstance = handler.getMatchingInstanceOfType(ComponentType.ElasticSearch) - assert(matchingInstance.isSuccess) - assert(matchingInstance.get.id.get == 0) - - assert(handler.handleDeregister(1L) == handler.OperationResult.Ok) - } - - it must "match to instance with most positive matching results" in { - val esInstance = handler.handleRegister(buildInstance(2)) assert(esInstance.isSuccess) assert(esInstance.get == 1) + assert(crawlerId.isSuccess) + assert(crawlerId.get == 2) + + //Add Links to prevent errors later + assert(handler.instanceDao.addLink(InstanceLink(1,0,LinkState.Assigned)).isSuccess) + assert(handler.instanceDao.addLink(InstanceLink(0,1,LinkState.Assigned)).isSuccess) - assert(handler.handleMatchingResult(0, result = true) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(0, result = true) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(0, result = false) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 1, matchedInstanceId = 0, matchingSuccess = false) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 1, matchedInstanceId = 0, matchingSuccess = true) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 1, matchedInstanceId = 0, matchingSuccess = true) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(1, result = false) == handler.OperationResult.Ok) - assert(handler.handleMatchingResult(1, result = false) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 0, matchedInstanceId = 1, matchingSuccess = true) == handler.OperationResult.Ok) + assert(handler.handleMatchingResult(callerId = 0, matchedInstanceId = 1, matchingSuccess = false) == handler.OperationResult.Ok) - val matchingInstance = handler.getMatchingInstanceOfType(ComponentType.ElasticSearch) + val matchingInstance = handler.getMatchingInstanceOfType(callerId = 2, ComponentType.ElasticSearch) assert(matchingInstance.isSuccess) assert(matchingInstance.get.id.get == 0) assert(handler.handleDeregister(1L) == handler.OperationResult.Ok) + assert(handler.handleDeregister(2L) == handler.OperationResult.Ok) } it must "fail to match if no instance of type is present" in { - assert(handler.getMatchingInstanceOfType(ComponentType.Crawler).isFailure) + val register = handler.handleRegister(Instance(None, "foo", 42, "bar", ComponentType.WebApp, None, InstanceState.Running)) + assert(register.isSuccess && register.get == 1) + assert(handler.getMatchingInstanceOfType(1, ComponentType.WebApi).isFailure) } override protected def afterEach(): Unit = { From fe5f75f46370ef8b724d5f298befe67ba9286861 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 4 Nov 2018 18:11:06 +0100 Subject: [PATCH 06/14] InstanceLinks are now accessible in the API (linksFrom?Id=42, linksTo?Id=42). Whole network is now accessible (new type InstanceNetwork) via /network. --- OpenAPISpecification.yaml | 127 ++++++++++++++++-- .../instanceregistry/RequestHandler.scala | 34 +++++ .../instanceregistry/connection/Server.scala | 59 +++++++- .../daos/DynamicInstanceDAO.scala | 9 +- .../instanceregistry/daos/InstanceDAO.scala | 16 ++- .../client/model/InstanceNetwork.scala | 10 ++ 6 files changed, 238 insertions(+), 17 deletions(-) create mode 100644 src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceNetwork.scala diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index 6667d70..df04ec6 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -239,6 +239,67 @@ paths: $ref: '#/definitions/Event' '404': description: Instance not found + /linksFrom: + get: + tags: + - Basic Operations + summary: Retrieves outgoing links from an instance + description: >- + This command retreives a list of outgoing links from the instance with the specified id. + operationId: linksFrom + parameters: + - name: Id + in: query + description: Id of the instance + required: true + type: integer + format: int64 + responses: + '200': + description: List of InstanceLinks from the specified instance + schema: + type: array + items: + $ref: '#/definitions/InstanceLink' + '404': + description: Instance not found + /linksTo: + get: + tags: + - Basic Operations + summary: Retrieves incoming links to an instance + description: >- + This command retreives a list of incoming links from the instance with the specified id. + operationId: linksTo + parameters: + - name: Id + in: query + description: Id of the instance + required: true + type: integer + format: int64 + responses: + '200': + description: List of InstanceLinks to the specified instance + schema: + type: array + items: + $ref: '#/definitions/InstanceLink' + '404': + description: Instance not found + /network: + get: + tags: + - Basic Operations + summary: Retrieves the current instance network + description: >- + Retrieves the instance network, meaning a list of all instances as well as a list of all links currently registered at the registry. + operationId: network + responses: + '200': + description: The instance network + schema: + $ref: '#/definitions/InstanceNetwork' /deploy: post: tags: @@ -430,10 +491,11 @@ paths: - Docker Operations summary: Stops the specified instances' docker container description: >- - This command stops the specified instance. If the instance is running inside a docker container, - the container will be stopped. If not, the instance will be gracefully shut down by calling its - /stop endpoint. Will change the instance state to 'Stopped' for docker containers, will remove the - instance for non-docker instances. + This command stops the specified instance. If the instance is running + inside a docker container, the container will be stopped. If not, the + instance will be gracefully shut down by calling its /stop endpoint. + Will change the instance state to 'Stopped' for docker containers, will + remove the instance for non-docker instances. operationId: stop parameters: - in: query @@ -446,9 +508,7 @@ paths: '202': description: 'Accepted, the operation will be completed in the future.' '400': - description: >- - Bad request, the instance with the specified ID is already - stopped. + description: 'Bad request, the instance with the specified ID is already stopped.' '404': description: ID not found on server '500': @@ -515,8 +575,9 @@ paths: - Docker Operations summary: Assignes a new dependency to the specified instance description: >- - This command assignes a new dependency to the instance with the specified id. Internally, this will - stop the instance, assign the new dependency and start the instance again. This is why this is only + This command assignes a new dependency to the instance with the + specified id. Internally, this will stop the instance, assign the new + dependency and start the instance again. This is why this is only applicable to docker instances. operationId: assignInstance parameters: @@ -533,15 +594,55 @@ paths: type: integer format: int64 responses: - '202' : + '202': description: 'Accepted, the operation will be completed in the future.' - '400' : - description: 'Bad request, the instance with the specified ID is no running inside a docker container or the assigned instance is of the wrong component type.' + '400': + description: >- + Bad request, the instance with the specified ID is no running inside + a docker container or the assigned instance is of the wrong + component type. '404': - description: 'One of the ids was not found on the server' + description: One of the ids was not found on the server '500': description: Internal server error definitions: + InstanceNetwork: + type: object + required: + - instances + - links + properties: + instances: + type: array + items: + $ref: '#/definitions/Instance' + links: + type: array + items: + $ref: '#/definitions/InstanceLink' + InstanceLink: + type: object + required: + - idFrom + - idTo + - linkState + properties: + idFrom: + type: integer + format: int64 + example: 0 + idTo: + type: integer + format: int64 + example: 42 + linkState: + type: string + description: Valid states for an InstanceLink + example: Assigned + enum: + - Assigned + - Outdated + - Failed Event: type: object required: diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 39513b7..923151b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -558,6 +558,40 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + /** + * Retrieves links from the instance with the specified id. + * @param id Id of the specified instance + * @return Success(listOfLinks) if id is present, Failure otherwise + */ + def handleGetLinksFrom(id: Long) : Try[List[InstanceLink]] = { + if(!instanceDao.hasInstance(id)){ + Failure(new RuntimeException(s"Cannot get links from $id, that id is unknown.")) + } else { + Success(instanceDao.getLinksFrom(id)) + } + } + + /** + * Retrieves links to the instance with the specified id. + * @param id Id of the specified instance + * @return Success(listOfLinks) if id is present, Failure otherwise + */ + def handleGetLinksTo(id: Long) : Try[List[InstanceLink]] = { + if(!instanceDao.hasInstance(id)){ + Failure(new RuntimeException(s"Cannot get links to $id, that id is unknown.")) + } else { + Success(instanceDao.getLinksTo(id)) + } + } + + /** + * Retrieves the current instance network, containing all instances and instance links. + * @return InstanceNetwork + */ + def handleGetNetwork() : InstanceNetwork = { + instanceDao.getNetwork() + } + /** * Tries to match caller to specified component type based on links stored in the dao. If one link is present, it will * be selected regardless of its state. If multiple links are present, the assigned link will be returned. If none of diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index b33e3de..955bd34 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -19,7 +19,12 @@ import scala.util.{Failure, Success} /** * Web server configuration for Instance Registry API. */ -object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport with AppLogging { +object Server extends HttpApp + with InstanceJsonSupport + with EventJsonSupport + with InstanceLinkJsonSupport + with InstanceNetworkJsonSupport + with AppLogging { implicit val system : ActorSystem = Registry.system implicit val materializer : ActorMaterializer = ActorMaterializer() @@ -37,6 +42,9 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit path("matchingInstance") { matchingInstance()} ~ path("matchingResult") { matchInstance()} ~ path("eventList") { eventList()} ~ + path("linksFrom") { linksFrom()} ~ + path("linksTo") { linksTo()} ~ + path("network") { network()} ~ /****************DOCKER OPERATIONS****************/ path("deploy") { deployContainer()} ~ path("reportStart") { reportStart()} ~ @@ -476,6 +484,55 @@ object Server extends HttpApp with InstanceJsonSupport with EventJsonSupport wit } } + /** + * Called to get a list of links from the instance with the specified id. The id is passed as query argument named + * 'Id' (so the resulting call is /linksFrom?Id=42). + * @return Server route that either maps to 200 OK (and the list of links as content), or the respective error code. + */ + def linksFrom() : server.Route = parameters('Id.as[Long]) { id => + get { + log.debug(s"GET /linksFrom?Id=$id has been called.") + + handler.handleGetLinksFrom(id) match { + case Success(linkList) => + complete{linkList} + case Failure(ex) => + log.warning(s"Failed to get links from $id with message: ${ex.getMessage}") + complete{HttpResponse(StatusCodes.NotFound, entity = s"Failed to get links from $id, that id is not known.")} + } + } + } + + /** + * Called to get a list of links to the instance with the specified id. The id is passed as query argument named + * 'Id' (so the resulting call is /linksTo?Id=42). + * @return Server route that either maps to 200 OK (and the list of links as content), or the respective error code. + */ + def linksTo() : server.Route = parameters('Id.as[Long]) {id => + get { + log.debug(s"GET /linksTo?Id=$id has been called.") + + handler.handleGetLinksTo(id) match { + case Success(linkList) => + complete{linkList} + case Failure(ex) => + log.warning(s"Failed to get links to $id with message: ${ex.getMessage}") + complete{HttpResponse(StatusCodes.NotFound, entity = s"Failed to get links to $id, that id is not known.")} + } + } + } + + /** + * Called to get the whole network graph of the current registry. Contains a list of all instances and all links + * currently registered. + * @return Server route that maps to 200 OK and the current InstanceNetwork as content. + */ + def network() : server.Route = { + get { + complete{handler.handleGetNetwork().toJson(InstanceNetworkFormat)} + } + } + /** * Creates a WebSocketConnection that streams events that are issued by the registry to all connected clients. * @return Server route that maps to the WebSocketConnection diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala index 19c8290..57ce44d 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala @@ -5,7 +5,7 @@ import java.io.{File, IOException, PrintWriter} import akka.actor.ActorSystem import akka.stream.ActorMaterializer import de.upb.cs.swt.delphi.instanceregistry.{AppLogging, Configuration, Registry} -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceJsonSupport, InstanceLink, RegistryEvent} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model._ import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState @@ -227,7 +227,7 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } - def getLinksTo(id:Long, state: Option[LinkState] = None) : List[InstanceLink] = { + override def getLinksTo(id:Long, state: Option[LinkState] = None) : List[InstanceLink] = { val links = instanceLinks.filter(link => link.idTo == id) if(state.isDefined){ @@ -237,6 +237,11 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } + override def getNetwork() : InstanceNetwork = { + InstanceNetwork( List() ++ instances, + List() ++ instanceLinks) + } + private[daos] def clearData() : Unit = { instances.clear() instanceMatchingResults.clear() diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala index 1e633d0..951524b 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala @@ -1,6 +1,6 @@ package de.upb.cs.swt.delphi.instanceregistry.daos -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink, RegistryEvent} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink, InstanceNetwork, RegistryEvent} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState @@ -130,4 +130,18 @@ trait InstanceDAO { * @return List of matching InstanceLinks */ def getLinksFrom(idFrom: Long, state: Option[LinkState] = None) : List[InstanceLink] + + /** + * Get all incoming links to the specified instance. Optionally a LinkState can be specified as filter + * @param idFrom Id of the instance + * @param state Option[LinkState] to filter for certain LinkStates. If None, no filter will be applied. + * @return List of matching InstanceLinks + */ + def getLinksTo(idFrom: Long, state: Option[LinkState] = None) : List[InstanceLink] + + /** + * Get the current Instance Network + * @return InstanceNetwork object + */ + def getNetwork() : InstanceNetwork } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceNetwork.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceNetwork.scala new file mode 100644 index 0000000..6407096 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/InstanceNetwork.scala @@ -0,0 +1,10 @@ +package de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.{DefaultJsonProtocol, JsonFormat} + +trait InstanceNetworkJsonSupport extends SprayJsonSupport with DefaultJsonProtocol with InstanceJsonSupport with InstanceLinkJsonSupport { + implicit val InstanceNetworkFormat : JsonFormat[InstanceNetwork] = jsonFormat2(InstanceNetwork) +} + +final case class InstanceNetwork (instances: List[Instance], links: List[InstanceLink]) From b4c6b1307c3703546846a03d5d44fb8e56298b87 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Sun, 4 Nov 2018 18:23:13 +0100 Subject: [PATCH 07/14] Fixed two http query argument names being lowercase instead of uppercase. --- OpenAPISpecification.yaml | 4 ++-- .../cs/swt/delphi/instanceregistry/connection/Server.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index df04ec6..1113ce7 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -191,13 +191,13 @@ paths: operationId: matchInstance parameters: - in: query - name: callerId + name: CallerId description: The ID of the instance that is calling this endpoint required: true type: integer format: int64 - in: query - name: matchedInstanceId + name: MatchedInstanceId description: The ID of the instance that the sender was matched to. required: true type: integer diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 955bd34..ba91b08 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -195,7 +195,7 @@ object Server extends HttpApp * parameters named 'Id' and 'MatchingSuccessful' (so the call is /matchingResult?Id=42&MatchingSuccessful=True). * @return Server route that either maps to 200 OK or to the respective error codes */ - def matchInstance() : server.Route = parameters('callerId.as[Long], 'matchedInstanceId.as[Long], 'MatchingSuccessful.as[Boolean]){ (callerId, matchedInstanceId, matchingResult) => + def matchInstance() : server.Route = parameters('CallerId.as[Long], 'MatchedInstanceId.as[Long], 'MatchingSuccessful.as[Boolean]){ (callerId, matchedInstanceId, matchingResult) => post { log.debug(s"POST /matchingResult?callerId=$callerId&matchedInstanceId=$matchedInstanceId&MatchingSuccessful=$matchingResult has been called") From 95ecb97f7ad2c72d431f06834ec0e4d87f8d0549 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Mon, 5 Nov 2018 17:19:31 +0100 Subject: [PATCH 08/14] Moved URIs of default ES instance and registry to configuration file. Added hint in Requirements section of readme. --- README.md | 1 + .../cs/swt/delphi/instanceregistry/Configuration.scala | 10 ++++++++++ .../delphi/instanceregistry/Docker/DockerActor.scala | 3 ++- .../swt/delphi/instanceregistry/RequestHandler.scala | 8 +++++++- 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0a3bc2e..21d8ab3 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ To obtain these images, checkout the respective repositories ([here](https://git sbt docker:publishLocal ``` inside their root directory. This will build the docker images and register them directly at the local docker registry. +The registry requires an initial instance of ElasticSearch to be running. The default location for this is *elasticsearch://172.17.0.1:9200*, however this can be changed in the *Configuration.scala* file at *src/main/scala/de/upb/cs/swt/delphi/instanceregistry*. ## Contributing diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala index 70ed253..2444ee2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala @@ -1,19 +1,29 @@ package de.upb.cs.swt.delphi.instanceregistry class Configuration( ) { + //Where to host the http server val bindHost: String = "0.0.0.0" val bindPort: Int = 8087 + + val recoveryFileName : String = "dump.temp" + //Default ports for the Delphi components val defaultCrawlerPort: Int = 8882 val defaultWebApiPort: Int = 8080 val defaultWepAppPort: Int = 8085 + //Names of the docker images for the Delphi components val crawlerDockerImageName: String = "delphi-crawler:1.0.0-SNAPSHOT" val webApiDockerImageName: String = "delphi-webapi:1.0.0-SNAPSHOT" val webAppDockerImageName: String = "delphi-webapp:1.0.0-SNAPSHOT" + //Where the initial ElasticSearch instance is located at + val defaultElasticSearchInstanceHost: String = "elasticsearch://172.17.0.1" + val defaultElasticSearchInstancePort: Int = 9200 + //Where this registry can be contacted at inside the LAN + val uriInLocalNetwork: String = "http://172.17.0.1:8087" } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala index ffeb8db..ceed7d6 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala @@ -34,7 +34,8 @@ class DockerActor(connection: DockerConnection) extends Actor with ActorLogging } case create(componentType, instanceId, containerName) => - val containerConfig = ContainerConfig(Image = DockerImage.getImageName(componentType), Env = Seq(s"INSTANCE_ID=$instanceId", "DELPHI_IR_URI=http://172.17.0.1:8087")) + val containerConfig = ContainerConfig(Image = DockerImage.getImageName(componentType), + Env = Seq(s"INSTANCE_ID=$instanceId", s"DELPHI_IR_URI=${Registry.configuration.uriInLocalNetwork}")) val createCommand = Try(Await.result(container.create(containerConfig, containerName), Duration.Inf)) createCommand match { diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 923151b..be7aacd 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -39,7 +39,13 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) instanceDao.initialize() if (!instanceDao.allInstances().exists(instance => instance.name.equals("Default ElasticSearch Instance"))) { //Add default ES instance - handleRegister(Instance(None, "elasticsearch://172.17.0.1", 9200, "Default ElasticSearch Instance", ComponentType.ElasticSearch, None, InstanceState.Running)) + handleRegister(Instance(None, + configuration.defaultElasticSearchInstanceHost, + configuration.defaultElasticSearchInstancePort, + "Default ElasticSearch Instance", + ComponentType.ElasticSearch, + None, + InstanceState.Running)) } log.info("Done initializing request handler.") } From c11ec38796bb371b7b23a5f112ff75458546f0a8 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Mon, 5 Nov 2018 17:55:00 +0100 Subject: [PATCH 09/14] Instances now have a list of string labels assigned to them. Registry now provides an endpoint to add labels to an instance. Label length is limited to a maximum number of 50 characters (may be adapted in configuration file). --- .../instanceregistry/Configuration.scala | 2 ++ .../instanceregistry/RequestHandler.scala | 32 +++++++++++++++-- .../instanceregistry/connection/Server.scala | 23 ++++++++++++ .../daos/DynamicInstanceDAO.scala | 36 ++++++++++++++++++- .../instanceregistry/daos/InstanceDAO.scala | 8 +++++ .../io/swagger/client/model/Instance.scala | 5 +-- 6 files changed, 100 insertions(+), 6 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala index 2444ee2..7bc92ed 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala @@ -25,6 +25,8 @@ class Configuration( ) { //Where this registry can be contacted at inside the LAN val uriInLocalNetwork: String = "http://172.17.0.1:8087" + val maxLabelLength: Int = 50 + } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index be7aacd..30487b1 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -45,7 +45,8 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) "Default ElasticSearch Instance", ComponentType.ElasticSearch, None, - InstanceState.Running)) + InstanceState.Running, + List("Default"))) } log.info("Done initializing request handler.") } @@ -69,7 +70,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) val newInstance = Instance(id = Some(newID), name = instance.name, host = instance.host, portNumber = instance.portNumber, componentType = instance.componentType, - dockerId = None, instanceState = InstanceState.Running) + dockerId = None, instanceState = InstanceState.Running, labels = instance.labels) instanceDao.addInstance(newInstance) match { case Success(_) => @@ -249,7 +250,15 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) val normalizedHost = host.substring(1,host.length - 1) log.info(s"Deployed new container with id $dockerId, host $normalizedHost and port $port.") - val newInstance = Instance(Some(newId), normalizedHost, port, name.getOrElse(s"Generic $componentType"), componentType, Some(dockerId), InstanceState.Deploying) + val newInstance = Instance(Some(newId), + normalizedHost, + port, + name.getOrElse(s"Generic $componentType"), + componentType, + Some(dockerId), + InstanceState.Deploying, + List.empty[String] + ) log.info(s"Registering instance $newInstance....") instanceDao.addInstance(newInstance) match { @@ -598,6 +607,23 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) instanceDao.getNetwork() } + /** + * Add label to instance with specified id + * @param id Instance id + * @param label Label to add + * @return OperationResult + */ + def handleAddLabel(id: Long, label: String) : OperationResult.Value = { + if(!instanceDao.hasInstance(id)){ + OperationResult.IdUnknown + } else { + instanceDao.addLabelFor(id, label) match { + case Success(_) => OperationResult.Ok + case Failure(_) => OperationResult.InternalError + } + } + } + /** * Tries to match caller to specified component type based on links stored in the dao. If one link is present, it will * be selected regardless of its state. If multiple links are present, the assigned link will be returned. If none of diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index ba91b08..03e5ee0 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -45,6 +45,7 @@ object Server extends HttpApp path("linksFrom") { linksFrom()} ~ path("linksTo") { linksTo()} ~ path("network") { network()} ~ + path("addLabel") { addLabel()} ~ /****************DOCKER OPERATIONS****************/ path("deploy") { deployContainer()} ~ path("reportStart") { reportStart()} ~ @@ -533,6 +534,28 @@ object Server extends HttpApp } } + /** + * Called to add a generic label to the instance with the specified id. The Id and label are passed as query arguments + * named 'Id' and 'Label', resp. (so the resulting call is /addLabel?Id=42&Label=private) + * @return Server route that either maps to 200 OK or the respective error codes. + */ + def addLabel() : server.Route = parameters('Id.as[Long], 'Label.as[String]){ (id, label) => + post { + handler.handleAddLabel(id, label) match { + case handler.OperationResult.IdUnknown => + log.warning(s"Cannot add label $label to $id, id not found.") + complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot add label, id $id not found.")} + case handler.OperationResult.InternalError => + log.warning(s"Error while adding label $label to $id: Label exceeds character limit.") + complete{HttpResponse(StatusCodes.BadRequest, + entity = s"Cannot add label to $id, label exceeds character limit of ${Registry.configuration.maxLabelLength}")} + case handler.OperationResult.Ok => + log.info(s"Successfully added label $label to instance with id $id.") + complete("Successfully added label") + } + } + } + /** * Creates a WebSocketConnection that streams events that are issued by the registry to all connected clients. * @return Server route that maps to the WebSocketConnection diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala index 57ce44d..1df28e4 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAO.scala @@ -154,7 +154,14 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit override def setStateFor(id: Long, state: InstanceState.Value): Try[Unit] = { if(hasInstance(id)){ val instance = getInstance(id).get - val newInstance = Instance(instance.id, instance.host, instance.portNumber, instance.name, instance.componentType, instance.dockerId, state) + val newInstance = Instance(instance.id, + instance.host, + instance.portNumber, + instance.name, + instance.componentType, + instance.dockerId, + state, + instance.labels) instances.remove(instance) instances.add(newInstance) Success() @@ -163,6 +170,33 @@ class DynamicInstanceDAO (configuration : Configuration) extends InstanceDAO wit } } + override def addLabelFor(id: Long, label: String): Try[Unit] = { + if(hasInstance(id)){ + val instance = getInstance(id).get + if(instance.labels.exists(l => l.equalsIgnoreCase(label))){ + Success() //Label already present, Success! + } else { + if(label.length > configuration.maxLabelLength){ + Failure(new RuntimeException(s"Label exceeds character limit of ${configuration.maxLabelLength}.")) + } else { + val newInstance = Instance(instance.id, + instance.host, + instance.portNumber, + instance.name, + instance.componentType, + instance.dockerId, + instance.instanceState, + instance.labels ++ List[String](label)) + instances.remove(instance) + instances.add(newInstance) + Success() + } + } + } else { + Failure(new RuntimeException(s"Instance with id $id was not found.")) + } + } + override def addEventFor(id: Long, event: RegistryEvent) : Try[Unit] = { if(hasInstance(id)){ instanceEvents(id) += event diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala index 951524b..dd14942 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/daos/InstanceDAO.scala @@ -144,4 +144,12 @@ trait InstanceDAO { * @return InstanceNetwork object */ def getNetwork() : InstanceNetwork + + /** + * Adds a label to the instance with the specified id + * @param id Id of the instance + * @param label Label to add + * @return Success if instance is present and label does not exceed character limit, false otherwise. + */ + def addLabelFor(id: Long, label: String) : Try[Unit] } \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Instance.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Instance.scala index 7e01fa2..fd144d9 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Instance.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/io/swagger/client/model/Instance.scala @@ -69,7 +69,7 @@ trait InstanceJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { } //JSON format for Instances - implicit val instanceFormat : JsonFormat[Instance] = jsonFormat7(Instance) + implicit val instanceFormat : JsonFormat[Instance] = jsonFormat8(Instance) } /** @@ -89,7 +89,8 @@ final case class Instance ( name: String, componentType: ComponentType, dockerId: Option[String], - instanceState: InstanceState + instanceState: InstanceState, + labels: List[String] ) /** From e7c9d1d0c0c0477fce455c9e983d3f8ad2245aba Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Mon, 5 Nov 2018 17:59:31 +0100 Subject: [PATCH 10/14] Add swagger file that i missed in last commit. --- OpenAPISpecification.yaml | 50 +++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index 1113ce7..c8534e7 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -241,11 +241,12 @@ paths: description: Instance not found /linksFrom: get: - tags: + tags: - Basic Operations summary: Retrieves outgoing links from an instance description: >- - This command retreives a list of outgoing links from the instance with the specified id. + This command retreives a list of outgoing links from the instance with + the specified id. operationId: linksFrom parameters: - name: Id @@ -265,11 +266,12 @@ paths: description: Instance not found /linksTo: get: - tags: + tags: - Basic Operations summary: Retrieves incoming links to an instance description: >- - This command retreives a list of incoming links from the instance with the specified id. + This command retreives a list of incoming links from the instance with + the specified id. operationId: linksTo parameters: - name: Id @@ -293,13 +295,42 @@ paths: - Basic Operations summary: Retrieves the current instance network description: >- - Retrieves the instance network, meaning a list of all instances as well as a list of all links currently registered at the registry. + Retrieves the instance network, meaning a list of all instances as well + as a list of all links currently registered at the registry. operationId: network responses: '200': description: The instance network schema: $ref: '#/definitions/InstanceNetwork' + /addLabel: + post: + tags: + - Basic Operations + summary: Add a label to the instance with the specified id + description: >- + This command will add the specified label to the instance with the specified + id. + operationId: addLabel + parameters: + - name: Id + in: query + description: Id of the instance + required: true + type: integer + format: int64 + - name: Label + in: query + description: The label to add to the instance + required: true + type: string + responses: + '200': + description: Label successfully added + '400': + description: Bad request, your label exceeded the character limit + '404': + description: Not found, the id you specified could not be found /deploy: post: tags: @@ -615,11 +646,11 @@ definitions: instances: type: array items: - $ref: '#/definitions/Instance' + $ref: '#/definitions/Instance' links: type: array items: - $ref: '#/definitions/InstanceLink' + $ref: '#/definitions/InstanceLink' InstanceLink: type: object required: @@ -704,3 +735,8 @@ definitions: - Stopped - Paused - NotReachable + labels: + type: array + items: + type: string + example: ["private", "debug"] From 470f65a9305b16a3396ec3d332186a7e60a030df Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 6 Nov 2018 20:03:57 +0100 Subject: [PATCH 11/14] Implemented label-based matching as second try after link-based matching. Adapted RequestHandlerTest to new feautures. Fixed a bug in requesthandler. --- .../instanceregistry/RequestHandler.scala | 57 ++++++- .../instanceregistry/RequestHandlerTest.scala | 149 ++++++++++++++---- .../daos/DynamicInstanceDAOTest.scala | 4 +- 3 files changed, 167 insertions(+), 43 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 30487b1..6a5981a 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -126,13 +126,20 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) Success(instance) case Failure(ex) => log.warning(s"Matching pending: First try failed, message was ${ex.getMessage}") - tryDefaultMatching(compType) match { + tryLabelMatching(callerId, compType) match { case Success(instance) => - log.info(s"Matching finished: Default matching yielded result $instance.") + log.info(s"Matching finished: Second try yielded result $instance.") Success(instance) case Failure(ex2) => - log.warning(s"Matching failed: Default matching did not yield result, message was ${ex2.getMessage}.") - Failure(ex2) + log.warning(s"Matching pending: Second try failed, message was ${ex2.getMessage}") + tryDefaultMatching(compType) match { + case Success(instance) => + log.info(s"Matching finished: Default matching yielded result $instance.") + Success(instance) + case Failure(ex3) => + log.warning(s"Matching failed: Default matching did not yield result, message was ${ex3.getMessage}.") + Failure(ex3) + } } } } @@ -165,7 +172,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) OperationResult.NoDockerContainer } else { val instance = instanceDao.getInstance(instanceId).get - val dependency = instanceDao.getInstance(instanceId).get + val dependency = instanceDao.getInstance(newDependencyId).get if(assignmentAllowed(instance.componentType) && compatibleTypes(instance.componentType, dependency.componentType)){ val link = InstanceLink(instanceId, newDependencyId, LinkState.Assigned) @@ -636,7 +643,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) private def tryLinkMatching(callerId: Long, componentType: ComponentType) : Try[Instance] = { log.info(s"Matching first try: Analyzing links for $callerId...") - val links = instanceDao.getLinksFrom(callerId) + val links = instanceDao.getLinksFrom(callerId).filter(link => link.linkState == LinkState.Assigned) links.size match { case 0 => @@ -691,6 +698,42 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + /** + * Tries to match caller to instance of the specified type based on which instance has the most labels in common with + * the caller. Will fail if no such instance is found. + * @param callerId Id of the calling instance + * @param componentType ComponentType to match to + * @return Success(Instance) if successful, Failure otherwise. + */ + private def tryLabelMatching(callerId: Long, componentType: ComponentType) : Try[Instance] = { + log.info(s"Matching second try: Analyzing labels for $callerId...") + + val possibleMatches = instanceDao.getInstancesOfType(componentType) + + possibleMatches.size match { + case 0 => + log.warning(s"Matching second try failed: There are no instances of type $componentType present.") + Failure(new RuntimeException(s"Type $componentType not present.")) + case _ => + val labels = instanceDao.getInstance(callerId).get.labels + + val intersectionList = possibleMatches + .filter(instance => instance.labels.intersect(labels).nonEmpty) + .sortBy(instance => instance.labels.intersect(labels).size) + .reverse + + if(intersectionList.nonEmpty){ + val result = intersectionList.head + val noOfSharedLabels = result.labels.intersect(labels).size + log.info(s"Finished matching second try: Successfully matched to $result based on $noOfSharedLabels shared labels.") + Success(result) + } else { + log.warning(s"Matching second try failed: There are no instance with shared labels to $labels.") + Failure(new RuntimeException(s"No instance with shared labels.")) + } + } + } + private def tryDefaultMatching(componentType: ComponentType) : Try[Instance] = { log.info(s"Matching fallback: Searching for instances of type $componentType ...") getNumberOfInstances(componentType) match { @@ -711,7 +754,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) case None => log.info(s"Matching fallback: Found $x instance of type $componentType, but none of them is running.") - //Match to instance with maxmum number of consecutive positive matching results + //Match to instance with maximum number of consecutive positive matching results var maxConsecutivePositiveResults = 0 var instanceToMatch: Instance = null diff --git a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala index 8a92b21..2c557c6 100644 --- a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala +++ b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandlerTest.scala @@ -18,8 +18,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach implicit val ec: ExecutionContext = system.dispatcher val handler: RequestHandler = new RequestHandler(new Configuration(), DockerConnection.fromEnvironment()) - private def buildInstance(id: Long, dockerId: Option[String] = None, state: InstanceState.Value = InstanceState.Stopped): Instance = { - Instance(Some(id), "https://localhost", 12345, "TestInstance", ComponentType.ElasticSearch, dockerId, state) + private def buildInstance(id: Long, componentType: ComponentType = ComponentType.ElasticSearch, dockerId: Option[String] = None, state: InstanceState.Value = InstanceState.Stopped, labels: List[String] = List.empty[String]): Instance = { + Instance(Some(id), "https://localhost", 12345, "TestInstance", componentType, dockerId, state, labels) } override protected def beforeEach(): Unit = { @@ -40,7 +40,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "ignore the dockerId and instanceState on registration" in { - val registerInstance = handler.handleRegister(buildInstance(1, Some("RandomDockerId"), InstanceState.Failed)) + val registerInstance = handler.handleRegister(buildInstance(id = 1, dockerId = Some("RandomDockerId"), state = InstanceState.Failed)) assert(registerInstance.isSuccess) val instance = handler.getInstance(registerInstance.get) assert(instance.isDefined) @@ -60,7 +60,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach it must "validate preconditions on deregister" in { //Bypass register as it would ignore dockerId! - val registerDockerInstance = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"))) + val registerDockerInstance = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"))) assert(registerDockerInstance.isSuccess) val dockerInstance = handler.getInstance(42).get @@ -71,7 +71,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "successfully deregister an instance that meets the required preconditions" in { - val registerInstance = handler.handleRegister(buildInstance(1, None)) + val registerInstance = handler.handleRegister(buildInstance(1)) assert(registerInstance.isSuccess) assert(handler.handleDeregister(registerInstance.get) == handler.OperationResult.Ok) @@ -83,8 +83,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "change the instance state when matching results are applied" in { - val register1 = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.NotReachable)) - val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("AnotherRandomDockerID"), InstanceState.Running)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"), state = InstanceState.NotReachable)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 43, dockerId = Some("AnotherRandomDockerID"), state = InstanceState.Running)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -98,8 +98,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "not change the instance state on invalid state transitions" in { - val register = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.Failed)) - val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("RandomDockerId2"), InstanceState.Running)) + val register = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"), state = InstanceState.Failed)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 43, dockerId = Some("RandomDockerId2"), state = InstanceState.Running)) assert(register.isSuccess) assert(register2.isSuccess) @@ -109,7 +109,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "validate preconditions on report operations" in { - val register = handler.instanceDao.addInstance(buildInstance(42, None)) + val register = handler.instanceDao.addInstance(buildInstance(42)) assert(register.isSuccess) assert(handler.handleReportStart(-1) == handler.OperationResult.IdUnknown) @@ -123,8 +123,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "change the state on reportStart" in { - val register1 = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.Stopped)) - val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("RandomDockerId2"), InstanceState.Failed)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"), state = InstanceState.Stopped)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 43, dockerId = Some("RandomDockerId2"), state = InstanceState.Failed)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -135,8 +135,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "change states only for valid state transitions on reportStop" in { - val register1 = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.Running)) - val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("RandomDockerId2"), InstanceState.Failed)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"), state = InstanceState.Running)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 43, dockerId = Some("RandomDockerId2"), state = InstanceState.Failed)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -147,8 +147,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "change the state on reportFailure" in { - val register1 = handler.instanceDao.addInstance(buildInstance(42, Some("RandomDockerId"), InstanceState.Stopped)) - val register2 = handler.instanceDao.addInstance(buildInstance(43, Some("RandomDockerId2"), InstanceState.Running)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 42, dockerId = Some("RandomDockerId"), state = InstanceState.Stopped)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 43, dockerId = Some("RandomDockerId2"), state = InstanceState.Running)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -159,8 +159,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "validate preconditions on handlePause" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, None)) - val register2 = handler.instanceDao.addInstance(buildInstance(2, Some("RandomDockerId"), InstanceState.Failed)) + val register1 = handler.instanceDao.addInstance(buildInstance(1)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 2, dockerId = Some("RandomDockerId"), state = InstanceState.Failed)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -179,8 +179,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach }*/ it must "validate preconditions on handleResume" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, None)) - val register2 = handler.instanceDao.addInstance(buildInstance(2, Some("RandomDockerId"), InstanceState.Failed)) + val register1 = handler.instanceDao.addInstance(buildInstance(1)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 2, dockerId = Some("RandomDockerId"), state = InstanceState.Failed)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -213,8 +213,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach }*/ it must "validate preconditions on handleStart" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, None)) - val register2 = handler.instanceDao.addInstance(buildInstance(2, Some("RandomDockerId"), InstanceState.Paused)) + val register1 = handler.instanceDao.addInstance(buildInstance(1)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 2, dockerId = Some("RandomDockerId"), state = InstanceState.Paused)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -224,7 +224,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "not change the state of the instance on handleStart" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, Some("RandomDockerId"), InstanceState.Stopped)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 1, dockerId = Some("RandomDockerId"), state = InstanceState.Stopped)) assert(register1.isSuccess) assert(handler.handleStop(1) == handler.OperationResult.Ok) @@ -232,8 +232,8 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "validate preconditions on handleDeleteContainer" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, None)) - val register2 = handler.instanceDao.addInstance(buildInstance(2, Some("RandomDockerId"), InstanceState.Running)) + val register1 = handler.instanceDao.addInstance(buildInstance(1)) + val register2 = handler.instanceDao.addInstance(buildInstance(id = 2, dockerId = Some("RandomDockerId"), state = InstanceState.Running)) assert(register1.isSuccess) assert(register2.isSuccess) @@ -243,7 +243,7 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach } it must "remove instances on handleDeleteContainer" in { - val register1 = handler.instanceDao.addInstance(buildInstance(1, Some("RandomDockerId"), InstanceState.Stopped)) + val register1 = handler.instanceDao.addInstance(buildInstance(id = 1, dockerId = Some("RandomDockerId"), state = InstanceState.Stopped)) assert(register1.isSuccess) assert(handler.handleDeleteContainer(1) == handler.OperationResult.Ok) @@ -256,9 +256,96 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach assert(handler.getNumberOfInstances(ComponentType.ElasticSearch) == 1) } - it must "match to instance with most consecutive positive matching results if no links are present" in { + it must "validate preconditions before adding a label" in { + assert(handler.instanceDao.addInstance(buildInstance(id = 1, labels = List("private"))).isSuccess) + + assert(handler.handleAddLabel(42, "private") == handler.OperationResult.IdUnknown) + assert(handler.handleAddLabel(1, "PrivATe") == handler.OperationResult.Ok) + assert(handler.instanceDao.getInstance(1).get.labels.size == 1) //Do not add same value twice (ignore case) + + val sb: StringBuilder = new StringBuilder("foo") + while(sb.length <= Registry.configuration.maxLabelLength){ + sb.append("x") + } + assert(handler.handleAddLabel(1, sb.toString()) == handler.OperationResult.InternalError) + + assert(handler.handleAddLabel(1, "public") == handler.OperationResult.Ok) + assert(handler.instanceDao.getInstance(1).get.labels.size == 2) + } + + it must "validate preconditions before creating a link" in { + assert(handler.instanceDao.addInstance(buildInstance(id = 1, componentType = ComponentType.WebApi)).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 2, componentType = ComponentType.WebApp)).isSuccess) + + assert(handler.handleInstanceLinkCreated(-1, Int.MaxValue) == handler.OperationResult.IdUnknown) + assert(handler.handleInstanceLinkCreated(Int.MaxValue, 0) == handler.OperationResult.IdUnknown) + assert(handler.handleInstanceLinkCreated(0, 1) == handler.OperationResult.InvalidTypeForOperation) + assert(handler.handleInstanceLinkCreated(2,0) == handler.OperationResult.InvalidTypeForOperation) + + assert(handler.handleInstanceLinkCreated(2,1) == handler.OperationResult.Ok) + assert(handler.instanceDao.getLinksFrom(2).size == 1) + } + + it must "validate preconditions before assigning new dependencies" in { + assert(handler.instanceDao.addInstance(buildInstance(id = 1, componentType = ComponentType.WebApi)).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 2, componentType = ComponentType.WebApi)).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 3, dockerId = Some("random"), componentType = ComponentType.WebApp)).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 4, dockerId = None, componentType = ComponentType.WebApp)).isSuccess) + + assert(handler.instanceDao.addLink(InstanceLink(3,1, linkState = LinkState.Assigned)).isSuccess) + + assert(handler.handleInstanceAssignment(3, Integer.MAX_VALUE) == handler.OperationResult.IdUnknown) + assert(handler.handleInstanceAssignment(4, 3) == handler.OperationResult.NoDockerContainer) + assert(handler.handleInstanceAssignment(3,2) == handler.OperationResult.Ok) + + assert(handler.instanceDao.getLinksFrom(3).filter(i => i.linkState == LinkState.Assigned).head.idTo == 2) + } + + /** + * MATCHING TESTS + */ + + it must "not match to any instance if no instance of requested type is present" in { + assert(handler.isInstanceIdPresent(0) && handler.instanceDao.getInstance(0).get.componentType == ComponentType.ElasticSearch) + assert(handler.instanceDao.addInstance(buildInstance(id = 1, componentType = ComponentType.WebApp, labels = List("private"))).isSuccess) + + //No WebApi present, must fail + assert(handler.getMatchingInstanceOfType(callerId = 1, compType = ComponentType.WebApi).isFailure) + + //Shared label with elastic search instance, still no WebApi present, must fail + assert(handler.handleAddLabel(id = 0, label = "private") == handler.OperationResult.Ok) + assert(handler.getMatchingInstanceOfType(callerId = 1, compType = ComponentType.WebApi).isFailure) + + //Try component type crawler: Must also fail + assert(handler.getMatchingInstanceOfType(callerId = 1, compType = ComponentType.Crawler).isFailure) + + //Assign a link to an invalid type in the db. Must also fail + assert(handler.instanceDao.addLink(InstanceLink(idFrom = 1, idTo = 0, linkState = LinkState.Assigned)).isSuccess) + assert(handler.getMatchingInstanceOfType(callerId = 1, compType = ComponentType.WebApi).isFailure) + } + + it must "rank assigned links higher than shared labels in matching" in { + assert(handler.instanceDao.addInstance(buildInstance(id = 1, componentType = ComponentType.WebApp, labels = List("private", "new"))).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 2, componentType = ComponentType.WebApi, labels = List("public", "new"))).isSuccess) + assert(handler.instanceDao.addInstance(buildInstance(id = 3, componentType = ComponentType.WebApi, labels = List("private", "new"))).isSuccess) + + assert(handler.instanceDao.addLink(InstanceLink(idFrom = 1, idTo = 2, linkState = LinkState.Assigned)).isSuccess) + + //Matching must yield the instance that was assigned! + val matching = handler.getMatchingInstanceOfType(callerId = 1, ComponentType.WebApi) + assert(matching.isSuccess) + assert(matching.get.id.get == 2) + + //Now that link is outdated, shared labels "private" & "new" must be deciding factor! + assert(handler.instanceDao.updateLink(InstanceLink(idFrom = 1, idTo = 2, linkState = LinkState.Outdated)).isSuccess) + val matching2 = handler.getMatchingInstanceOfType(callerId = 1, ComponentType.WebApi) + assert(matching2.isSuccess) + assert(matching2.get.id.get == 3) + } + + it must "match to instance with most consecutive positive matching results in fallback matching" in { val esInstance = handler.handleRegister(buildInstance(2)) - val crawlerId = handler.handleRegister(Instance(Some(2), "foo", 42, "bar", ComponentType.Crawler, None, InstanceState.Running)) + val crawlerId = handler.handleRegister(Instance(Some(2), "foo", 42, "bar", ComponentType.Crawler, None, InstanceState.Running, List.empty[String])) assert(esInstance.isSuccess) assert(esInstance.get == 1) @@ -284,12 +371,6 @@ class RequestHandlerTest extends FlatSpec with Matchers with BeforeAndAfterEach assert(handler.handleDeregister(2L) == handler.OperationResult.Ok) } - it must "fail to match if no instance of type is present" in { - val register = handler.handleRegister(Instance(None, "foo", 42, "bar", ComponentType.WebApp, None, InstanceState.Running)) - assert(register.isSuccess && register.get == 1) - assert(handler.getMatchingInstanceOfType(1, ComponentType.WebApi).isFailure) - } - override protected def afterEach(): Unit = { handler.shutdown() } diff --git a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala index c0fcc51..0f308f6 100644 --- a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala +++ b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala @@ -10,7 +10,7 @@ class DynamicInstanceDAOTest extends FlatSpec with Matchers with BeforeAndAfterE val dao : DynamicInstanceDAO = new DynamicInstanceDAO(new Configuration()) private def buildInstance(id : Int) : Instance = { - Instance(Some(id), "https://localhost", 12345, "TestInstance", ComponentType.Crawler, None, InstanceState.Stopped) + Instance(Some(id), "https://localhost", 12345, "TestInstance", ComponentType.Crawler, None, InstanceState.Stopped, List.empty[String]) } override protected def beforeEach() : Unit = { @@ -125,7 +125,7 @@ class DynamicInstanceDAOTest extends FlatSpec with Matchers with BeforeAndAfterE it must "return the correct docker ids for instances with a docker id" in { assert(dao.addInstance (Instance(Some(42), "http://localhost", 33449, "AnyName", - ComponentType.WebApi, Some("dockerId"), InstanceState.Running )).isSuccess) + ComponentType.WebApi, Some("dockerId"), InstanceState.Running, List.empty[String] )).isSuccess) assert(dao.getDockerIdFor(42).isSuccess) assert(dao.getDockerIdFor(42).get.equals("dockerId")) } From f036df5bc7380732debc7456a23961a6d406fbb2 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Tue, 6 Nov 2018 20:23:06 +0100 Subject: [PATCH 12/14] Added tests for InstanceDAO. --- .../daos/DynamicInstanceDAOTest.scala | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala index 0f308f6..79b9f1c 100644 --- a/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala +++ b/src/test/scala/de/upb/cs/swt/delphi/instanceregistry/daos/DynamicInstanceDAOTest.scala @@ -1,8 +1,9 @@ package de.upb.cs.swt.delphi.instanceregistry.daos import de.upb.cs.swt.delphi.instanceregistry.Configuration -import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.Instance +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceLink, RegistryEventFactory} import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState} +import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} class DynamicInstanceDAOTest extends FlatSpec with Matchers with BeforeAndAfterEach{ @@ -130,6 +131,36 @@ class DynamicInstanceDAOTest extends FlatSpec with Matchers with BeforeAndAfterE assert(dao.getDockerIdFor(42).get.equals("dockerId")) } + it must "add events only to instances that have been registered" in { + assert(dao.getEventsFor(1).isSuccess) + assert(dao.getEventsFor(1).get.isEmpty) + + val eventToAdd = RegistryEventFactory.createInstanceAddedEvent(dao.getInstance(1).get) + assert(dao.addEventFor(-1, eventToAdd).isFailure) + assert(dao.addEventFor(1, eventToAdd).isSuccess) + assert(dao.getEventsFor(1).get.size == 1) + assert(dao.getEventsFor(1).get.head == eventToAdd) + } + + it must "verify the presence of instance ids when a link is added" in { + assert(dao.addLink(InstanceLink(-1,2, LinkState.Assigned)).isFailure) + assert(dao.addLink(InstanceLink(42, Integer.MAX_VALUE, LinkState.Assigned)).isFailure) + assert(dao.addLink(InstanceLink(1,2, LinkState.Assigned)).isSuccess) + assert(dao.getLinksFrom(1).size == 1) + } + + it must "update old links in state 'Assigned' on adding a new assigned link." in { + assert(dao.addLink(InstanceLink(1,2, LinkState.Assigned)).isSuccess) + assert(dao.getLinksFrom(1, Some(LinkState.Assigned)).size == 1) + assert(dao.addLink(InstanceLink(1,3, LinkState.Assigned)).isSuccess) + + assert(dao.getLinksFrom(1, Some(LinkState.Outdated)).size == 1) + assert(dao.getLinksFrom(1, Some(LinkState.Outdated)).head.idTo == 2) + + assert(dao.getLinksFrom(1, Some(LinkState.Assigned)).size == 1) + assert(dao.getLinksFrom(1, Some(LinkState.Assigned)).head.idTo == 3) + } + override protected def afterEach() : Unit = { dao.removeAll() dao.deleteRecoveryFile() From 72de30570ef8db121474a667a310dd34a073b2bf Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Wed, 7 Nov 2018 15:21:42 +0100 Subject: [PATCH 13/14] Exposed logs in API. Only works for one line so far, will try to improve this. --- .../Docker/ContainerCommands.scala | 7 ++-- .../instanceregistry/Docker/DockerActor.scala | 4 +- .../instanceregistry/RequestHandler.scala | 27 ++++++++++++++ .../instanceregistry/connection/Server.scala | 37 +++++++++++++++++++ 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala index 1862fa5..3f894f2 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala @@ -185,14 +185,15 @@ class ContainerCommands(connection: DockerConnection) extends JsonSupport with C def logs( containerId: String )(implicit ec: ExecutionContext): Source[String, NotUsed] = { - val query = Query("all") - val request = Get(buildUri(containersPath / containerId / "logs", query)) + val query = Query("stdout" -> "true" ) + val request = Get(buildUri(containersPath / containerId.substring(0,11) / "logs", query)) val flow = Flow[HttpResponse].map { case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, chunks), _) => chunks.map(_.data().utf8String) - case HttpResponse(StatusCodes.NotFound, _, _, _) => + case HttpResponse(StatusCodes.NotFound, _, HttpEntity.Strict(_, data), _) => + log.warning(s"DOCKER LOGS FAILED: ${data.utf8String}") throw ContainerNotFoundException(containerId) case response => unknownResponse(response) diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala index ceed7d6..7849754 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala @@ -105,8 +105,8 @@ class DockerActor(connection: DockerConnection) extends Actor with ActorLogging } case logs(containerId: String) => - log.info(s"Fetching Container logs") - container.logs(containerId) + log.info(s"Fetching Container logs") + sender ! container.logs(containerId) case x => log.warning("Received unknown message: [{}] ", x) } diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala index 6a5981a..cde5156 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala @@ -1,5 +1,6 @@ package de.upb.cs.swt.delphi.instanceregistry +import akka.NotUsed import akka.actor._ import akka.http.scaladsl.model.StatusCodes import akka.pattern.ask @@ -631,6 +632,32 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection) } } + /** + * + * Returns a source streaming the container logs of the instance with the specified id + * @param id Id of the instance + * @return Tuple of OperationResult and Option[Source[...] ] + */ + def handleGetLogs(id: Long) : (OperationResult.Value, Option[Source[String, NotUsed]]) = { + if(!instanceDao.hasInstance(id)){ + (OperationResult.IdUnknown, None) + } else if(!isInstanceDockerContainer(id)){ + (OperationResult.NoDockerContainer, None) + } else { + val instance = instanceDao.getInstance(id).get + + val f : Future[(OperationResult.Value, Option[Source[String, NotUsed]])]= (dockerActor ? logs(instance.dockerId.get))(Timeout(10 seconds)).map{ + source: Any => + (OperationResult.Ok, Option(source.asInstanceOf[Source[String, NotUsed]])) + }.recover{ + case ex: Exception => + fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to get logs with message: ${ex.getMessage}") + (OperationResult.InternalError, None) + } + Await.result(f, Duration.Inf) + } + } + /** * Tries to match caller to specified component type based on links stored in the dao. If one link is present, it will * be selected regardless of its state. If multiple links are present, the assigned link will be returned. If none of diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 03e5ee0..133eef3 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -1,5 +1,6 @@ package de.upb.cs.swt.delphi.instanceregistry.connection +import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.model.ws.{Message, TextMessage} import akka.http.scaladsl.model.{HttpResponse, StatusCodes} @@ -56,6 +57,7 @@ object Server extends HttpApp path("stop") { stop()} ~ path("start") { start()} ~ path("delete") { deleteContainer()} ~ + path("logs") { streamLogs()} ~ /****************EVENT OPERATIONS****************/ path("events") { streamEvents()} @@ -530,6 +532,7 @@ object Server extends HttpApp */ def network() : server.Route = { get { + log.debug(s"GET /network has been called.") complete{handler.handleGetNetwork().toJson(InstanceNetworkFormat)} } } @@ -541,6 +544,7 @@ object Server extends HttpApp */ def addLabel() : server.Route = parameters('Id.as[Long], 'Label.as[String]){ (id, label) => post { + log.debug(s"POST /addLabel?Id=$id&Label=$label has been called.") handler.handleAddLabel(id, label) match { case handler.OperationResult.IdUnknown => log.warning(s"Cannot add label $label to $id, id not found.") @@ -584,9 +588,42 @@ object Server extends HttpApp } } } + } + + + def streamLogs() : server.Route = parameters('Id.as[Long]) { id => + log.debug(s"WS-Request to /logs?Id=$id has been called.") + + handler.handleGetLogs(id) match { + case (handler.OperationResult.IdUnknown, _) => + complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot find instance with id $id.")} + case (handler.OperationResult.NoDockerContainer, _) => + complete{HttpResponse(StatusCodes.BadRequest, entity = s"Instance with id $id is no docker container.")} + case (handler.OperationResult.Ok, sourceOption) => + val source : Source[String, NotUsed] = sourceOption.get + handleWebSocketMessages{ + Flow[Message] + .map{ + case TextMessage.Strict(msg: String) => msg + case _ => println("Ignored non-text message while streaming logs.") + } + .via( + Flow.fromSinkAndSource(Sink.foreach(println), source)) + .map{msg: String => TextMessage.Strict(msg + "\n")} + .watchTermination() { (_, done) => + done.onComplete { + case Success(_) => + log.info("Stream route completed successfully") + case Failure(ex) => + log.error(s"Stream route completed with failure : $ex") + } + } + } + } } + } From 2fe9128e37d30b1c950615a4963d6d39f29dbb91 Mon Sep 17 00:00:00 2001 From: Johannes Duesing Date: Thu, 8 Nov 2018 09:45:35 +0100 Subject: [PATCH 14/14] Added endpoint to retrieve instance by id. Remove logs endpoint. Logs will be provided in a separate branch. --- OpenAPISpecification.yaml | 37 +++++++++++-- .../instanceregistry/connection/Server.scala | 54 +++++++------------ 2 files changed, 52 insertions(+), 39 deletions(-) diff --git a/OpenAPISpecification.yaml b/OpenAPISpecification.yaml index c8534e7..9e28731 100644 --- a/OpenAPISpecification.yaml +++ b/OpenAPISpecification.yaml @@ -116,6 +116,31 @@ paths: $ref: '#/definitions/Instance' '400': description: Invalid status value + /instance: + get: + tags: + - Basic Operations + summary: Get the instance with the specified id + description: >- + This command retrieves the instance with the specified id from the server. + If that id is not present, 404 will be returned. + operationId: instance + parameters: + - in: query + name: Id + description: Id of the instance + required: true + type: integer + format: int64 + responses: + '200': + description: The instance that was requested + schema: + $ref: '#/definitions/Instance' + '404': + description: The id was not found on the server + '500': + description: Internal server error /instances: get: tags: @@ -309,8 +334,8 @@ paths: - Basic Operations summary: Add a label to the instance with the specified id description: >- - This command will add the specified label to the instance with the specified - id. + This command will add the specified label to the instance with the + specified id. operationId: addLabel parameters: - name: Id @@ -328,9 +353,9 @@ paths: '200': description: Label successfully added '400': - description: Bad request, your label exceeded the character limit + description: 'Bad request, your label exceeded the character limit' '404': - description: Not found, the id you specified could not be found + description: 'Not found, the id you specified could not be found' /deploy: post: tags: @@ -739,4 +764,6 @@ definitions: type: array items: type: string - example: ["private", "debug"] + example: + - private + - debug diff --git a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala index 133eef3..1c322ae 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala @@ -39,6 +39,7 @@ object Server extends HttpApp path("register") {entity(as[String]) { jsonString => register(jsonString) }} ~ path("deregister") { deregister() } ~ path("instances") { fetchInstancesOfType() } ~ + path("instance") { retrieveInstance() } ~ path("numberOfInstances") { numberOfInstances() } ~ path("matchingInstance") { matchingInstance()} ~ path("matchingResult") { matchInstance()} ~ @@ -57,7 +58,6 @@ object Server extends HttpApp path("stop") { stop()} ~ path("start") { start()} ~ path("delete") { deleteContainer()} ~ - path("logs") { streamLogs()} ~ /****************EVENT OPERATIONS****************/ path("events") { streamEvents()} @@ -154,6 +154,25 @@ object Server extends HttpApp } } + /** + * Returns an instance with the specified id. Id is passed as query argument named 'Id' (so the resulting call is + * /instance?Id=42) + * @return Server route that either maps to 200 OK and the respective instance as entity, or 404. + */ + def retrieveInstance() : server.Route = parameters('Id.as[Long]) { id => + get { + log.debug(s"GET /instance?Id=$id has been called") + + val instanceOption = handler.getInstance(id) + + if(instanceOption.isDefined){ + complete(instanceOption.get.toJson(instanceFormat)) + } else { + complete{HttpResponse(StatusCodes.NotFound, entity = s"Id $id was not found on the server.")} + } + } + } + /** * Returns an instance of the specified ComponentType that can be used to resolve dependencies. The ComponentType must * be passed as an query argument named 'ComponentType' (so the call is /matchingInstance?ComponentType=Crawler). @@ -591,39 +610,6 @@ object Server extends HttpApp } - def streamLogs() : server.Route = parameters('Id.as[Long]) { id => - log.debug(s"WS-Request to /logs?Id=$id has been called.") - - handler.handleGetLogs(id) match { - case (handler.OperationResult.IdUnknown, _) => - complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot find instance with id $id.")} - case (handler.OperationResult.NoDockerContainer, _) => - complete{HttpResponse(StatusCodes.BadRequest, entity = s"Instance with id $id is no docker container.")} - case (handler.OperationResult.Ok, sourceOption) => - val source : Source[String, NotUsed] = sourceOption.get - handleWebSocketMessages{ - Flow[Message] - .map{ - case TextMessage.Strict(msg: String) => msg - case _ => println("Ignored non-text message while streaming logs.") - } - .via( - Flow.fromSinkAndSource(Sink.foreach(println), source)) - .map{msg: String => TextMessage.Strict(msg + "\n")} - .watchTermination() { (_, done) => - done.onComplete { - case Success(_) => - log.info("Stream route completed successfully") - case Failure(ex) => - log.error(s"Stream route completed with failure : $ex") - } - } - } - } - - } - - }