diff --git a/core/src/main/java/io/grpc/inprocess/AnonymousInProcessSocketAddress.java b/core/src/main/java/io/grpc/inprocess/AnonymousInProcessSocketAddress.java new file mode 100644 index 00000000000..5f6486e335d --- /dev/null +++ b/core/src/main/java/io/grpc/inprocess/AnonymousInProcessSocketAddress.java @@ -0,0 +1,58 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import static com.google.common.base.Preconditions.checkState; + +import io.grpc.ExperimentalApi; +import java.io.IOException; +import java.net.SocketAddress; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * Custom SocketAddress class for {@link InProcessTransport}, for + * a server which can only be referenced via this address instance. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8626") +public final class AnonymousInProcessSocketAddress extends SocketAddress { + private static final long serialVersionUID = -8567592561863414695L; + + @Nullable + @GuardedBy("this") + private InProcessServer server; + + /** Creates a new AnonymousInProcessSocketAddress. */ + public AnonymousInProcessSocketAddress() { } + + @Nullable + synchronized InProcessServer getServer() { + return server; + } + + synchronized void setServer(InProcessServer server) throws IOException { + if (this.server != null) { + throw new IOException("Server instance already registered"); + } + this.server = server; + } + + synchronized void clearServer(InProcessServer server) { + checkState(this.server == server); + this.server = null; + } +} diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 8a309408a94..df396ae2f66 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -55,15 +55,28 @@ public final class InProcessChannelBuilder extends * @return a new builder */ public static InProcessChannelBuilder forName(String name) { - return new InProcessChannelBuilder(name); + return forAddress(new InProcessSocketAddress(checkNotNull(name, "name"))); } /** - * Always fails. Call {@link #forName} instead. + * Create a channel builder that will connect to the server referenced by the given target URI. + * Only intended for use with a custom name resolver. + * + * @param target the identity of the server to connect to + * @return a new builder */ - @DoNotCall("Unsupported. Use forName() instead") public static InProcessChannelBuilder forTarget(String target) { - throw new UnsupportedOperationException("call forName() instead"); + return new InProcessChannelBuilder(null, checkNotNull(target, "target")); + } + + /** + * Create a channel builder that will connect to the server referenced by the given address. + * + * @param address the address of the server to connect to + * @return a new builder + */ + public static InProcessChannelBuilder forAddress(SocketAddress address) { + return new InProcessChannelBuilder(checkNotNull(address, "address"), null); } /** @@ -75,13 +88,11 @@ public static InProcessChannelBuilder forAddress(String name, int port) { } private final ManagedChannelImplBuilder managedChannelImplBuilder; - private final String name; private ScheduledExecutorService scheduledExecutorService; private int maxInboundMetadataSize = Integer.MAX_VALUE; private boolean transportIncludeStatusCause = false; - private InProcessChannelBuilder(String name) { - this.name = checkNotNull(name, "name"); + private InProcessChannelBuilder(@Nullable SocketAddress directAddress, @Nullable String target) { final class InProcessChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder { @Override @@ -90,8 +101,13 @@ public ClientTransportFactory buildClientTransportFactory() { } } - managedChannelImplBuilder = new ManagedChannelImplBuilder(new InProcessSocketAddress(name), - "localhost", new InProcessChannelTransportFactoryBuilder(), null); + if (directAddress != null) { + managedChannelImplBuilder = new ManagedChannelImplBuilder(directAddress, "localhost", + new InProcessChannelTransportFactoryBuilder(), null); + } else { + managedChannelImplBuilder = new ManagedChannelImplBuilder(target, + new InProcessChannelTransportFactoryBuilder(), null); + } // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 @@ -204,7 +220,7 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) { ClientTransportFactory buildTransportFactory() { return new InProcessClientTransportFactory( - name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); + scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause); } void setStatsEnabled(boolean value) { @@ -215,7 +231,6 @@ void setStatsEnabled(boolean value) { * Creates InProcess transports. Exposed for internal use, as it should be private. */ static final class InProcessClientTransportFactory implements ClientTransportFactory { - private final String name; private final ScheduledExecutorService timerService; private final boolean useSharedTimer; private final int maxInboundMetadataSize; @@ -223,10 +238,8 @@ static final class InProcessClientTransportFactory implements ClientTransportFac private final boolean includeCauseWithStatus; private InProcessClientTransportFactory( - String name, @Nullable ScheduledExecutorService scheduledExecutorService, int maxInboundMetadataSize, boolean includeCauseWithStatus) { - this.name = name; useSharedTimer = scheduledExecutorService == null; timerService = useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService; @@ -242,7 +255,7 @@ public ConnectionClientTransport newClientTransport( } // TODO(carl-mastrangelo): Pass channelLogger in. return new InProcessTransport( - name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), + addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(), options.getEagAttributes(), includeCauseWithStatus); } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 7922ebd21a1..ffaca78f397 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -40,11 +40,16 @@ final class InProcessServer implements InternalServer { private static final ConcurrentMap registry = new ConcurrentHashMap<>(); - static InProcessServer findServer(String name) { - return registry.get(name); + static InProcessServer findServer(SocketAddress addr) { + if (addr instanceof AnonymousInProcessSocketAddress) { + return ((AnonymousInProcessSocketAddress) addr).getServer(); + } else if (addr instanceof InProcessSocketAddress) { + return registry.get(((InProcessSocketAddress) addr).getName()); + } + return null; } - private final String name; + private final SocketAddress listenAddress; private final int maxInboundMetadataSize; private final List streamTracerFactories; private ServerListener listener; @@ -60,7 +65,7 @@ static InProcessServer findServer(String name) { InProcessServer( InProcessServerBuilder builder, List streamTracerFactories) { - this.name = builder.name; + this.listenAddress = builder.listenAddress; this.schedulerPool = builder.schedulerPool; this.maxInboundMetadataSize = builder.maxInboundMetadataSize; this.streamTracerFactories = @@ -72,14 +77,25 @@ public void start(ServerListener serverListener) throws IOException { this.listener = serverListener; this.scheduler = schedulerPool.getObject(); // Must be last, as channels can start connecting after this point. - if (registry.putIfAbsent(name, this) != null) { - throw new IOException("name already registered: " + name); + registerInstance(); + } + + private void registerInstance() throws IOException { + if (listenAddress instanceof AnonymousInProcessSocketAddress) { + ((AnonymousInProcessSocketAddress) listenAddress).setServer(this); + } else if (listenAddress instanceof InProcessSocketAddress) { + String name = ((InProcessSocketAddress) listenAddress).getName(); + if (registry.putIfAbsent(name, this) != null) { + throw new IOException("name already registered: " + name); + } + } else { + throw new AssertionError(); } } @Override public SocketAddress getListenSocketAddress() { - return new InProcessSocketAddress(name); + return listenAddress; } @Override @@ -99,9 +115,7 @@ public List> getListenSocketStatsList() { @Override public void shutdown() { - if (!registry.remove(name, this)) { - throw new AssertionError(); - } + unregisterInstance(); scheduler = schedulerPool.returnObject(scheduler); synchronized (this) { shutdown = true; @@ -109,9 +123,22 @@ public void shutdown() { } } + private void unregisterInstance() { + if (listenAddress instanceof AnonymousInProcessSocketAddress) { + ((AnonymousInProcessSocketAddress) listenAddress).clearServer(this); + } else if (listenAddress instanceof InProcessSocketAddress) { + String name = ((InProcessSocketAddress) listenAddress).getName(); + if (!registry.remove(name, this)) { + throw new AssertionError(); + } + } else { + throw new AssertionError(); + } + } + @Override public String toString() { - return MoreObjects.toStringHelper(this).add("name", name).toString(); + return MoreObjects.toStringHelper(this).add("listenAddress", listenAddress).toString(); } synchronized ServerTransportListener register(InProcessTransport transport) { diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index 6c68189fcc9..4c5d46dc54d 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -34,6 +34,7 @@ import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder; import io.grpc.internal.SharedResourcePool; import java.io.File; +import java.net.SocketAddress; import java.util.List; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; @@ -81,7 +82,16 @@ public final class InProcessServerBuilder extends * @return a new builder */ public static InProcessServerBuilder forName(String name) { - return new InProcessServerBuilder(name); + return forAddress(new InProcessSocketAddress(checkNotNull(name, "name"))); + } + + /** + * Create a server builder which listens on the given address. + * @param listenAddress The SocketAddress this server will listen on. + * @return a new builder + */ + public static InProcessServerBuilder forAddress(SocketAddress listenAddress) { + return new InProcessServerBuilder(listenAddress); } /** @@ -100,13 +110,13 @@ public static String generateName() { } private final ServerImplBuilder serverImplBuilder; - final String name; + final SocketAddress listenAddress; int maxInboundMetadataSize = Integer.MAX_VALUE; ObjectPool schedulerPool = SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); - private InProcessServerBuilder(String name) { - this.name = Preconditions.checkNotNull(name, "name"); + private InProcessServerBuilder(SocketAddress listenAddress) { + this.listenAddress = checkNotNull(listenAddress, "listenAddress"); final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder { @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 895b709559b..2f4870fdcc2 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -59,6 +59,7 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; import java.io.InputStream; +import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -80,7 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private static final Logger log = Logger.getLogger(InProcessTransport.class.getName()); private final InternalLogId logId; - private final String name; + private final SocketAddress address; private final int clientMaxInboundMetadataSize; private final String authority; private final String userAgent; @@ -119,10 +120,10 @@ protected void handleNotInUse() { } }; - private InProcessTransport(String name, int maxInboundMetadataSize, String authority, + private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, Optional optionalServerListener, boolean includeCauseWithStatus) { - this.name = name; + this.address = address; this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent); @@ -130,18 +131,18 @@ private InProcessTransport(String name, int maxInboundMetadataSize, String autho this.attributes = Attributes.newBuilder() .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs) - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); this.optionalServerListener = optionalServerListener; - logId = InternalLogId.allocate(getClass(), name); + logId = InternalLogId.allocate(getClass(), address.toString()); this.includeCauseWithStatus = includeCauseWithStatus; } public InProcessTransport( - String name, int maxInboundMetadataSize, String authority, String userAgent, + SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent, Attributes eagAttrs, boolean includeCauseWithStatus) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.absent(), includeCauseWithStatus); } @@ -150,7 +151,7 @@ public InProcessTransport( Attributes eagAttrs, ObjectPool serverSchedulerPool, List serverStreamTracerFactories, ServerListener serverListener) { - this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener), false); this.serverMaxInboundMetadataSize = maxInboundMetadataSize; this.serverSchedulerPool = serverSchedulerPool; @@ -165,7 +166,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { serverScheduler = serverSchedulerPool.getObject(); serverTransportListener = optionalServerListener.get().transportCreated(this); } else { - InProcessServer server = InProcessServer.findServer(name); + InProcessServer server = InProcessServer.findServer(address); if (server != null) { serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); serverSchedulerPool = server.getScheduledExecutorServicePool(); @@ -176,7 +177,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) { } } if (serverTransportListener == null) { - shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); + shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address); final Status localShutdownStatus = shutdownStatus; return new Runnable() { @Override @@ -194,8 +195,8 @@ public void run() { public void run() { synchronized (InProcessTransport.this) { Attributes serverTransportAttrs = Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) - .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address) .build(); serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); clientTransportListener.transportReady(); @@ -307,7 +308,7 @@ public void shutdownNow(Status reason) { public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("name", name) + .add("address", address) .toString(); } diff --git a/core/src/test/java/io/grpc/inprocess/AnonymousInProcessSocketAddressTest.java b/core/src/test/java/io/grpc/inprocess/AnonymousInProcessSocketAddressTest.java new file mode 100644 index 00000000000..9c1beb7dde3 --- /dev/null +++ b/core/src/test/java/io/grpc/inprocess/AnonymousInProcessSocketAddressTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.testing.EqualsTester; +import io.grpc.ServerStreamTracer; +import java.io.IOException; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link AnonymousInProcessSocketAddress}. */ +@RunWith(JUnit4.class) +public class AnonymousInProcessSocketAddressTest { + + @Test + public void defaultState() { + AnonymousInProcessSocketAddress addr = new AnonymousInProcessSocketAddress(); + assertThat(addr.getServer()).isNull(); + } + + @Test + public void setServer() throws Exception { + AnonymousInProcessSocketAddress addr = new AnonymousInProcessSocketAddress(); + InProcessServer server = createAnonymousServer(); + addr.setServer(server); + assertThat(addr.getServer()).isSameInstanceAs(server); + } + + @Test + public void setServerTwice() throws Exception { + AnonymousInProcessSocketAddress addr = new AnonymousInProcessSocketAddress(); + InProcessServer server = createAnonymousServer(); + addr.setServer(server); + try { + addr.setServer(server); + fail("Expected IOException on attempt to set server twice"); + } catch (IOException ioe) { + // Expected. + } + } + + @Test + public void clearServer() throws Exception { + AnonymousInProcessSocketAddress addr = new AnonymousInProcessSocketAddress(); + InProcessServer server = createAnonymousServer(); + addr.setServer(server); + addr.clearServer(server); + assertThat(addr.getServer()).isNull(); + } + + @Test + public void clearServerWrongInstance() throws Exception { + AnonymousInProcessSocketAddress addr = new AnonymousInProcessSocketAddress(); + addr.setServer(createAnonymousServer()); + try { + addr.clearServer(createAnonymousServer()); + fail("Expected IllegalStateException on attempt to clear the wrong server"); + } catch (IllegalStateException ise) { + // Expected. + } + } + + @Test + public void equality() throws IOException { + AnonymousInProcessSocketAddress addrA = new AnonymousInProcessSocketAddress(); + AnonymousInProcessSocketAddress addrB = new AnonymousInProcessSocketAddress(); + AnonymousInProcessSocketAddress addrC = new AnonymousInProcessSocketAddress(); + InProcessServer server = createAnonymousServer(); + + // Ensure two addresses with the same server are still distinct from each other. + addrA.setServer(server); + addrB.setServer(server); + new EqualsTester() + .addEqualityGroup(addrA) + .addEqualityGroup(addrB) + .addEqualityGroup(addrC) + .testEquals(); + } + + private InProcessServer createAnonymousServer() { + AnonymousInProcessSocketAddress unused = new AnonymousInProcessSocketAddress(); + InProcessServerBuilder builder = InProcessServerBuilder.forAddress(unused); + return new InProcessServer(builder, Collections.emptyList()); + } +} diff --git a/core/src/test/java/io/grpc/inprocess/AnonymousInProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/AnonymousInProcessTransportTest.java new file mode 100644 index 00000000000..a78a604eac3 --- /dev/null +++ b/core/src/test/java/io/grpc/inprocess/AnonymousInProcessTransportTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import static com.google.common.truth.Truth.assertThat; + +import io.grpc.ServerStreamTracer; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import java.util.List; +import org.junit.After; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link InProcessTransport} with an anonymous server. */ +@RunWith(JUnit4.class) +public final class AnonymousInProcessTransportTest extends InProcessTransportTest { + + private AnonymousInProcessSocketAddress address = new AnonymousInProcessSocketAddress(); + + @After + @Override + public void tearDown() throws InterruptedException { + super.tearDown(); + assertThat(address.getServer()).isNull(); + } + + @Override + protected InternalServer newServer( + List streamTracerFactories) { + InProcessServerBuilder builder = InProcessServerBuilder.forAddress(address) + .maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE); + return new InProcessServer(builder, streamTracerFactories); + } + + @Override + protected ManagedClientTransport newClientTransport(InternalServer server) { + return new InProcessTransport( + address, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + testAuthority(server), USER_AGENT, eagAttrs(), false); + } +} diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java index 7325cda73cc..9e63a3d9d7c 100644 --- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java +++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java @@ -54,7 +54,7 @@ public class InProcessTransportTest extends AbstractTransportTest { private static final String TRANSPORT_NAME = "perfect-for-testing"; private static final String AUTHORITY = "a-testing-authority"; - private static final String USER_AGENT = "a-testing-user-agent"; + protected static final String USER_AGENT = "a-testing-user-agent"; @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); @@ -82,8 +82,8 @@ protected String testAuthority(InternalServer server) { @Override protected ManagedClientTransport newClientTransport(InternalServer server) { return new InProcessTransport( - TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT, - eagAttrs(), false); + new InProcessSocketAddress(TRANSPORT_NAME), GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + testAuthority(server), USER_AGENT, eagAttrs(), false); } @Override