Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sequence and timestamp to request handler #139

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ package com.comcast.xfinity.sirius.api
trait RequestHandler {

/**
* Handle a GET request
*
* @param key String identifying the search query
*
* @return a SiriusResult wrapping the result of the query
*/
* Handle a GET request
*
* @param key String identifying the search query
*
* @return a SiriusResult wrapping the result of the query
*/
def handleGet(key: String): SiriusResult

/**
Expand Down Expand Up @@ -63,5 +63,4 @@ trait RequestHandler {
* In the future the API may be modified to return void.
*/
def handleDelete(key: String): SiriusResult

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2012-2019 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.xfinity.sirius.api

/**
* Factory object for wrapping a [[RequestHandler]] into a [[RequestWithMetadataHandler]]
*/
object RequestWithMetadataHandler {

def apply(requestHandler: RequestHandler): RequestWithMetadataHandler =
new RequestWithMetadataHandler {
override def handleGet(key: String): SiriusResult =
requestHandler.handleGet(key)
override def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult =
requestHandler.handlePut(key, body)
override def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult =
requestHandler.handleDelete(key)
}
}

/**
* Interface for a Sirius wrapped data structure for applying
* operations to the in memory dataset. All interactions with
* this object should go through Sirius. Operations should be
* short and to the point and should at all costs not throw
* exceptions.
*
* Access to this object is kept synchronized by Sirius.
*
* Puts and Deletes to the same key should cancel each other out.
* Also successive puts should cancel each other out.
*/
trait RequestWithMetadataHandler {

/**
* Handle a GET request
*
* @param key String identifying the search query
*
* @return a SiriusResult wrapping the result of the query
*/
def handleGet(key: String): SiriusResult

/**
* Handle a PUT request
*
* @param sequence unique sequence number indicating the order of
* the event in the cluster
* @param timestamp indicates when the event was created
* @param key unique identifier for the item to which the
* operation is being applied
* @param body data passed in along with this request used
* for modifying the state at key
*
* @return a SiriusResult wrapping the result of the operation.
* This should almost always be SiriusResult.none().
* In the future the API may be modified to return void.
*/
def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult

/**
* Handle a DELETE request

* @param sequence unique sequence number indicating the order of
* the event in the cluster
* @param timestamp indicates when the event was created
* @param key unique identifier for the item to which the
* operation is being applied
*
* @return a SiriusResult wrapping the result of the operation.
* This should almost always be SiriusResult.none().
* In the future the API may be modified to return void.
*/
def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import java.net.InetAddress
import java.util.{HashMap => JHashMap}

import com.comcast.xfinity.sirius.admin.ObjectNameHelper
import com.comcast.xfinity.sirius.api.RequestHandler
import com.comcast.xfinity.sirius.api.SiriusConfiguration
import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusConfiguration}
import com.comcast.xfinity.sirius.info.SiriusInfo
import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
Expand Down Expand Up @@ -57,7 +56,21 @@ object SiriusFactory {
*
* @return A SiriusImpl constructed using the parameters
*/
def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = {
def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl =
createInstance(RequestWithMetadataHandler(requestHandler), siriusConfig)

/**
* SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent
* ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also
* shutdown the dependent ActorSystem.
*
* @param requestHandler the RequestHandler containing callbacks for manipulating the system's state
* @param siriusConfig a SiriusConfiguration containing configuration info needed for this node.
* @see SiriusConfiguration for info on needed config.
*
* @return A SiriusImpl constructed using the parameters
*/
def createInstance(requestHandler: RequestWithMetadataHandler, siriusConfig: SiriusConfiguration): SiriusImpl = {
val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match {
case Some(dir) => dir
case None =>
Expand Down Expand Up @@ -94,8 +107,8 @@ object SiriusFactory {
*
* @return A SiriusImpl constructed using the parameters
*/
private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration,
siriusLog: SiriusLog): SiriusImpl = {
private[sirius] def createInstance(requestHandler: RequestWithMetadataHandler, siriusConfig: SiriusConfiguration,
siriusLog: SiriusLog): SiriusImpl = {

val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
package com.comcast.xfinity.sirius.api.impl

import compat.AkkaFutureAdapter
import com.comcast.xfinity.sirius.api.RequestHandler
import com.comcast.xfinity.sirius.api.Sirius
import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, Sirius, SiriusConfiguration, SiriusResult}
import akka.pattern.ask
import membership.MembershipActor._
import com.comcast.xfinity.sirius.api.SiriusResult
import akka.actor._
import java.util.concurrent.Future

import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.comcast.xfinity.sirius.api.SiriusConfiguration

import scala.concurrent.{Await, Future => AkkaFuture}
import akka.util.Timeout

import scala.concurrent.duration._
import status.NodeStats.FullNodeStatus
import status.StatusWorker._

import scala.util.Try

object SiriusImpl {
Expand All @@ -42,7 +43,7 @@ object SiriusImpl {
* @param config SiriusConfiguration object full of all kinds of configuration goodies, see SiriusConfiguration
* for more information
*/
def apply(requestHandler: RequestHandler,
def apply(requestHandler: RequestWithMetadataHandler,
siriusLog: SiriusLog,
config: SiriusConfiguration)
(implicit actorSystem: ActorSystem): SiriusImpl = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,27 @@
package com.comcast.xfinity.sirius.api.impl

import bridge.PaxosStateBridge
import com.comcast.xfinity.sirius.api.impl.membership.{MembershipHelper, MembershipActor}
import com.comcast.xfinity.sirius.api.impl.membership.{MembershipActor, MembershipHelper}
import paxos.PaxosMessages.PaxosMessage
import akka.actor._
import akka.agent.Agent
import com.comcast.xfinity.sirius.api.impl.paxos.Replica

import scala.concurrent.duration._
import paxos.PaxosSupervisor
import state.SiriusPersistenceActor.LogQuery
import state.StateSup
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import akka.event.Logging
import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler}
import com.comcast.xfinity.sirius.api.{RequestHandler, RequestWithMetadataHandler, SiriusConfiguration}
import status.StatusWorker
import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver
import status.StatusWorker.StatusQuery
import com.comcast.xfinity.sirius.uberstore.CompactionManager
import com.comcast.xfinity.sirius.uberstore.CompactionManager.CompactionMessage
import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{ChildProvider, CheckPaxosMembership}
import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{CheckPaxosMembership, ChildProvider}
import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.MembershipMessage

import scala.language.postfixOps

object SiriusSupervisor {
Expand All @@ -52,7 +54,7 @@ object SiriusSupervisor {
* @param siriusLog Interface into the Sirius persistent log.
* @param config the SiriusConfiguration for this node
*/
protected[impl] class ChildProvider(requestHandler: RequestHandler, siriusLog: SiriusLog, config: SiriusConfiguration){
protected[impl] class ChildProvider(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration){

val akkaExternalAddressResolver = config.getProp[AkkaExternalAddressResolver](SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER).
getOrElse(throw new IllegalStateException("SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER returned nothing"))
Expand Down Expand Up @@ -98,7 +100,7 @@ object SiriusSupervisor {
* @return Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(requestHandler: RequestHandler,
def props(requestHandler: RequestWithMetadataHandler,
siriusLog: SiriusLog,
config: SiriusConfiguration): Props = {
Props(classOf[SiriusSupervisor], new ChildProvider(requestHandler, siriusLog, config), config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class SiriusPersistenceActor(stateActor: ActorRef,
case event: OrderedEvent =>
val now = System.currentTimeMillis()
siriusLog.writeEntry(event)
stateActor ! event.request
stateActor ! event

val thisWriteTime = System.currentTimeMillis() - now
numWrites += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package com.comcast.xfinity.sirius.api.impl.state

import akka.actor.{Props, Actor}
import com.comcast.xfinity.sirius.api.impl._
import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler}
import akka.actor.{Actor, Props}
import akka.event.Logging
import com.comcast.xfinity.sirius.api.impl.SiriusRequest
import com.comcast.xfinity.sirius.api.impl.{SiriusRequest, _}
import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusResult}

object SiriusStateActor {

Expand All @@ -30,7 +29,7 @@ object SiriusStateActor {
* @return Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(requestHandler: RequestHandler): Props = {
def props(requestHandler: RequestWithMetadataHandler): Props = {
Props(classOf[SiriusStateActor], requestHandler)
}
}
Expand All @@ -43,24 +42,26 @@ object SiriusStateActor {
*
* @param requestHandler the request handler containing callbacks for manipulating state
*/
class SiriusStateActor(requestHandler: RequestHandler) extends Actor {
class SiriusStateActor(requestHandler: RequestWithMetadataHandler) extends Actor {

val logger = Logging(context.system, "Sirius")

def receive = {
case event: OrderedEvent =>
sender ! processSiriusRequestSafely(event.sequence, event.timestamp, event.request)
case req: SiriusRequest =>
// XXX: With the way things work now, we probably shouldn't
// be responding to Puts and Deletes... These are
// responded to when an order has been decided on
sender ! processSiriusRequestSafely(req)
sender ! processSiriusRequestSafely(0L, 0L, req)
}

private def processSiriusRequestSafely(req: SiriusRequest): SiriusResult = {
private def processSiriusRequestSafely(sequence: Long, timestamp: Long, req: SiriusRequest): SiriusResult = {
try {
req match {
case Get(key) => requestHandler.handleGet(key)
case Delete(key) => requestHandler.handleDelete(key)
case Put(key, body) => requestHandler.handlePut(key, body)
case Delete(key) => requestHandler.handleDelete(sequence, timestamp, key)
case Put(key, body) => requestHandler.handlePut(sequence, timestamp, key, body)
}
} catch {
case e: RuntimeException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.comcast.xfinity.sirius.api.impl.state

import com.comcast.xfinity.sirius.api.{SiriusConfiguration, RequestHandler}
import com.comcast.xfinity.sirius.api.{RequestWithMetadataHandler, SiriusConfiguration}
import akka.agent.Agent
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.comcast.xfinity.sirius.api.impl._
Expand All @@ -33,7 +33,7 @@ object StateSup {
* @param siriusLog the SiriusLog to persist OrderedEvents to
* @param config SiriusConfiguration object for configuring children actors.
*/
private[state] class ChildProvider(requestHandler: RequestHandler, siriusLog: SiriusLog, config: SiriusConfiguration) {
private[state] class ChildProvider(requestHandler: RequestWithMetadataHandler, siriusLog: SiriusLog, config: SiriusConfiguration) {
def createStateActor()(implicit context: ActorContext): ActorRef =
context.actorOf(SiriusStateActor.props(requestHandler), "state")

Expand All @@ -52,7 +52,7 @@ object StateSup {
* @return Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(requestHandler: RequestHandler,
def props(requestHandler: RequestWithMetadataHandler,
siriusLog: SiriusLog,
siriusStateAgent: Agent[SiriusState],
config: SiriusConfiguration): Props = {
Expand All @@ -71,7 +71,7 @@ object StateSup {
* @param config SiriusCOnfiguration object for configuring children actors.
*/
// TODO rename this StateSupervisor
class StateSup(requestHandler: RequestHandler,
class StateSup(requestHandler: RequestWithMetadataHandler,
siriusLog: SiriusLog,
siriusStateAgent: Agent[SiriusState],
childProvider: StateSup.ChildProvider,
Expand Down Expand Up @@ -120,8 +120,8 @@ class StateSup(requestHandler: RequestHandler,
(_, orderedEvent) =>
try {
orderedEvent.request match {
case Put(key, body) => requestHandler.handlePut(key, body)
case Delete(key) => requestHandler.handleDelete(key)
case Put(key, body) => requestHandler.handlePut(orderedEvent.sequence, orderedEvent.timestamp, key, body)
case Delete(key) => requestHandler.handleDelete(orderedEvent.sequence, orderedEvent.timestamp, key)
}
} catch {
case rte: RuntimeException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@

package com.comcast.xfinity.sirius

import api.{SiriusResult, RequestHandler}
import java.util.concurrent.{TimeUnit, CountDownLatch}
import api.{RequestWithMetadataHandler, SiriusResult}
import java.util.concurrent.{CountDownLatch, TimeUnit}

/**
* Request handler for testing purposes that includes a countdown
* latch so that we can monitor it for completion of events
*/
class LatchedRequestHandler(expectedTicks: Int) extends RequestHandler {
class LatchedRequestHandler(expectedTicks: Int) extends RequestWithMetadataHandler {
var latch = new CountDownLatch(expectedTicks)

def handleGet(key: String): SiriusResult = SiriusResult.none()

def handlePut(key: String, body: Array[Byte]): SiriusResult = {
def handlePut(sequence: Long, timestamp: Long, key: String, body: Array[Byte]): SiriusResult = {
latch.countDown()
SiriusResult.none()
}

def handleDelete(key: String): SiriusResult = {
def handleDelete(sequence: Long, timestamp: Long, key: String): SiriusResult = {
latch.countDown()
SiriusResult.none()
}
Expand Down