Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,17 @@ def __init__(self, packages, options, environment_version, pipeline_url):
if job_type.startswith('FNAPI_'):
runner_harness_override = (
dependency.get_runner_harness_container_image())
self.debug_options.experiments = self.debug_options.experiments or []
if runner_harness_override:
self.debug_options.experiments = self.debug_options.experiments or []
self.debug_options.experiments.append(
'runner_harness_container_image=' + runner_harness_override)
# Add the use_multiple_sdk_containers flag if it's not already present. Do not
# add the flag if 'no_use_multiple_sdk_containers' is present.
# TODO: Clean up use_multiple_sdk_containers once we deprecate Python SDK
# versions up to and including 2.4.
if ('use_multiple_sdk_containers' not in self.proto.experiments and
'no_use_multiple_sdk_containers' not in self.proto.experiments):
self.debug_options.experiments.append('use_multiple_sdk_containers')
# Experiments
if self.debug_options.experiments:
for experiment in self.debug_options.experiments:
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.coders import coder_impl
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor

# This module is experimental. No backwards-compatibility guarantees.

Expand Down Expand Up @@ -311,6 +312,9 @@ def create_data_channel(self, remote_grpc_port):
# controlled in a layer above.
options=[("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)])
# Add workerId to the grpc channel
grpc_channel = grpc.intercept_channel(grpc_channel,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not Simplifying to keep readability.

WorkerIdInterceptor())
self._data_channel_cache[url] = GrpcClientDataChannel(
beam_fn_api_pb2_grpc.BeamFnDataStub(grpc_channel))
return self._data_channel_cache[url]
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/runners/worker/log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor

# This module is experimental. No backwards-compatibility guarantees.

Expand All @@ -48,7 +49,9 @@ class FnApiLogRecordHandler(logging.Handler):

def __init__(self, log_service_descriptor):
super(FnApiLogRecordHandler, self).__init__()
self._log_channel = grpc.insecure_channel(log_service_descriptor.url)
self._log_channel = grpc.intercept_channel(
grpc.insecure_channel(log_service_descriptor.url),
WorkerIdInterceptor())
self._logging_stub = beam_fn_api_pb2_grpc.BeamFnLoggingStub(
self._log_channel)
self._log_entry_queue = queue.Queue()
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/worker/sdk_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker import bundle_processor
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor


class SdkHarness(object):
Expand All @@ -42,7 +43,8 @@ class SdkHarness(object):
def __init__(self, control_address, worker_count):
self._worker_count = worker_count
self._worker_index = 0
self._control_channel = grpc.insecure_channel(control_address)
self._control_channel = grpc.intercept_channel(
grpc.insecure_channel(control_address), WorkerIdInterceptor())
self._data_channel_factory = data_plane.GrpcClientDataChannelFactory()
self.workers = queue.Queue()
# one thread is enough for getting the progress report.
Expand Down
59 changes: 59 additions & 0 deletions sdks/python/apache_beam/runners/worker/worker_id_interceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Client Interceptor to inject worker_id"""
from __future__ import absolute_import
from __future__ import division
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need print_function and division imports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't need these imports.
I added them to resolve the compatibility issue between Python 2 and 3, based on https://docs.python.org/3/howto/pyporting.html

Should I remove them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove them, if they are not needed now. I do not see print() or / being used here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

from __future__ import print_function

import collections
import os
import uuid

import grpc


class _ClientCallDetails(
    collections.namedtuple('_ClientCallDetails',
                           ('method', 'timeout', 'metadata', 'credentials')),
    grpc.ClientCallDetails):
  """Concrete, immutable grpc.ClientCallDetails.

  gRPC interceptors cannot mutate the call details they receive; instead they
  construct a replacement. This namedtuple provides the four attributes the
  gRPC runtime reads (method, timeout, metadata, credentials).
  """
  pass


class WorkerIdInterceptor(grpc.StreamStreamClientInterceptor):
  """Client interceptor that attaches this worker's id to every outgoing
  stream-stream gRPC call as a 'worker_id' metadata header."""

  # TODO: (BEAM-3904) Remove defaulting to UUID when worker_id is not present
  # and throw exception in worker_id_interceptor.py after we have rolled out
  # the corresponding container changes.
  # Unique worker Id for this worker.
  # NOTE: `has_key` does not exist in Python 3; use the `in` operator, which
  # works on both Python 2 and 3.
  _worker_id = os.environ['WORKER_ID'] if 'WORKER_ID' in os.environ else str(
      uuid.uuid4())

  def __init__(self):
    pass

  def intercept_stream_stream(self, continuation, client_call_details,
                              request_iterator):
    """Appends ('worker_id', <id>) to the call metadata and forwards the call.

    Raises:
      RuntimeError: if the outgoing metadata already carries a 'worker_id'
        entry, to avoid silently sending conflicting worker ids.
    """
    metadata = []
    if client_call_details.metadata is not None:
      metadata = list(client_call_details.metadata)
    # gRPC metadata is a sequence of (key, value) tuples, so a plain
    # `'worker_id' in metadata` check would never match a tuple entry.
    # Compare against the keys; also accept bare-key entries for callers
    # (e.g. tests) that pass a mapping's keys.
    existing_keys = [
        entry[0] if isinstance(entry, tuple) else entry for entry in metadata
    ]
    if 'worker_id' in existing_keys:
      raise RuntimeError('Header metadata already has worker_id.')
    metadata.append(('worker_id', self._worker_id))
    new_client_details = _ClientCallDetails(
        client_call_details.method, client_call_details.timeout, metadata,
        client_call_details.credentials)
    return continuation(new_client_details, request_iterator)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Test for WorkerIdInterceptor"""
from __future__ import absolute_import

import collections
import logging
import unittest

import grpc

from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor


class _ClientCallDetails(
    collections.namedtuple('_ClientCallDetails',
                           ('method', 'timeout', 'metadata', 'credentials')),
    grpc.ClientCallDetails):
  """Test stand-in for grpc.ClientCallDetails: an immutable namedtuple with
  the four attributes (method, timeout, metadata, credentials) that the
  interceptor under test reads and replaces."""
  pass


class WorkerIdInterceptorTest(unittest.TestCase):
  """Tests that WorkerIdInterceptor injects exactly one 'worker_id' header."""

  def _invoke(self, metadata):
    # Drive the interceptor with a fake continuation and report the
    # 'worker_id' header value the continuation observed.
    captured = {}

    def fake_continuation(call_details, request_iterator):
      captured['worker_id'] = dict(call_details.metadata).get('worker_id')

    WorkerIdInterceptor().intercept_stream_stream(
        fake_continuation, _ClientCallDetails(None, None, metadata, None), [])
    return captured.get('worker_id')

  def test_worker_id_insertion(self):
    WorkerIdInterceptor._worker_id = 'my_worker_id'
    self.assertEqual(
        self._invoke(None), 'my_worker_id', 'worker_id_key not set')

  def test_failure_when_worker_id_exists(self):
    WorkerIdInterceptor._worker_id = 'my_worker_id'
    with self.assertRaises(RuntimeError):
      self._invoke({'worker_id': '1'})


if __name__ == '__main__':
  # Show INFO-level log output while the tests run.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
1 change: 1 addition & 0 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {

// (3) Invoke python

os.Setenv("WORKER_ID", *id)
os.Setenv("PIPELINE_OPTIONS", options)
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pbpipeline.ApiServiceDescriptor{Url: *loggingEndpoint}))
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def get_version():
'avro>=1.8.1,<2.0.0',
'crcmod>=1.7,<2.0',
'dill==0.2.6',
'grpcio>=1.0,<2',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this change? AFAIK, some runner (Dataflow) requires grpcio 1.3? Also, it is became more restrictive for users each time we reduce the list of allowed versions for a dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpcio 1.8 is the first python version which allows passing/receiving client headers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use header to send worker_id in all channels and GRPC headers are only supported after 1.8.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Just try running dataflow runner test before finishing this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, Tried running apache_beam.runners.dataflow.dataflow_runner_test.DataflowRunnerTest

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant running a job on dataflow using: https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_beam_PostCommit_Python_Verify.groovy

Commenting "Run Python PostCommit" on the PR should trigger one.

'grpcio>=1.8,<2',
'hdfs>=2.1.0,<3.0.0',
'httplib2>=0.8,<0.10',
'mock>=1.0.1,<3.0.0',
Expand Down