RoutedStore redesign
It has been noted that the previous thread pool-based implementation of RoutedStore suffered from two main problems:
- The logic was too complicated
- Slow responses by slow servers resulted in thread pool exhaustion
To fix the second problem, we’ve discussed implementing a non-blocking model for client requests by overhauling the socket layer to use NIO. This is detailed in the SocketStore redesign wiki page.
For the first problem, we’re looking to model the operations (get, getVersions, getAll, delete, and put) via a processing pipeline, making the discrete actions comprising the entire process clearly defined, (more) easily understandable, and reusable.
Let’s first mention how to enable the pipeline routed store in your application:
For Voldemort clients, simply set the following property in your client configuration:
enable_pipeline_routed_store=true
In the server.properties file used by each node, you can set the following property:
enable.pipeline.routed.store=true
Note: as of 0.90, enable_pipeline_routed_store (client) and enable.pipeline.routed.store (server) default to true.
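For clients that build their configuration in code rather than from a file, the client-side property can be set on a plain java.util.Properties object. This is only a minimal sketch: the class and method names below are hypothetical, and how the Properties object is handed to the Voldemort client is outside its scope. The default of true mirrors the note above.

```java
import java.util.Properties;

// Hypothetical helper, not part of Voldemort: sets and reads the client-side property.
final class ClientPropsSketch {

    static final String KEY = "enable_pipeline_routed_store";

    // Builds client properties with the pipeline routed store switched on or off.
    static Properties clientProps(boolean enablePipeline) {
        Properties props = new Properties();
        props.setProperty(KEY, Boolean.toString(enablePipeline));
        return props;
    }

    // Reads the flag back, falling back to true when the property is absent.
    static boolean isPipelineEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(KEY, "true"));
    }
}
```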
Let’s discuss some of the classes/interfaces.
An Action is a discrete portion of logic that forms part of the overall process that executes a given operation. Its interface is very simple:
public interface Action {
    public void execute(Pipeline pipeline, Object eventData);
}
There’s no clear standard about how much or how little logic is performed in a given Action, but there are intuitive separations in the logic that form natural boundaries.
A Pipeline is the main conduit through which an Action is run. An Action is executed in response to the Pipeline receiving an event. The majority of the events are self-initiated from within the Pipeline itself. The only case thus far where external entities create events is when servers respond asynchronously: a Response instance is created on completion of an asynchronous request and is fed back into the Pipeline, where an appropriate ‘response handler’ action is executed.
A Pipeline instance is created per request because it holds internal state specific to each operation (get, getAll, getVersions, put, and delete) invocation.
Pipeline includes two addEvent methods:
public void addEvent(Event event);
public void addEvent(Event event, Object data);
Both forms take an Event that describes the event that was received. The second form takes an ‘event data’ object that is some data specific to the event. It is assumed that the event handler will know what type of data is to be passed (if any).
public void execute();
The execute method pulls Event instances off an internal queue and processes them one by one, in order of receipt, until an event of type Event.COMPLETED is received, at which point the method exits.
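To make the event loop concrete, here is a minimal sketch of a pipeline built around a BlockingQueue. It is an illustration under assumptions, not the actual Voldemort Pipeline: the QueuedEvent holder, the EnumMap of actions, and the choice to throw when an event has no registered action are all hypothetical, and the Event enum is repeated only so the block is self-contained.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only; these are NOT the real Voldemort classes.
final class PipelineSketch {

    enum Event {
        STARTED, CONFIGURED, COMPLETED, INSUFFICIENT_SUCCESSES,
        RESPONSE_RECEIVED, RESPONSES_RECEIVED, NOP, ERROR, MASTER_DETERMINED
    }

    interface Action {
        void execute(Pipeline pipeline, Object eventData);
    }

    static final class Pipeline {

        // Pairs an event with its optional data so both travel through the queue together.
        private static final class QueuedEvent {
            final Event event;
            final Object data;

            QueuedEvent(Event event, Object data) {
                this.event = event;
                this.data = data;
            }
        }

        private final Map<Event, Action> actions = new EnumMap<>(Event.class);
        private final BlockingQueue<QueuedEvent> queue = new LinkedBlockingQueue<>();

        void addEventAction(Event event, Action action) {
            actions.put(event, action);
        }

        void addEvent(Event event) {
            addEvent(event, null);
        }

        void addEvent(Event event, Object data) {
            queue.add(new QueuedEvent(event, data));
        }

        // Handles events in order of receipt and returns once COMPLETED is seen.
        void execute() {
            while (true) {
                QueuedEvent next;
                try {
                    next = queue.take(); // blocks until an event is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for an event", e);
                }
                if (next.event == Event.COMPLETED)
                    return;
                Action action = actions.get(next.event);
                if (action == null) // assumption: an unmapped event is a programming error
                    throw new IllegalStateException("No action registered for " + next.event);
                action.execute(this, next.data);
            }
        }
    }
}
```

In this sketch the caller queues Event.STARTED before calling execute(), and each Action decides which event to queue next, so the sequence of steps lives in the registrations rather than in one large method.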
An event is simply an enum that details what sort of event occurred. Here is the definition of Event:
public enum Event {
    STARTED,
    CONFIGURED,
    COMPLETED,
    INSUFFICIENT_SUCCESSES,
    RESPONSE_RECEIVED,
    RESPONSES_RECEIVED,
    NOP,
    ERROR,
    MASTER_DETERMINED;
}
PipelineData includes a common set of data that is used to represent the state within the Pipeline as it moves from action to action. There’s a one-to-one correspondence between a Pipeline and PipelineData, though the latter is not included as an instance variable. Action implementations usually include the PipelineData as an instance variable upon creation. There are a handful of subclasses of PipelineData that are used to handle the different types of operations.
Response represents a response from a call to a remote Voldemort node to perform some operation (get, put, etc.). It wraps the following values:
- Node node
- K key
- V value
- long requestTime
It includes the Node and request time because these are needed by the FailureDetector that the consumer of the Response will use. A Response is usually used in conjunction with asynchronous requests as a sort of callback mechanism, though this isn’t always the case. When a Response is the result of an asynchronous request, the NonblockingStore invokes the NonblockingStoreCallback instance’s requestComplete method, which in turn packages up the data in a Response object that is sent to the Pipeline via an Event.RESPONSE_RECEIVED event.
Response instances are stored in the PipelineData to represent the responses to requests made during execution of the Pipeline.
This class uses generics for the value to support the return types used by the different operations. Often the key type is simply ByteArray, but in the case of the “get all” operation, the key is actually an Iterable.
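The four wrapped values and the role of the generic parameters can be sketched as a small immutable holder. This is a hypothetical stand-in rather than the real Response class: Node here is a placeholder type with only an id, and the getter names are assumptions.

```java
// Illustrative sketch only; Node is a placeholder, not Voldemort's Node class.
final class ResponseSketch {

    static final class Node {
        final int id;

        Node(int id) {
            this.id = id;
        }
    }

    // K and V vary per operation, e.g. a single key for "get" or an Iterable of keys for "get all".
    static final class Response<K, V> {
        private final Node node;
        private final K key;
        private final V value;
        private final long requestTime;

        Response(Node node, K key, V value, long requestTime) {
            this.node = node;
            this.key = key;
            this.value = value;
            this.requestTime = requestTime;
        }

        Node getNode() { return node; }
        K getKey() { return key; }
        V getValue() { return value; }
        long getRequestTime() { return requestTime; }
    }
}
```

With this shape, a “get” response might be declared as Response<String, java.util.List<String>>, while a “get all” response would use an Iterable as K and a map of results as V.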
Let’s now take a look at how the processing pipeline works for the “get” operation.
Here’s a very simplified version of how the get operation from RoutedStore looks now (unholy amounts of generics omitted):
public List<Versioned<byte[]>> get(final ByteArray key) {
    BasicPipelineData pipelineData = new BasicPipelineData();
    Pipeline pipeline = new Pipeline(Operation.GET, timeoutMs, TimeUnit.MILLISECONDS);

    pipeline.addEventAction(Event.STARTED, new ConfigureNodes());
    pipeline.addEventAction(Event.CONFIGURED, new PerformParallelRequests());
    pipeline.addEventAction(Event.RESPONSE_RECEIVED, new AcknowledgeResponse());
    pipeline.addEventAction(Event.INSUFFICIENT_SUCCESSES, new PerformSerialRequests());
    if(repairReads)
        pipeline.addEventAction(Event.RESPONSES_RECEIVED, new ReadRepair());

    pipeline.execute();

    if(pipelineData.getFatalError() != null)
        throw pipelineData.getFatalError();

    List<Versioned<byte[]>> results = new ArrayList<Versioned<byte[]>>();
    for(Response response: pipelineData.getResponses()) {
        List<Versioned<byte[]>> value = response.getValue();
        if(value != null)
            results.addAll(value);
    }
    return results;
}
Here’s a brief run down of each of the actions.
First, ConfigureNodes is called in response to the Event.STARTED event. It is tasked with determining the nodes used by the operation’s key. The nodes are determined and stored in the PipelineData instance used by all of the Actions in the pipeline. If there aren’t enough nodes, an Event.ERROR is pushed onto the event queue and processing is complete. Otherwise our pre-configured next event (Event.CONFIGURED) is pushed onto the event queue.
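The decision made by this step can be sketched as a small function that either records the nodes and reports Event.CONFIGURED, or records a fatal error and reports Event.ERROR. Everything here is a simplified assumption: the routing function, the Data holder, and the use of integer node ids are hypothetical, and the sketch returns the next event instead of pushing it onto a pipeline queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch only; not the real ConfigureNodes action.
final class ConfigureNodesSketch {

    enum Event { CONFIGURED, ERROR }

    // Stand-in for the shared per-request state (PipelineData in the text).
    static final class Data {
        List<Integer> nodes = new ArrayList<>();
        Exception fatalError;
    }

    // routing is a hypothetical function mapping a key to the ids of the nodes that hold it.
    static Event configure(String key, Function<String, List<Integer>> routing,
                           int required, Data data) {
        List<Integer> nodes = routing.apply(key);
        if (nodes.size() < required) {
            data.fatalError = new IllegalStateException(
                    nodes.size() + " nodes available for key, but " + required + " required");
            return Event.ERROR;
        }
        data.nodes = new ArrayList<>(nodes); // later actions read the nodes from here
        return Event.CONFIGURED;
    }
}
```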
If all goes well, the PerformParallelRequests action is invoked. It is in charge of initiating all the asynchronous requests to the nodes relevant to the key. We also keep track of the number of requests we’re making so that later we can determine how many have arrived. Notice too that there’s a bit of logic to handle the case where the preferred number of responses is 0. That is, in some cases we don’t care what the responses are. (To be honest, I’m not sure what that case is, but it’s in the original code as well.)
At this point we’ve initiated the get requests asynchronously and are awaiting responses. The Pipeline logic uses a BlockingQueue causing it to block until an event is received. The NonblockingStore call contains a callback object which will call back into the Pipeline when a response is received, generating an event (Event.RESPONSE_RECEIVED) that will cause the Pipeline to unblock and proceed to process it.
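The blocking-then-unblocking behavior can be shown in isolation. The sketch below assumes a callback interface shaped like requestComplete(result, requestTime) and simulates the non-blocking store with a background thread. The names submitGet, Callback, and QueuedEvent are hypothetical and stand in for the real NonblockingStore machinery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only; simulates an asynchronous response waking a blocked consumer.
final class CallbackSketch {

    enum Event { RESPONSE_RECEIVED }

    // Stand-in for a callback such as NonblockingStoreCallback; the signature is assumed.
    interface Callback {
        void requestComplete(Object result, long requestTimeMs);
    }

    static final class QueuedEvent {
        final Event event;
        final Object data;

        QueuedEvent(Event event, Object data) {
            this.event = event;
            this.data = data;
        }
    }

    // Hypothetical non-blocking "store": returns immediately and completes on another thread.
    static void submitGet(String key, Callback callback) {
        long startNs = System.nanoTime();
        Thread worker = new Thread(() -> {
            Object result = "value-for-" + key;
            long elapsedMs = (System.nanoTime() - startNs) / 1_000_000L;
            callback.requestComplete(result, elapsedMs);
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Issues one request, then blocks on the queue until the callback delivers an event.
    static QueuedEvent awaitFirstEvent(String key) {
        BlockingQueue<QueuedEvent> queue = new LinkedBlockingQueue<>();
        submitGet(key, (result, requestTimeMs) ->
                queue.add(new QueuedEvent(Event.RESPONSE_RECEIVED, result)));
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a response", e);
        }
    }
}
```

The calling thread never polls: it parks inside take() and is released only when the worker thread’s callback adds an event to the queue.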
Processing of received responses is done via the AcknowledgeResponse action. This is probably the most complicated logic in any of the Action implementations, and thus deserves some explanation.
First, we need to increment our internal counter of received responses (we’ll use it later on).
Second, we distinguish between failed responses and successful responses. In the case of a failure we record the error in the PipelineData (for possible later use) and in some cases record the error with the FailureDetector. In the case of success, we increment our success counter, add the received response to our list, and update the FailureDetector.
Third, if we’ve received as many responses as we’ve requested and our number of successes isn’t what we require, we can do one of two things:
- Try to invoke another Action. This is used to perform serial requests to auxiliary nodes in an attempt to meet our required number of successes. More on that later.
- Otherwise, simply give up with a fatal error.
Fourth, if we’ve succeeded in receiving responses from our preferred number of nodes, we move to the next event. (isComplete is used to prevent any later responses from causing the “completeEvent” to be added again.)
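The four steps above can be sketched as a single function that updates the counters and returns the next event to queue, or null when the pipeline should keep waiting. This is an assumption-laden illustration rather than the real AcknowledgeResponse: the State fields, the hasFallback flag, and the final branch (all responses in, required met but preferred not) are hypothetical and are not spelled out in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not the real AcknowledgeResponse action.
final class AcknowledgeSketch {

    enum Event { INSUFFICIENT_SUCCESSES, RESPONSES_RECEIVED, ERROR }

    static final class State {
        final int attempts;        // number of parallel requests that were issued
        final int required;        // minimum successes for the operation to succeed
        final int preferred;       // successes we would like before moving on
        final boolean hasFallback; // true if a serial-request action is configured
        int received;
        int successes;
        boolean complete;
        final List<Object> values = new ArrayList<>();
        final List<Exception> failures = new ArrayList<>();

        State(int attempts, int required, int preferred, boolean hasFallback) {
            this.attempts = attempts;
            this.required = required;
            this.preferred = preferred;
            this.hasFallback = hasFallback;
        }
    }

    // Pass error == null for a successful response. Returns the next event, or null to keep waiting.
    static Event acknowledge(State s, Object value, Exception error) {
        s.received++;                                  // step 1: count the response
        if (error != null) {                           // step 2: failure vs. success
            s.failures.add(error);
        } else {
            s.successes++;
            s.values.add(value);
        }
        if (s.received == s.attempts && s.successes < s.required) // step 3
            return s.hasFallback ? Event.INSUFFICIENT_SUCCESSES : Event.ERROR;
        if (!s.complete && s.successes >= s.preferred) {          // step 4
            s.complete = true; // guards against queuing the completion event twice
            return Event.RESPONSES_RECEIVED;
        }
        // Assumption: all responses are in and required (but not preferred) is met, so complete.
        if (!s.complete && s.received == s.attempts) {
            s.complete = true;
            return Event.RESPONSES_RECEIVED;
        }
        return null;
    }
}
```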
If our requests were successful, the next event triggers read repair, though I won’t explain that here. However, if the number of successes from our first batch of asynchronous requests was less than required, we attempt to perform some “catch up” requests serially by executing the PerformSerialRequests action. The logic is pretty much a blend of PerformParallelRequests and AcknowledgeResponse in one class.
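A rough sketch of the serial “catch up” idea: walk the remaining nodes one at a time with a blocking call, acknowledge each result inline, and stop as soon as the required number of successes is reached. The blockingGet function, the Data holder, and the decision to stop at the required count are assumptions made for illustration; the real PerformSerialRequests may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch only; not the real PerformSerialRequests action.
final class SerialRequestsSketch {

    enum Event { RESPONSES_RECEIVED, ERROR }

    // Stand-in for the shared per-request state carried over from the parallel phase.
    static final class Data {
        int successes;
        final List<String> values = new ArrayList<>();
        final List<Exception> failures = new ArrayList<>();
    }

    // blockingGet is a hypothetical synchronous request to one node; it throws on failure.
    static Event performSerial(List<Integer> remainingNodes,
                               Function<Integer, String> blockingGet,
                               int required, Data data) {
        for (int node : remainingNodes) {
            if (data.successes >= required)
                break; // enough successes, no need to contact further nodes
            try {
                data.values.add(blockingGet.apply(node));
                data.successes++;
            } catch (RuntimeException e) {
                data.failures.add(e); // record the failure and try the next node
            }
        }
        return data.successes >= required ? Event.RESPONSES_RECEIVED : Event.ERROR;
    }
}
```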
And that’s the end of processing of the “get” request.
If you have found any problems, errors, etc. please feel free to describe them on the issues page.
Thanks.