Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public RemoteEnvironment createEnvironment(Environment container) throws Excepti
() -> {
try {
FnHarness.main(
"id",
options,
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void setup() throws Exception {
sdkHarnessExecutor.submit(
() ->
FnHarness.main(
"id",
PipelineOptionsFactory.create(),
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions sdks/java/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func main() {

// (3) Invoke the Java harness, preserving artifact ordering in classpath.

os.Setenv("HARNESS_ID", *id)
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *loggingEndpoint}))
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *controlEndpoint}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.beam.sdk.fn.channel;

import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -26,11 +27,10 @@
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;

/**
* A Factory which creates an underlying {@link ManagedChannel} implementation.
*/
/** A Factory which creates an underlying {@link ManagedChannel} implementation. */
public abstract class ManagedChannelFactory {
public static ManagedChannelFactory createDefault() {
return new Default();
Expand All @@ -41,7 +41,20 @@ public static ManagedChannelFactory createEpoll() {
return new Epoll();
}

public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
/**
 * Creates a {@link ManagedChannel} for the provided {@link ApiServiceDescriptor} by building the
 * result of {@link #builderFor(ApiServiceDescriptor)}.
 */
public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
  ManagedChannelBuilder<?> channelBuilder = builderFor(apiServiceDescriptor);
  return channelBuilder.build();
}

/** Create a {@link ManagedChannelBuilder} for the provided {@link ApiServiceDescriptor}. */
protected abstract ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor descriptor);

/**
 * Wraps this factory in a {@link ManagedChannelFactory} that applies the provided {@link
 * ClientInterceptor ClientInterceptors} to any channel it creates.
 *
 * @param interceptors the interceptors handed to the wrapping factory
 * @return a new factory that uses this factory as its underlying channel factory
 */
public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) {
  ManagedChannelFactory intercepted = new InterceptedManagedChannelFactory(this, interceptors);
  return intercepted;
}

/**
* Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
Expand All @@ -50,17 +63,18 @@ public static ManagedChannelFactory createEpoll() {
*/
private static class Epoll extends ManagedChannelFactory {
@Override
public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) {
SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
return NettyChannelBuilder.forAddress(address)
.channelType(address instanceof DomainSocketAddress
? EpollDomainSocketChannel.class : EpollSocketChannel.class)
.channelType(
address instanceof DomainSocketAddress
? EpollDomainSocketChannel.class
: EpollSocketChannel.class)
.eventLoopGroup(new EpollEventLoopGroup())
.usePlaintext(true)
// Set the message size to max value here. The actual size is governed by the
// buffer size in the layers above.
.maxInboundMessageSize(Integer.MAX_VALUE)
.build();
.maxInboundMessageSize(Integer.MAX_VALUE);
}
}

Expand All @@ -70,13 +84,38 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
*/
private static class Default extends ManagedChannelFactory {
@Override
public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) {
return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
.usePlaintext(true)
// Set the message size to max value here. The actual size is governed by the
// buffer size in the layers above.
.maxInboundMessageSize(Integer.MAX_VALUE)
.build();
.maxInboundMessageSize(Integer.MAX_VALUE);
}
}

private static class InterceptedManagedChannelFactory extends ManagedChannelFactory {
private final ManagedChannelFactory channelFactory;
private final List<ClientInterceptor> interceptors;

/**
 * Stores the wrapped factory and the interceptors to attach to channels built from it.
 *
 * @param managedChannelFactory the factory whose builders are reused
 * @param interceptors the interceptors to apply when a channel is created
 */
private InterceptedManagedChannelFactory(
    ManagedChannelFactory managedChannelFactory, List<ClientInterceptor> interceptors) {
  // The list is kept by reference; no defensive copy is made.
  this.interceptors = interceptors;
  this.channelFactory = managedChannelFactory;
}

/**
 * Builds a channel for the descriptor using the wrapped factory's builder, with this factory's
 * interceptors attached.
 */
@Override
public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
  ManagedChannelBuilder<?> channelBuilder = builderFor(apiServiceDescriptor);
  // Interceptors are attached here, not in builderFor, which only delegates.
  return channelBuilder.intercept(interceptors).build();
}

/**
 * Delegates to the wrapped factory's builder; interceptors are not applied at this stage.
 */
@Override
protected ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor descriptor) {
  ManagedChannelBuilder<?> delegateBuilder = channelFactory.builderFor(descriptor);
  return delegateBuilder;
}

