
[BEAM-3418] Send worker_id in all grpc channels to runner harness#4587

Merged
aaltay merged 1 commit into apache:master from angoenka:multiprocess_new
Mar 22, 2018

Conversation

@angoenka
Contributor

@angoenka angoenka commented Feb 2, 2018

DESCRIPTION HERE


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@angoenka
Contributor Author

angoenka commented Feb 2, 2018

@aaltay @lukecwik @robertwb Can you please take a look!

'avro>=1.8.1,<2.0.0',
'crcmod>=1.7,<2.0',
'dill==0.2.6',
'grpcio>=1.0,<2',
Member

What is the reason for this change? AFAIK, some runners (Dataflow) require grpcio 1.3? Also, it becomes more restrictive for users each time we reduce the list of allowed versions for a dependency.

Member

grpcio 1.8 is the first Python version that allows passing/receiving client headers.

Contributor Author

We use header to send worker_id in all channels and GRPC headers are only supported after 1.8.

Member

Sounds good. Just try running dataflow runner test before finishing this PR.

Contributor Author

Sure, I tried running apache_beam.runners.dataflow.dataflow_runner_test.DataflowRunnerTest.

Member

I meant running a job on dataflow using: https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_beam_PostCommit_Python_Verify.groovy

Commenting "Run Python PostCommit" on the PR should trigger one.

class WorkerIdInterceptor(grpc.StreamStreamClientInterceptor):

# Unique worker Id for this worker.
_worker_id = os.environ['WORKER_ID'] if os.environ.has_key(
Contributor

Contributor Author

@angoenka angoenka Feb 3, 2018

Yes, for testing I am modifying the internal version of boot.go. Currently it's a bit of a hack, as I artificially generate the ID in boot.go for testing. The hack will be removed once we have the multi-container setup.
I was using worker_id to be more explicit, but we can change it to just id.

Contributor

Sorry if I was unclear: this PR should also change the referenced boot.go code to set the WORKER_ID. The code here should also fail if that ID is not found -- the ID is set by the runner so that the gRPC servers can know who's who; they are not just unique IDs.

Contributor Author

Added the relevant boot.go changes

Member

Do we want to fail if WORKER_ID is not found?

Contributor Author

For backward compatibility of containers, I would like to assign a UUID if worker_id is not provided.
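That fallback might be sketched as follows, assuming the WORKER_ID environment variable from the snippet above (this is an illustrative sketch, not the actual PR code):

```python
import os
import uuid

# Sketch of the fallback described above: prefer the runner-provided
# WORKER_ID, and fall back to a generated UUID for older containers
# that do not set it. (Tracked for removal in BEAM-3904.)
worker_id = os.environ.get('WORKER_ID') or str(uuid.uuid4())
```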

Member

Do we really need to be backward compatible? This is mostly new code with no production usage. I would prefer to not have it succeed like this. But if you think this is necessary in the interim, we can add a TODO to remove the UUID generation.

Contributor Author

Created a JIRA issue, BEAM-3904, to clean it up.
I want to keep it around to decouple SDK changes from internal container changes.

@angoenka
Contributor Author

angoenka commented Feb 9, 2018

@herohde Please have a look. I have updated the PR to include the boot.go changes. However, I am not sure how the actual ID is passed to boot.go as a flag. I made a similar change in the internal version of boot.go and am using the new worker image.

@angoenka
Contributor Author

retest this please

# the flag if 'NO_MULTIPLE_SDK_CONTAINERS' is present.
# TODO: Cleanup MULTIPLE_SDK_CONTAINERS once we depricate Python SDK till
# version 2.4.
if ('MULTIPLE_SDK_CONTAINERS' not in self.proto.experiments and
Contributor

How confident are we to make this a default behavior for 2.5?

Contributor Author

I expect this CL to get into 2.5.
In a way, this flag is required to help the router distinguish between old SDKs (up to 2.4) and new SDKs (2.5 onward). Once no SDK older than 2.5 remains, we don't need to distinguish between SDKs, at least for the multi-SDK functionality, and it automatically becomes the default behavior.

Contributor Author

@robertwb @aaltay I am planning to make this feature opt-out for new SDKs. Should we keep it opt-in instead?

Contributor

@herohde herohde left a comment

LGTM for the boot code

self.proto.experiments.append(experiment)
# Add MULTIPLE_SDK_CONTAINERS flag if its not already present. Do not add
# the flag if 'NO_MULTIPLE_SDK_CONTAINERS' is present.
# TODO: Cleanup MULTIPLE_SDK_CONTAINERS once we depricate Python SDK till
Member

depricate -> deprecate

@angoenka force-pushed the multiprocess_new branch 2 times, most recently from bd777cf to 8c92cce on March 16, 2018 at 23:53
@angoenka
Contributor Author

@aaltay Can you please take a look?

#
"""Client Interceptor to inject worker_id"""
from __future__ import absolute_import
from __future__ import division
Member

Do we need print_function and division imports?

Contributor Author

No, we don't need these imports.
I added them to resolve compatibility issues between Python 2 and 3, based on https://docs.python.org/3/howto/pyporting.html

Should I remove them?

Member

Let's remove them, if they are not needed now. I do not see print() or / being used here.

Contributor Author

Sure!

metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append(('worker_id', self._worker_id))
Member

Would it be an error (or expected) for client_call_details to already have worker_id?

Contributor Author

It should be an error.
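A minimal, grpcio-independent sketch of that agreement (the helper name append_worker_id is illustrative, not from the PR):

```python
def append_worker_id(metadata, worker_id):
    # Copy the existing call metadata and append ('worker_id', ...),
    # treating a pre-existing worker_id entry as an error rather than
    # silently adding a duplicate, as discussed in this thread.
    metadata = list(metadata or [])
    if any(key == 'worker_id' for key, _ in metadata):
        raise ValueError('worker_id is already present in call metadata')
    metadata.append(('worker_id', worker_id))
    return metadata
```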

self._worker_count = worker_count
self._worker_index = 0
self._control_channel = grpc.insecure_channel(control_address)
self._control_channel = grpc.intercept_channel(self._control_channel,
Member

Should we simplify this as:

self._control_channel = grpc.intercept_channel(grpc.insecure_channel(control_address), WorkerIdInterceptor())

Contributor Author

Sure!

def __init__(self, log_service_descriptor):
super(FnApiLogRecordHandler, self).__init__()
self._log_channel = grpc.insecure_channel(log_service_descriptor.url)
self._log_channel = grpc.intercept_channel(self._log_channel,
Member

(Same simplification comment applies here.)

Contributor Author

Sure!

if (job_type.startswith('FNAPI_') and
'use_multiple_sdk_containers' not in self.proto.experiments and
'no_use_multiple_sdk_containers' not in self.proto.experiments):
self.proto.experiments.append('use_multiple_sdk_containers')
Member

It is preferable to modify debug_options.experiments (as done above for runner_harness_override). This also properly updates the user-visible pipeline options in the UI, and it will be automatically added to the proto by the loop above.

It would also help combine things related to if job_type.startswith('FNAPI_'): in a single place.
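The suggested restructuring might look roughly like this (the function and parameter names are illustrative sketches of the discussion, not the actual Beam code):

```python
def apply_fnapi_experiments(job_type, experiments):
    # Sketch: mutate the user-visible experiments list (as is done for
    # runner_harness_override) instead of the proto directly, so the flag
    # also shows up in the UI and gets copied to the proto by the
    # existing loop.
    if (job_type.startswith('FNAPI_')
            and 'use_multiple_sdk_containers' not in experiments
            and 'no_use_multiple_sdk_containers' not in experiments):
        experiments.append('use_multiple_sdk_containers')
    return experiments
```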

Contributor Author

Makes sense!

options=[("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)])
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(grpc_channel,
Contributor Author

Not simplifying this one, to keep readability.

@aaltay
Member

aaltay commented Mar 22, 2018

Could you squash your commits?

@angoenka
Contributor Author

Sure, I will squash them.

Adding use_multiple_sdk_containers flag for FNAPI pipelines.
@aaltay aaltay merged commit 6740ead into apache:master Mar 22, 2018
