Skip to content

Commit

Permalink
Fix improper constructor arguments in RaftProxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 19, 2017
1 parent 2c601f2 commit 1e17650
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
Expand Up @@ -15,18 +15,17 @@
*/
package io.atomix.protocols.raft.proxy.impl;

import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.RaftCommand;
import io.atomix.protocols.raft.RaftOperation;
import io.atomix.protocols.raft.RaftQuery;
import io.atomix.protocols.raft.CommunicationStrategies;
import io.atomix.protocols.raft.CommunicationStrategy;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.util.concurrent.ThreadContext;
import io.atomix.util.serializer.Serializer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand Down Expand Up @@ -62,15 +61,14 @@ public DefaultRaftProxy(
RaftProxyManager sessionManager,
CommunicationStrategy communicationStrategy,
Serializer serializer,
Executor orderedExecutor,
ScheduledExecutorService scheduledExecutor) {
ThreadContext context) {
this.state = checkNotNull(state, "state cannot be null");
this.sessionManager = checkNotNull(sessionManager, "sessionManager cannot be null");
RaftProxySequencer sequencer = new RaftProxySequencer(state);
this.sessionListener = new RaftProxyListener(protocol, state, sequencer, serializer, orderedExecutor);
this.sessionListener = new RaftProxyListener(protocol, state, sequencer, serializer, context);
RaftConnection leaderConnection = new RaftConnection(String.valueOf(state.getSessionId()), protocol.dispatcher(), selectorManager.createSelector(CommunicationStrategies.LEADER));
RaftConnection sessionConnection = new RaftConnection(String.valueOf(state.getSessionId()), protocol.dispatcher(), selectorManager.createSelector(communicationStrategy));
this.sessionSubmitter = new RaftProxySubmitter(leaderConnection, sessionConnection, state, sequencer, sessionManager, serializer, orderedExecutor, scheduledExecutor);
this.sessionSubmitter = new RaftProxySubmitter(leaderConnection, sessionConnection, state, sequencer, sessionManager, serializer, context);
}

@Override
Expand Down
Expand Up @@ -27,6 +27,8 @@
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.util.concurrent.Futures;
import io.atomix.util.concurrent.OrderedExecutor;
import io.atomix.util.concurrent.SingleThreadContext;
import io.atomix.util.concurrent.ThreadContext;
import io.atomix.util.serializer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,7 +125,7 @@ public CompletableFuture<RaftProxy> openSession(

LOGGER.trace("{} - Sending {}", clientId, request);
CompletableFuture<RaftProxy> future = new CompletableFuture<>();
Executor sessionExecutor = new OrderedExecutor(threadPoolExecutor);
ThreadContext proxyContext = new SingleThreadContext(threadPoolExecutor);
connection.openSession(request).whenCompleteAsync((response, error) -> {
if (error == null) {
if (response.status() == RaftResponse.Status.OK) {
Expand All @@ -138,15 +140,14 @@ public CompletableFuture<RaftProxy> openSession(
this,
communicationStrategy,
serializer,
sessionExecutor,
threadPoolExecutor));
proxyContext));
} else {
future.completeExceptionally(response.error().createException());
}
} else {
future.completeExceptionally(error);
}
}, sessionExecutor);
}, proxyContext);
return future;
}

Expand Down

0 comments on commit 1e17650

Please sign in to comment.