diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java index efbfe19760f28..fa51308d04e39 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.fnexecution.control; import com.google.auto.value.AutoValue; -import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -115,12 +115,8 @@ public Future register( return Futures.transform( genericResponse, - new Function() { - @Override - public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) { - return input.getRegister(); - } - }); + input -> input.getRegister(), + MoreExecutors.directExecutor()); } /** @@ -164,12 +160,8 @@ public void accept(Object input) throws Exception { ListenableFuture specificResponse = Futures.transform( genericResponse, - new Function() { - @Override - public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse input) { - return input.getProcessBundle(); - } - }); + input -> input.getProcessBundle(), + MoreExecutors.directExecutor()); return ActiveBundle.create(bundleId, specificResponse, dataReceiver); } diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java index 090cc75a383ac..640931d694e00 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/channel/SocketAddressFactory.java @@ -58,7 +58,7 @@ public static SocketAddress createFrom(String value) { HostAndPort hostAndPort = HostAndPort.fromString(value); checkArgument(hostAndPort.hasPort(), "Address must be a unix:// path or be in the form host:port. Got: %s", value); - return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort()); + return new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort()); } } }