@Override
public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) {
return new InterceptedManagedChannelFactory(channelFactory, interceptors);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this method is for Factory chaining.
Should we pass "this" in that case?

I would suggest removing chaining in this manner and providing a builder to do the chaining instead.
Something like this:
ManagedChannelFactoryBuilder.builder(channelFactory).withInterceptors(interceptors)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are losing the original list of interceptors here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm also curious about this. If it's not for factory chaining and isn't necessary for another reason, could it be removed?

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.fn.test;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
Expand All @@ -35,7 +35,7 @@ public static ManagedChannelFactory create() {
private InProcessManagedChannelFactory() {}

@Override
public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
public ManagedChannelBuilder<?> builderFor(ApiServiceDescriptor apiServiceDescriptor) {
return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,21 @@
* Main entry point into the Beam SDK Fn Harness for Java.
*
* <p>This entry point expects the following environment variables:
*
* <ul>
* <li>LOGGING_API_SERVICE_DESCRIPTOR: A
* {@link org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor} encoded as text
* representing the endpoint that is to be connected to for the Beam Fn Logging service.</li>
* <li>CONTROL_API_SERVICE_DESCRIPTOR: A
* {@link Endpoints.ApiServiceDescriptor} encoded as text
* representing the endpoint that is to be connected to for the Beam Fn Control service.</li>
* <li>HARNESS_ID: A String representing the ID of this FnHarness. This will be added to the
* headers of calls to the Beam Control Service
* <li>LOGGING_API_SERVICE_DESCRIPTOR: A {@link
* org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor} encoded as text
* representing the endpoint that is to be connected to for the Beam Fn Logging service.
* <li>CONTROL_API_SERVICE_DESCRIPTOR: A {@link Endpoints.ApiServiceDescriptor} encoded as text
* representing the endpoint that is to be connected to for the Beam Fn Control service.
* <li>PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions}
* for further details.</li>
* for further details.
* </ul>
*/
public class FnHarness {
private static final String HARNESS_ID = "HARNESS_ID";
private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
Expand All @@ -76,27 +79,33 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String env

public static void main(String[] args) throws Exception {
System.out.format("SDK Fn Harness started%n");
System.out.format("Harness ID %s%n", System.getenv(HARNESS_ID));
System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));

ObjectMapper objectMapper = new ObjectMapper().registerModules(
ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
PipelineOptions options = objectMapper.readValue(
System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
String id = System.getenv(HARNESS_ID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also log this parameter, like the other parameters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ObjectMapper objectMapper =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
PipelineOptions options =
objectMapper.readValue(System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);

Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);

main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
}

public static void main(PipelineOptions options,
public static void main(
String id,
PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor)
throws Exception {
ManagedChannelFactory channelFactory;
List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
Expand All @@ -107,6 +116,7 @@ public static void main(PipelineOptions options,
StreamObserverFactory streamObserverFactory =
HarnessStreamObserverFactories.fromOptions(options);
main(
id,
options,
loggingApiServiceDescriptor,
controlApiServiceDescriptor,
Expand All @@ -115,43 +125,46 @@ public static void main(PipelineOptions options,
}

public static void main(
String id,
PipelineOptions options,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
ManagedChannelFactory channelFactory,
StreamObserverFactory streamObserverFactory) {
IdGenerator idGenerator = IdGenerators.decrementingLongs();
try (BeamFnLoggingClient logging = new BeamFnLoggingClient(
options,
loggingApiServiceDescriptor,
channelFactory::forDescriptor)) {
try (BeamFnLoggingClient logging =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logging is not used. Shall we remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This is done to close the client automatically at the completion of the try block, and the client must exist to send intercepted LOG messages to the logging server.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment explaining this? Since `logging` is unused, this code block is non-intuitive.

new BeamFnLoggingClient(
options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {

LOG.info("Fn Harness started");
EnumMap<BeamFnApi.InstructionRequest.RequestCase,
EnumMap<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reformat the file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<InstructionRequest, Builder>>
handlers = new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class);

RegisterHandler fnApiRegistry = new RegisterHandler();
BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(
options, channelFactory::forDescriptor, streamObserverFactory::from);
BeamFnDataGrpcClient beamFnDataMultiplexer =
new BeamFnDataGrpcClient(
options, channelFactory::forDescriptor, streamObserverFactory::from);

BeamFnStateGrpcClientCache beamFnStateGrpcClientCache =
new BeamFnStateGrpcClientCache(
options, idGenerator, channelFactory::forDescriptor, streamObserverFactory::from);

ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(
options,
fnApiRegistry::getById,
beamFnDataMultiplexer,
beamFnStateGrpcClientCache);
handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER,
fnApiRegistry::register);
handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
ProcessBundleHandler processBundleHandler =
new ProcessBundleHandler(
options, fnApiRegistry::getById, beamFnDataMultiplexer, beamFnStateGrpcClientCache);
handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, fnApiRegistry::register);
handlers.put(
BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE,
processBundleHandler::processBundle);
BeamFnControlClient control = new BeamFnControlClient(controlApiServiceDescriptor,
channelFactory::forDescriptor,
streamObserverFactory::from,
handlers);
BeamFnControlClient control =
new BeamFnControlClient(
id,
controlApiServiceDescriptor,
channelFactory,
streamObserverFactory::from,
handlers);

LOG.info("Entering instruction processing loop");
control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.beam.fn.harness.control;

import static com.google.common.base.Preconditions.checkArgument;

import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.stub.MetadataUtils;

/** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */
public class AddHarnessIdInterceptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we need a repository of interceptors. Just putting it as a thought.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this much more readable as a factory method than as an inline key and MetadataUtils call - it tells us exactly the thing it does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG

private static final Key<String> ID_KEY = Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);

/**
 * Creates a {@link ClientInterceptor} that attaches the given harness ID, under the {@code
 * worker_id} metadata key, as a header.
 *
 * @param harnessId the SDK harness ID to attach; must not be null
 * @return an interceptor built by {@link MetadataUtils#newAttachHeadersInterceptor(Metadata)}
 * @throws IllegalArgumentException if {@code harnessId} is null
 */
public static ClientInterceptor create(String harnessId) {
  checkArgument(harnessId != null, "harnessId must not be null");
  Metadata headers = new Metadata();
  headers.put(ID_KEY, harnessId);
  ClientInterceptor attachHarnessId = MetadataUtils.newAttachHeadersInterceptor(headers);
  return attachHarnessId;
}

// This is implemented via MetadataUtils, so we never actually create an instance of this class
private AddHarnessIdInterceptor() {}
}
Loading