Skip to content

Commit

Permalink
Merge 408c037 into b83742d
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Mittler committed Aug 20, 2015
2 parents b83742d + 408c037 commit 3dc9ed1
Show file tree
Hide file tree
Showing 15 changed files with 360 additions and 307 deletions.
85 changes: 8 additions & 77 deletions core/src/main/java/io/grpc/AbstractChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,13 @@

package io.grpc;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.Internal;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Nullable;

Expand All @@ -53,28 +47,9 @@
* @param <BuilderT> The concrete type of this builder.
*/
public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>> {
static final Resource<ExecutorService> DEFAULT_EXECUTOR =
new Resource<ExecutorService>() {
private static final String name = "grpc-default-executor";
@Override
public ExecutorService create() {
return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(name + "-%d").build());
}

@Override
public void close(ExecutorService instance) {
instance.shutdown();
}

@Override
public String toString() {
return name;
}
};

@Nullable
private ExecutorService userExecutor;
private ExecutorService executor;
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();

@Nullable
Expand All @@ -90,7 +65,7 @@ public String toString() {
* shut down the executor when it's desired.
*/
public final BuilderT executor(ExecutorService executor) {
userExecutor = executor;
this.executor = executor;
return thisT();
}

Expand Down Expand Up @@ -135,59 +110,15 @@ public final BuilderT userAgent(String userAgent) {
* Builds a channel using the given parameters.
*/
public ChannelImpl build() {
final ExecutorService executor;
final boolean releaseExecutor;
if (userExecutor != null) {
executor = userExecutor;
releaseExecutor = false;
} else {
executor = SharedResourceHolder.get(DEFAULT_EXECUTOR);
releaseExecutor = true;
}

final ChannelEssentials essentials = buildEssentials();
ChannelImpl channel = new ChannelImpl(essentials.transportFactory, executor, userAgent,
interceptors);
channel.setTerminationRunnable(new Runnable() {
@Override
public void run() {
if (releaseExecutor) {
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
}
if (essentials.terminationRunnable != null) {
essentials.terminationRunnable.run();
}
}
});
return channel;
}

/**
* The essentials required for creating a channel.
*/
@Internal
protected static class ChannelEssentials {
final ClientTransportFactory transportFactory;
@Nullable final Runnable terminationRunnable;

/**
* Constructor.
*
* @param transportFactory the created channel uses this factory to create transports
* @param terminationRunnable will be called at the channel termination
*/
public ChannelEssentials(ClientTransportFactory transportFactory,
@Nullable Runnable terminationRunnable) {
this.transportFactory = Preconditions.checkNotNull(transportFactory);
this.terminationRunnable = terminationRunnable;
}
ClientTransportFactory transportFactory = buildTransportFactory();
return new ChannelImpl(transportFactory, executor, userAgent, interceptors);
}

/**
* Children of AbstractChannelBuilder should override this method to provide transport specific
* information for the channel. This method is mean for Transport implementors and should not be
* used by normal users.
* Children of AbstractChannelBuilder should override this method to provide the
* {@link ClientTransportFactory} appropriate for this channel. This method is meant for
* Transport implementors and should not be used by normal users.
*/
@Internal
protected abstract ChannelEssentials buildEssentials();
protected abstract ClientTransportFactory buildTransportFactory();
}
57 changes: 5 additions & 52 deletions core/src/main/java/io/grpc/AbstractServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@

package io.grpc;

import static io.grpc.AbstractChannelBuilder.DEFAULT_EXECUTOR;

import com.google.common.base.Preconditions;

import io.grpc.internal.SharedResourceHolder;

import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;
Expand All @@ -50,7 +46,7 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild

private final HandlerRegistry registry;
@Nullable
private ExecutorService userExecutor;
private ExecutorService executor;

/**
* Constructs using a given handler registry.
Expand All @@ -77,7 +73,7 @@ protected AbstractServerBuilder() {
*/
@SuppressWarnings("unchecked")
public final BuilderT executor(ExecutorService executor) {
userExecutor = executor;
this.executor = executor;
return (BuilderT) this;
}

Expand All @@ -103,57 +99,14 @@ public final BuilderT addService(ServerServiceDefinition service) {
* with {@link ServerImpl#start()}.
*/
public ServerImpl build() {
final ExecutorService executor;
final boolean releaseExecutor;
if (userExecutor != null) {
executor = userExecutor;
releaseExecutor = false;
} else {
executor = SharedResourceHolder.get(DEFAULT_EXECUTOR);
releaseExecutor = true;
}

final ServerEssentials essentials = buildEssentials();
ServerImpl server = new ServerImpl(executor, registry, essentials.server);
server.setTerminationRunnable(new Runnable() {
@Override
public void run() {
if (releaseExecutor) {
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
}
if (essentials.terminationRunnable != null) {
essentials.terminationRunnable.run();
}
}
});
return server;
io.grpc.internal.Server transportServer = buildTransportServer();
return new ServerImpl(executor, registry, transportServer);
}

/**
* Children of AbstractServerBuilder should override this method to provide transport specific
* information for the server. This method is mean for Transport implementors and should not be
* used by normal users.
*/
protected abstract ServerEssentials buildEssentials();

/**
* The essentials required for creating a server.
*/
protected static class ServerEssentials {
final io.grpc.internal.Server server;
@Nullable
final Runnable terminationRunnable;

/**
* Constructor.
*
* @param server the created server uses this server to accept transports
* @param terminationRunnable will be called at the server termination
*/
public ServerEssentials(io.grpc.internal.Server server,
@Nullable Runnable terminationRunnable) {
this.server = Preconditions.checkNotNull(server, "server");
this.terminationRunnable = terminationRunnable;
}
}
protected abstract io.grpc.internal.Server buildTransportServer();
}
64 changes: 47 additions & 17 deletions core/src/main/java/io/grpc/ChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.grpc.ClientCallImpl.ClientTransportProvider;
import io.grpc.MessageEncoding.Compressor;
Expand All @@ -46,6 +47,7 @@
import io.grpc.internal.HttpUtil;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -66,8 +68,30 @@
@ThreadSafe
public final class ChannelImpl extends Channel {
private static final Logger log = Logger.getLogger(ChannelImpl.class.getName());

static final Resource<ExecutorService> SHARED_EXECUTOR =
new Resource<ExecutorService>() {
private static final String name = "grpc-default-executor";
@Override
public ExecutorService create() {
return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(name + "-%d").build());
}

@Override
public void close(ExecutorService instance) {
instance.shutdown();
}

@Override
public String toString() {
return name;
}
};

private final ClientTransportFactory transportFactory;
private final ExecutorService executor;
private final boolean usingSharedExecutor;
private final String userAgent;
private final Object lock = new Object();

Expand Down Expand Up @@ -100,7 +124,6 @@ public final class ChannelImpl extends Channel {
private boolean shutdown;
@GuardedBy("lock")
private boolean terminated;
private Runnable terminationRunnable;

private long reconnectTimeMillis;
private BackoffPolicy reconnectPolicy;
Expand All @@ -114,22 +137,22 @@ public ClientTransport get() {
}
};

ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor,
ChannelImpl(ClientTransportFactory transportFactory, @Nullable ExecutorService executor,
@Nullable String userAgent, List<ClientInterceptor> interceptors) {
this.transportFactory = transportFactory;
this.executor = executor;
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
}

/** Hack to allow executors to auto-shutdown. Not for general use. */
// TODO(ejona86): Replace with a real API.
void setTerminationRunnable(Runnable runnable) {
this.terminationRunnable = runnable;
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(SHARED_EXECUTOR);
} else {
usingSharedExecutor = false;
this.executor = executor;
}
}


/**
* Sets the default compression method for this Channel. By default, new calls will use the
* provided compressor. Each individual Call can override this by specifying it in CallOptions.
Expand Down Expand Up @@ -162,9 +185,7 @@ public ChannelImpl shutdown() {
} else if (transports.isEmpty()) {
terminated = true;
lock.notifyAll();
if (terminationRunnable != null) {
terminationRunnable.run();
}
onChannelTerminated();
}
}
if (savedActiveTransport != null) {
Expand Down Expand Up @@ -380,10 +401,8 @@ public void transportTerminated() {
log.warning("transportTerminated called after already terminated");
}
terminated = true;
onChannelTerminated();
lock.notifyAll();
if (terminationRunnable != null) {
terminationRunnable.run();
}
}
}
}
Expand All @@ -401,6 +420,17 @@ public void transportTerminated() {
public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
Metadata.Key.of(HttpUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);

/**
* If we're using the shared executor, returns its reference.
*/
private void onChannelTerminated() {
if (usingSharedExecutor) {
SharedResourceHolder.release(SHARED_EXECUTOR, executor);
}
// Release the transport factory so that it can deallocate any resources.
transportFactory.release();
}

/**
* Marshals a microseconds representation of the timeout to and from a string representation,
* consisting of an ASCII decimal representation of a number with at most 8 digits, followed by a
Expand Down Expand Up @@ -470,8 +500,8 @@ public Long parseAsciiString(String serialized) {
}
}

static final SharedResourceHolder.Resource<ScheduledExecutorService> TIMER_SERVICE =
new SharedResourceHolder.Resource<ScheduledExecutorService>() {
static final Resource<ScheduledExecutorService> TIMER_SERVICE =
new Resource<ScheduledExecutorService>() {
@Override
public ScheduledExecutorService create() {
return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
Expand Down
Loading

0 comments on commit 3dc9ed1

Please sign in to comment.