diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/RequestHandler.scala b/src/main/scala/com/comcast/xfinity/sirius/api/RequestHandler.scala index eb3e9a6c..b2b09f3d 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/RequestHandler.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/RequestHandler.scala @@ -30,12 +30,12 @@ package com.comcast.xfinity.sirius.api trait RequestHandler { /** - * Handle a GET request - * - * @param key String identifying the search query - * - * @return a SiriusResult wrapping the result of the query - */ + * Handle a GET request + * + * @param key String identifying the search query + * + * @return a SiriusResult wrapping the result of the query + */ def handleGet(key: String): SiriusResult /** @@ -63,5 +63,4 @@ trait RequestHandler { * In the future the API may be modified to return void. */ def handleDelete(key: String): SiriusResult - } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/RequestWithMetadataHandler.scala b/src/main/scala/com/comcast/xfinity/sirius/api/RequestWithMetadataHandler.scala new file mode 100644 index 00000000..c2c24e13 --- /dev/null +++ b/src/main/scala/com/comcast/xfinity/sirius/api/RequestWithMetadataHandler.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2012-2019 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.comcast.xfinity.sirius.api + +/** + * Factory object for wrapping a [[RequestHandler]] into a [[RequestWithMetadataHandler]] + */ +object RequestWithMetadataHandler { + + def apply(requestHandler: RequestHandler): RequestWithMetadataHandler = + new RequestWithMetadataHandler { + override def handleGet(key: String): SiriusResult = + requestHandler.handleGet(key) + override def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult = + requestHandler.handlePut(key, body) + override def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult = + requestHandler.handleDelete(key) + } +} + +/** + * Interface for a Sirius wrapped data structure for applying + * operations to the in memory dataset. All interactions with + * this object should go through Sirius. Operations should be + * short and to the point and should at all costs not throw + * exceptions. + * + * Access to this object is kept synchronized by Sirius. + * + * Puts and Deletes to the same key should cancel each other out. + * Also successive puts should cancel each other out. + */ +trait RequestWithMetadataHandler { + + /** + * Handle a GET request + * + * @param key String identifying the search query + * + * @return a SiriusResult wrapping the result of the query + */ + def handleGet(key: String): SiriusResult + + /** + * Handle a PUT request + * + * @param sequence unique sequence number indicating the order of + * the event in the cluster + * @param timestamp indicates when the event was created + * @param key unique identifier for the item to which the + * operation is being applied + * @param body data passed in along with this request used + * for modifying the state at key + * + * @return a SiriusResult wrapping the result of the operation. + * This should almost always be SiriusResult.none(). + * In the future the API may be modified to return void. + */ + def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult + + /** + * Handle a DELETE request + + * @param sequence unique sequence number indicating the order of + * the event in the cluster + * @param timestamp indicates when the event was created + * @param key unique identifier for the item to which the + * operation is being applied + * + * @return a SiriusResult wrapping the result of the operation. + * This should almost always be SiriusResult.none(). + * In the future the API may be modified to return void. + */ + def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult +} diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala index 40deced2..bc9c0ee2 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala @@ -21,8 +21,7 @@ import java.net.InetAddress import java.util.{HashMap => JHashMap} import com.comcast.xfinity.sirius.admin.ObjectNameHelper -import com.comcast.xfinity.sirius.api.RequestHandler -import com.comcast.xfinity.sirius.api.SiriusConfiguration +import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusConfiguration} import com.comcast.xfinity.sirius.info.SiriusInfo import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog @@ -57,7 +56,21 @@ object SiriusFactory { * * @return A SiriusImpl constructed using the parameters */ - def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = { + def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = + createInstance(RequestWithMetadataHandler(requestHandler), siriusConfig) + + /** + * SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent + * ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also + * shutdown the dependent ActorSystem. + * + * @param requestHandler the RequestHandler containing callbacks for manipulating the system's state + * @param siriusConfig a SiriusConfiguration containing configuration info needed for this node. + * @see SiriusConfiguration for info on needed config. + * + * @return A SiriusImpl constructed using the parameters + */ + def createInstance(requestHandler: RequestWithMetadataHandler, siriusConfig: SiriusConfiguration): SiriusImpl = { val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match { case Some(dir) => dir case None => @@ -94,8 +107,8 @@ object SiriusFactory { * * @return A SiriusImpl constructed using the parameters */ - private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration, - siriusLog: SiriusLog): SiriusImpl = { + private[sirius] def createInstance(requestHandler: RequestWithMetadataHandler, siriusConfig: SiriusConfiguration, + siriusLog: SiriusLog): SiriusImpl = { val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system") diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala index 416c93bc..76ee7105 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala @@ -16,20 +16,21 @@ package com.comcast.xfinity.sirius.api.impl import compat.AkkaFutureAdapter -import com.comcast.xfinity.sirius.api.RequestHandler -import com.comcast.xfinity.sirius.api.Sirius +import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, Sirius, SiriusConfiguration, SiriusResult} import akka.pattern.ask import membership.MembershipActor._ -import com.comcast.xfinity.sirius.api.SiriusResult import akka.actor._ import java.util.concurrent.Future + import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog -import com.comcast.xfinity.sirius.api.SiriusConfiguration + import scala.concurrent.{Await, Future => AkkaFuture} import akka.util.Timeout + import scala.concurrent.duration._ import status.NodeStats.FullNodeStatus import status.StatusWorker._ + import scala.util.Try object SiriusImpl { @@ -42,7 +43,7 @@ object SiriusImpl { * @param config SiriusConfiguration object full of all kinds of configuration goodies, see SiriusConfiguration * for more information */ - def apply(requestHandler: RequestHandler, + def apply(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration) (implicit actorSystem: ActorSystem): SiriusImpl = { diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala index bb892567..153b8b18 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala @@ -16,25 +16,27 @@ package com.comcast.xfinity.sirius.api.impl import bridge.PaxosStateBridge -import com.comcast.xfinity.sirius.api.impl.membership.{MembershipHelper, MembershipActor} +import com.comcast.xfinity.sirius.api.impl.membership.{MembershipActor, MembershipHelper} import paxos.PaxosMessages.PaxosMessage import akka.actor._ import akka.agent.Agent import com.comcast.xfinity.sirius.api.impl.paxos.Replica + import scala.concurrent.duration._ import paxos.PaxosSupervisor import state.SiriusPersistenceActor.LogQuery import state.StateSup import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import akka.event.Logging -import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler} +import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusConfiguration} import status.StatusWorker import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver import status.StatusWorker.StatusQuery import com.comcast.xfinity.sirius.uberstore.CompactionManager import com.comcast.xfinity.sirius.uberstore.CompactionManager.CompactionMessage -import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{ChildProvider, CheckPaxosMembership} +import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{CheckPaxosMembership, ChildProvider} import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.MembershipMessage + import scala.language.postfixOps object SiriusSupervisor { @@ -52,7 +54,7 @@ object SiriusSupervisor { * @param siriusLog Interface into the Sirius persistent log. * @param config the SiriusConfiguration for this node */ - protected[impl] class ChildProvider(requestHandler: RequestHandler, siriusLog: SiriusLog, config: SiriusConfiguration){ + protected[impl] class ChildProvider(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration){ val akkaExternalAddressResolver = config.getProp[AkkaExternalAddressResolver](SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER). getOrElse(throw new IllegalStateException("SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER returned nothing")) @@ -98,7 +100,7 @@ object SiriusSupervisor { * @return Props for creating this actor, which can then be further configured * (e.g. calling `.withDispatcher()` on it) */ - def props(requestHandler: RequestHandler, + def props(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration): Props = { Props(classOf[SiriusSupervisor], new ChildProvider(requestHandler, siriusLog, config), config) diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala index ba27caaa..181ac616 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActor.scala @@ -121,7 +121,7 @@ class SiriusPersistenceActor(stateActor: ActorRef, case event: OrderedEvent => val now = System.currentTimeMillis() siriusLog.writeEntry(event) - stateActor ! event.request + stateActor ! event val thisWriteTime = System.currentTimeMillis() - now numWrites += 1 diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActor.scala index b8d43054..cddd8abb 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActor.scala @@ -15,11 +15,10 @@ */ package com.comcast.xfinity.sirius.api.impl.state -import akka.actor.{Props, Actor} -import com.comcast.xfinity.sirius.api.impl._ -import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler} +import akka.actor.{Actor, Props} import akka.event.Logging -import com.comcast.xfinity.sirius.api.impl.SiriusRequest +import com.comcast.xfinity.sirius.api.impl.{SiriusRequest, _} +import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusResult} object SiriusStateActor { @@ -30,7 +29,7 @@ object SiriusStateActor { * @return Props for creating this actor, which can then be further configured * (e.g. calling `.withDispatcher()` on it) */ - def props(requestHandler: RequestHandler): Props = { + def props(requestHandler: RequestWithMetadataHandler): Props = { Props(classOf[SiriusStateActor], requestHandler) } } @@ -43,24 +42,26 @@ object SiriusStateActor { * * @param requestHandler the request handler containing callbacks for manipulating state */ -class SiriusStateActor(requestHandler: RequestHandler) extends Actor { +class SiriusStateActor(requestHandler: RequestWithMetadataHandler) extends Actor { val logger = Logging(context.system, "Sirius") def receive = { + case event: OrderedEvent => + sender ! processSiriusRequestSafely(event.sequence, event.timestamp, event.request) case req: SiriusRequest => // XXX: With the way things work now, we probably shouldn't // be responding to Puts and Deletes... These are // responded to when an order has been decided on - sender ! processSiriusRequestSafely(req) + sender ! processSiriusRequestSafely(0L, 0L, req) } - private def processSiriusRequestSafely(req: SiriusRequest): SiriusResult = { + private def processSiriusRequestSafely(sequence: Long, timestamp: Long, req: SiriusRequest): SiriusResult = { try { req match { case Get(key) => requestHandler.handleGet(key) - case Delete(key) => requestHandler.handleDelete(key) - case Put(key, body) => requestHandler.handlePut(key, body) + case Delete(key) => requestHandler.handleDelete(sequence, timestamp, key) + case Put(key, body) => requestHandler.handlePut(sequence, timestamp, key, body) } } catch { case e: RuntimeException => diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/StateSup.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/StateSup.scala index 43b2d6c5..3548e519 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/StateSup.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/state/StateSup.scala @@ -15,7 +15,7 @@ */ package com.comcast.xfinity.sirius.api.impl.state -import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler} +import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusConfiguration} import akka.agent.Agent import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.impl._ @@ -33,7 +33,7 @@ object StateSup { * @param siriusLog the SiriusLog to persist OrderedEvents to * @param config SiriusConfiguration object for configuring children actors. */ - private[state] class ChildProvider(requestHandler: RequestHandler, siriusLog: SiriusLog, config: SiriusConfiguration) { + private[state] class ChildProvider(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration) { def createStateActor()(implicit context: ActorContext): ActorRef = context.actorOf(SiriusStateActor.props(requestHandler), "state") @@ -52,7 +52,7 @@ object StateSup { * @return Props for creating this actor, which can then be further configured * (e.g. calling `.withDispatcher()` on it) */ - def props(requestHandler: RequestHandler, + def props(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, siriusStateAgent: Agent[SiriusState], config: SiriusConfiguration): Props = { @@ -71,7 +71,7 @@ object StateSup { * @param config SiriusCOnfiguration object for configuring children actors. */ // TODO rename this StateSupervisor -class StateSup(requestHandler: RequestHandler, +class StateSup(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, siriusStateAgent: Agent[SiriusState], childProvider: StateSup.ChildProvider, @@ -120,8 +120,8 @@ class StateSup(requestHandler: RequestHandler, (_, orderedEvent) => try { orderedEvent.request match { - case Put(key, body) => requestHandler.handlePut(key, body) - case Delete(key) => requestHandler.handleDelete(key) + case Put(key, body) => requestHandler.handlePut(orderedEvent.sequence, orderedEvent.timestamp, key, body) + case Delete(key) => requestHandler.handleDelete(orderedEvent.sequence, orderedEvent.timestamp, key) } } catch { case rte: RuntimeException => diff --git a/src/test/scala/com/comcast/xfinity/sirius/LatchedRequestHandler.scala b/src/test/scala/com/comcast/xfinity/sirius/LatchedRequestHandler.scala index 0812bb4f..f4c46458 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/LatchedRequestHandler.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/LatchedRequestHandler.scala @@ -16,24 +16,24 @@ package com.comcast.xfinity.sirius -import api.{SiriusResult, RequestHandler} -import java.util.concurrent.{TimeUnit, CountDownLatch} +import api.{RequestWithMetadataHandler, SiriusResult} +import java.util.concurrent.{CountDownLatch, TimeUnit} /** * Request handler for testing purposes that includes a countdown * latch so that we can monitor it for completion of events */ -class LatchedRequestHandler(expectedTicks: Int) extends RequestHandler { +class LatchedRequestHandler(expectedTicks: Int) extends RequestWithMetadataHandler { var latch = new CountDownLatch(expectedTicks) def handleGet(key: String): SiriusResult = SiriusResult.none() - def handlePut(key: String, body: Array[Byte]): SiriusResult = { + def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult = { latch.countDown() SiriusResult.none() } - def handleDelete(key: String): SiriusResult = { + def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult = { latch.countDown() SiriusResult.none() } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala index 392b9a3e..71412003 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusPersistenceActorTest.scala @@ -80,26 +80,26 @@ class SiriusPersistenceActorTest extends NiceTest { senderProbe.expectMsg(500L) } - it ("should forward Put's to the state actor") { + it ("should forward Put events to the state actor") { when(mockSiriusLog.getNextSeq).thenReturn(0) val put = Put("key", "body".getBytes) val event = OrderedEvent(0L, 0L, put) underTestActor ! event - testStateWorkerProbe.expectMsg(put) + testStateWorkerProbe.expectMsg(event) verify(mockSiriusLog, times(1)).writeEntry(event) } - it ("should forward Delete's to the state actor") { + it ("should forward Delete events to the state actor") { when(mockSiriusLog.getNextSeq).thenReturn(0) val delete = Delete("key") val event = OrderedEvent(0L, 0L, delete) underTestActor ! event - testStateWorkerProbe.expectMsg(delete) + testStateWorkerProbe.expectMsg(event) verify(mockSiriusLog, times(1)).writeEntry(event) } diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActorTest.scala index 149a9361..82a7bff2 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/SiriusStateActorTest.scala @@ -18,22 +18,23 @@ package com.comcast.xfinity.sirius.api.impl.state import org.mockito.Matchers import org.mockito.Mockito._ import akka.actor.ActorSystem -import com.comcast.xfinity.sirius.api.impl.{SiriusState, Delete, Get, Put} +import com.comcast.xfinity.sirius.api.impl.{Delete, Get, OrderedEvent, Put} import com.comcast.xfinity.sirius.NiceTest -import akka.agent.Agent -import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler} -import akka.testkit.{TestProbe, TestActorRef} +import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusResult} +import akka.testkit.{TestActorRef, TestProbe} import org.scalatest.BeforeAndAfterAll +import scala.util.Random + class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { implicit val actorSystem = ActorSystem("SiriusStateActorTest") - var mockRequestHandler: RequestHandler = _ + var mockRequestHandler: RequestWithMetadataHandler = _ var testActor: TestActorRef[SiriusStateActor] = _ before { - mockRequestHandler = mock[RequestHandler] + mockRequestHandler = mock[RequestWithMetadataHandler] testActor = TestActorRef(new SiriusStateActor(mockRequestHandler)) } @@ -48,15 +49,17 @@ class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { it ("must forward the message to handler and reply with the result " + "when it succeeds") { val senderProbe = TestProbe() + val seq = Random.nextLong() + val ts = Random.nextLong() val key = "key" val body = "value".getBytes // the following is a little delicate, Array[Byte] are Java jawns, so // we must use the same array for comparing doReturn(SiriusResult.some("It's alive!")).when(mockRequestHandler). - handlePut(Matchers.eq(key), Matchers.same(body)) + handlePut(Matchers.eq(seq), Matchers.eq(ts), Matchers.eq(key), Matchers.same(body)) - senderProbe.send(testActor, Put(key, body)) + senderProbe.send(testActor, OrderedEvent(seq, ts, Put(key, body))) senderProbe.expectMsg(SiriusResult.some("It's alive!")) } @@ -66,15 +69,17 @@ class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { val terminationProbe = TestProbe() terminationProbe.watch(testActor) val senderProbe = TestProbe() + val seq = Random.nextLong() + val ts = Random.nextLong() val key = "key" val body = "value".getBytes val theException = new RuntimeException("well this sucks") doThrow(theException).when(mockRequestHandler). - handlePut(Matchers.eq(key), Matchers.same(body)) + handlePut(Matchers.eq(seq), Matchers.eq(ts), Matchers.eq(key), Matchers.same(body)) - senderProbe.send(testActor, Put(key, body)) + senderProbe.send(testActor, OrderedEvent(seq, ts, Put(key, body))) senderProbe.expectMsg(SiriusResult.error(theException)) terminationProbe.expectNoMsg() @@ -118,12 +123,14 @@ class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { it ("should forward the message to handler and reply with the result when " + "it succeeds") { val senderProbe = TestProbe() + val seq = Random.nextLong() + val ts = Random.nextLong() val key = "key" doReturn(SiriusResult.some("I ate too much")).when(mockRequestHandler). - handleDelete(Matchers.eq(key)) + handleDelete(Matchers.eq(seq), Matchers.eq(ts), Matchers.eq(key)) - senderProbe.send(testActor, Delete(key)) + senderProbe.send(testActor, OrderedEvent(seq, ts, Delete(key))) senderProbe.expectMsg(SiriusResult.some("I ate too much")) } @@ -131,6 +138,8 @@ class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { it ("must forward the message to the handler and reply with a result " + "wrapping the exception when such occurs") { val senderProbe = TestProbe() + val seq = Random.nextLong() + val ts = Random.nextLong() val key = "key" val terminationProbe = TestProbe() terminationProbe.watch(testActor) @@ -138,9 +147,9 @@ class SiriusStateActorTest extends NiceTest with BeforeAndAfterAll { val theException = new RuntimeException("well this sucks") doThrow(theException).when(mockRequestHandler). - handleDelete(Matchers.eq(key)) + handleDelete(Matchers.eq(seq), Matchers.eq(ts), Matchers.eq(key)) - senderProbe.send(testActor, Delete(key)) + senderProbe.send(testActor, OrderedEvent(seq, ts, Delete(key))) senderProbe.expectMsg(SiriusResult.error(theException)) terminationProbe.expectNoMsg() diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/StateSupTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/StateSupTest.scala index 87e53f5c..60c0ac2d 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/StateSupTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/state/StateSupTest.scala @@ -22,7 +22,7 @@ import com.comcast.xfinity.sirius.api.impl.state.SiriusPersistenceActor._ import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.impl.{SiriusState, OrderedEvent, Delete, Get} import akka.agent.Agent -import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler} +import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusConfiguration} import akka.actor.{ActorContext, ActorRef, ActorSystem} object StateSupTest { @@ -49,7 +49,7 @@ class StateSupTest extends NiceTest with BeforeAndAfterAll { describe("when receiving a Get") { it ("must forward the message to the in memory state subsystem") { - val mockRequestHandler = mock[RequestHandler] + val mockRequestHandler = mock[RequestWithMetadataHandler] val mockLog = mock[SiriusLog] val mockStateAgent = mock[Agent[SiriusState]] @@ -66,7 +66,7 @@ class StateSupTest extends NiceTest with BeforeAndAfterAll { describe("when receiving an OrderedEvent") { it ("must send the OrderedEvent to the persistence subsystem") { - val mockRequestHandler = mock[RequestHandler] + val mockRequestHandler = mock[RequestWithMetadataHandler] val mockLog = mock[SiriusLog] val mockStateAgent = mock[Agent[SiriusState]] @@ -83,7 +83,7 @@ class StateSupTest extends NiceTest with BeforeAndAfterAll { describe("when receiving a LogQuery message") { it ("must forward the message to the persistence subsystem") { - val mockRequestHandler = mock[RequestHandler] + val mockRequestHandler = mock[RequestWithMetadataHandler] val mockLog = mock[SiriusLog] val mockStateAgent = mock[Agent[SiriusState]] diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index bb956037..4f21775d 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -34,7 +34,7 @@ import com.comcast.xfinity.sirius.uberstore.UberStore import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.CheckPaxosMembership import annotation.tailrec -import com.comcast.xfinity.sirius.api.{RequestHandler, SiriusConfiguration, SiriusResult} +import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusConfiguration, SiriusResult} import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.CheckClusterConfig import org.slf4j.LoggerFactory import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore @@ -89,7 +89,7 @@ class FullSystemITest extends NiceTest with TimedTest { // logs def makeSirius(port: Int, latchTicks: Int = 3, - handler: Option[RequestHandler] = None, + handler: Option[RequestWithMetadataHandler] = None, wal: Option[SiriusLog] = None, chunkSize: Int = 100, gapRequestFreqSecs: Int = 5, @@ -97,7 +97,7 @@ class FullSystemITest extends NiceTest with TimedTest { sslEnabled: Boolean = false, maxWindowSize: Int = 1000, membershipPath: String = new JFile(tempDir, "membership").getAbsolutePath): - (SiriusImpl, RequestHandler, SiriusLog) = { + (SiriusImpl, RequestWithMetadataHandler, SiriusLog) = { val finalHandler = handler match { case Some(requestHandler) => requestHandler diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/StringRequestHandler.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/StringRequestHandler.scala index f06e9cd5..a6f0d565 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/StringRequestHandler.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/StringRequestHandler.scala @@ -16,11 +16,11 @@ package com.comcast.xfinity.sirius.itest -import com.comcast.xfinity.sirius.api.RequestHandler +import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusResult} + import collection.mutable.HashMap -import com.comcast.xfinity.sirius.api.SiriusResult -class StringRequestHandler extends RequestHandler { +class StringRequestHandler extends RequestWithMetadataHandler { var cmdsHandledCnt = 0; val map: HashMap[String, Array[Byte]] = new HashMap[String, Array[Byte]]() @@ -40,7 +40,7 @@ class StringRequestHandler extends RequestHandler { /** * Handle a PUT request */ - def handlePut(key: String, body: Array[Byte]): SiriusResult = { + def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult = { cmdsHandledCnt += 1; map.put(key, body) match { case Some(v) => SiriusResult.some(v) @@ -51,7 +51,7 @@ class StringRequestHandler extends RequestHandler { /** * Handle a DELETE request */ - def handleDelete(key: String): SiriusResult = { + def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult = { cmdsHandledCnt += 1; map.remove(key) match { case Some(v) => SiriusResult.some(v)