Skip to content

Commit

Permalink
Construct primitive session client asynchronously to allow external s…
Browse files Browse the repository at this point in the history
…ession ID generation.
  • Loading branch information
kuujo committed Jun 28, 2018
1 parent cb2cbac commit b26b60f
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 33 deletions.
Expand Up @@ -50,7 +50,7 @@ public class RecoveringSessionClient implements SessionClient {
private final PartitionId partitionId; private final PartitionId partitionId;
private final String name; private final String name;
private final PrimitiveType primitiveType; private final PrimitiveType primitiveType;
private final Supplier<SessionClient> proxyFactory; private final Supplier<CompletableFuture<SessionClient>> proxyFactory;
private final ThreadContext context; private final ThreadContext context;
private Logger log; private Logger log;
private volatile CompletableFuture<SessionClient> connectFuture; private volatile CompletableFuture<SessionClient> connectFuture;
Expand All @@ -67,7 +67,7 @@ public RecoveringSessionClient(
PartitionId partitionId, PartitionId partitionId,
String name, String name,
PrimitiveType primitiveType, PrimitiveType primitiveType,
Supplier<SessionClient> sessionFactory, Supplier<CompletableFuture<SessionClient>> sessionFactory,
ThreadContext context) { ThreadContext context) {
this.partitionId = checkNotNull(partitionId); this.partitionId = checkNotNull(partitionId);
this.name = checkNotNull(name); this.name = checkNotNull(name);
Expand Down Expand Up @@ -160,7 +160,7 @@ private void recover() {
*/ */
private void openProxy(CompletableFuture<SessionClient> future) { private void openProxy(CompletableFuture<SessionClient> future) {
log.debug("Opening proxy session"); log.debug("Opening proxy session");
proxyFactory.get().connect().whenComplete((proxy, error) -> { proxyFactory.get().thenCompose(proxy -> proxy.connect()).whenComplete((proxy, error) -> {
if (error == null) { if (error == null) {
synchronized (this) { synchronized (this) {
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(SessionClient.class) this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(SessionClient.class)
Expand Down
Expand Up @@ -101,21 +101,22 @@ public PrimaryBackupSessionClient.Builder sessionBuilder(String primitiveName, P
return new PrimaryBackupSessionClient.Builder() { return new PrimaryBackupSessionClient.Builder() {
@Override @Override
public SessionClient build() { public SessionClient build() {
Supplier<SessionClient> proxyBuilder = () -> new PrimaryBackupSessionClient( Supplier<CompletableFuture<SessionClient>> proxyBuilder = () -> sessionIdService.nextSessionId()
clientName, .thenApply(sessionId -> new PrimaryBackupSessionClient(
partitionId, clientName,
Futures.get(sessionIdService.nextSessionId()), partitionId,
primitiveType, sessionId,
new PrimitiveDescriptor( primitiveType,
primitiveName, new PrimitiveDescriptor(
primitiveType.name(), primitiveName,
configBytes, primitiveType.name(),
numBackups, configBytes,
replication), numBackups,
clusterMembershipService, replication),
PrimaryBackupClient.this.protocol, clusterMembershipService,
primaryElection, PrimaryBackupClient.this.protocol,
threadContextFactory.createContext()); primaryElection,
threadContextFactory.createContext()));


SessionClient proxy; SessionClient proxy;
ThreadContext context = threadContextFactory.createContext(); ThreadContext context = threadContextFactory.createContext();
Expand All @@ -128,7 +129,7 @@ public SessionClient build() {
proxyBuilder, proxyBuilder,
context); context);
} else { } else {
proxy = proxyBuilder.get(); proxy = Futures.get(proxyBuilder.get());
} }


// If max retries is set, wrap the client in a retrying proxy client. // If max retries is set, wrap the client in a retrying proxy client.
Expand Down
Expand Up @@ -129,19 +129,20 @@ public RaftSessionClient.Builder sessionBuilder(String primitiveName, PrimitiveT
@Override @Override
public SessionClient build() { public SessionClient build() {
// Create a proxy builder that uses the session manager to open a session. // Create a proxy builder that uses the session manager to open a session.
Supplier<SessionClient> proxyFactory = () -> new DefaultRaftSessionClient( Supplier<CompletableFuture<SessionClient>> proxyFactory = () -> CompletableFuture.completedFuture(
primitiveName, new DefaultRaftSessionClient(
primitiveType, primitiveName,
serviceConfig, primitiveType,
partitionId, serviceConfig,
DefaultRaftClient.this.protocol, partitionId,
selectorManager, DefaultRaftClient.this.protocol,
sessionManager, selectorManager,
readConsistency, sessionManager,
communicationStrategy, readConsistency,
threadContextFactory.createContext(), communicationStrategy,
minTimeout, threadContextFactory.createContext(),
maxTimeout); minTimeout,
maxTimeout));


SessionClient proxy; SessionClient proxy;


Expand All @@ -157,7 +158,7 @@ public SessionClient build() {
proxyFactory, proxyFactory,
context); context);
} else { } else {
proxy = proxyFactory.get(); proxy = proxyFactory.get().join();
} }


// If max retries is set, wrap the client in a retrying proxy client. // If max retries is set, wrap the client in a retrying proxy client.
Expand Down

0 comments on commit b26b60f

Please sign in to comment.