[BEAM-4145] Populate the worker_id metadata in the Java SDK Harness#5456
[BEAM-4145] Populate the worker_id metadata in the Java SDK Harness#5456tgroh wants to merge 4 commits intoapache:masterfrom
Conversation
|
run java precommit |
|
|
||
| public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); | ||
| public final ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { | ||
| return forDescriptorOnly(apiServiceDescriptor); |
There was a problem hiding this comment.
Shall we get away with this indirection?
Implementer can always overwrite this public method (if we drop the final).
| @Override | ||
| protected ManagedChannel forDescriptor( | ||
| ApiServiceDescriptor apiServiceDescriptor, List<ClientInterceptor> interceptors) { | ||
| return channelFactory.forDescriptor(apiServiceDescriptor, interceptors); |
There was a problem hiding this comment.
The behavior in not well defined.
Though this is an intercepted factory, the original interceptors (interceptors from constructor) are not applied.
|
|
||
| @Override | ||
| public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { | ||
| return new InterceptedManagedChannelFactory(channelFactory, interceptors); |
There was a problem hiding this comment.
I suppose this method is for Factory chaining.
Should we pass "this" in that case?
I would suggest to remove chaining in this manner and provide a builder to do the chaining.
Something like this:
ManagedChannelFactoryBuilder.builder(channelFactory).withInterceptors(interceptors)
There was a problem hiding this comment.
We are loosing the original list of interceptors here.
There was a problem hiding this comment.
Yes, I'm also curious about this. If it's not for factory chaining and isn't necessary for another reason, could it be removed?
| return forDescriptorOnly(apiServiceDescriptor); | ||
| } | ||
|
|
||
| protected ManagedChannel forDescriptorOnly(ApiServiceDescriptor descriptor) { |
There was a problem hiding this comment.
Based on the previous comment, we can remove this method.
| } | ||
| } | ||
|
|
||
| private static class InterceptedManagedChannelFactory extends ManagedChannelFactory { |
There was a problem hiding this comment.
What is the purpose of this class?
Based on method ManagedChannelFactory.forDescriptor(ApiServiceDescriptor apiServiceDescriptor, List<ClientInterceptor> interceptors)
All implementations of ManagedChannelFactory should support interception and if not, they should throw exception.
|
|
||
| LOG.info("Fn Harness started"); | ||
| EnumMap<BeamFnApi.InstructionRequest.RequestCase, | ||
| EnumMap< |
There was a problem hiding this comment.
Please reformat the file.
| options, | ||
| loggingApiServiceDescriptor, | ||
| channelFactory::forDescriptor)) { | ||
| try (BeamFnLoggingClient logging = |
There was a problem hiding this comment.
logging is not used. Shall we remove it?
There was a problem hiding this comment.
No. This is done to close the client automatically at the completion of the try block, and the client must exist to send intercepted LOG messages to the logging server.
There was a problem hiding this comment.
Can we add a comment explaining this as logging is unused which makes this code block non intuitive.
| @Override | ||
| public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { | ||
| return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); | ||
| public static ClientInterceptor create(String harnessId) { |
There was a problem hiding this comment.
Shall we have a null check for harnessId?
| import io.grpc.stub.MetadataUtils; | ||
|
|
||
| /** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ | ||
| public class AddHarnessIdInterceptor { |
There was a problem hiding this comment.
Not sure if we need a repository of interceptors. Just putting it as a thought.
There was a problem hiding this comment.
I find this much more readable as a factory method than as an inline key and MetadataUtils call - it tells us exactly the thing it does.
| StreamObserverClientFactory<InstructionRequest, BeamFnApi.InstructionResponse>, | ||
| StreamObserver<BeamFnApi.InstructionRequest>, | ||
| StreamObserver<BeamFnApi.InstructionResponse>> | ||
| StreamObserverClientFactory<InstructionRequest, InstructionResponse>, |
tgroh
left a comment
There was a problem hiding this comment.
I've changed the implementation of ManagedChannelFactory a little. Instead of having multiple methods, the public API is exactly forDescriptor(ApiServiceDescriptor), and the abstract API is builderFor(ApiServiceDescriptor), which returns a ManagedChannelBuilder - which can have interceptors attached, or can be otherwise modified as appropriate for the factory.
| ObjectMapper.findModules(ReflectHelpers.findClassLoader())); | ||
| PipelineOptions options = objectMapper.readValue( | ||
| System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); | ||
| String id = System.getenv(HARNESS_ID); |
|
|
||
| LOG.info("Fn Harness started"); | ||
| EnumMap<BeamFnApi.InstructionRequest.RequestCase, | ||
| EnumMap< |
| options, | ||
| loggingApiServiceDescriptor, | ||
| channelFactory::forDescriptor)) { | ||
| try (BeamFnLoggingClient logging = |
There was a problem hiding this comment.
No. This is done to close the client automatically at the completion of the try block, and the client must exist to send intercepted LOG messages to the logging server.
| @Override | ||
| public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { | ||
| return InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); | ||
| public static ClientInterceptor create(String harnessId) { |
| import io.grpc.stub.MetadataUtils; | ||
|
|
||
| /** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ | ||
| public class AddHarnessIdInterceptor { |
There was a problem hiding this comment.
I find this much more readable as a factory method than as an inline key and MetadataUtils call - it tells us exactly the thing it does.
| StreamObserverClientFactory<InstructionRequest, BeamFnApi.InstructionResponse>, | ||
| StreamObserver<BeamFnApi.InstructionRequest>, | ||
| StreamObserver<BeamFnApi.InstructionResponse>> | ||
| StreamObserverClientFactory<InstructionRequest, InstructionResponse>, |
youngoli
left a comment
There was a problem hiding this comment.
Approving, with one comment.
|
|
||
| @Override | ||
| public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { | ||
| return new InterceptedManagedChannelFactory(channelFactory, interceptors); |
There was a problem hiding this comment.
Yes, I'm also curious about this. If it's not for factory chaining and isn't necessary for another reason, could it be removed?
This populates a metadata field with the Harness ID.
This duplicates an available implementation in java-fn-execution, which is the one we use everywhere.
Expand the public API to add a `withInterceptors` method, which returns a ManagedChannelFactory which will create calls via the same underlying implementation, but with the provided interceptors attached. Expand the implementation requirements to take a list of ClientInterceptors, which must intercept any returned channel. The public API to get a channel from a ManagedChannelFactory remains the same.
Update the java container boot code to populate an environment variable with the harness ID. Add an ID parameter to FnHarness main methods. Pass the ID to the BeamFnControlClient in its constructor, and attach it to all outgoing `control` calls.
angoenka
left a comment
There was a problem hiding this comment.
PR looks good overall.
There is 1 major underlying comment at https://github.com/apache/beam/pull/5456/files#diff-379a0983f269015757adc77995b9eaa4R118
and a minor comment on commenting of 'logging'
|
|
||
| @Override | ||
| public ManagedChannelFactory withInterceptors(List<ClientInterceptor> interceptors) { | ||
| return new InterceptedManagedChannelFactory(channelFactory, interceptors); |
There was a problem hiding this comment.
We are loosing the original list of interceptors here.
| options, | ||
| loggingApiServiceDescriptor, | ||
| channelFactory::forDescriptor)) { | ||
| try (BeamFnLoggingClient logging = |
There was a problem hiding this comment.
Can we add a comment explaining this as logging is unused which makes this code block non intuitive.
| import io.grpc.stub.MetadataUtils; | ||
|
|
||
| /** A {@link ClientInterceptor} that attaches a provided SDK Harness ID to outgoing messages. */ | ||
| public class AddHarnessIdInterceptor { |
|
This PR is replaced by #5680. |
The ID is required for any control service with multiple clients, which is the general case.
Enable
ManagedChannelFactoryimplementations to register interceptors on all of thecreated channels.
Add an interceptor which attaches the ID to the metadata. Use this interceptor in the
Sdk Harness Control Client.
Populate the ID passed via the container contract in the boot go code.
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username) to look at it.