
[BEAM-11736] Propagate pipeline options to direct runner #13723

Merged
merged 8 commits into apache:master from dandy10:direct-runner-pipeline-options on Feb 2, 2021

Conversation

dandy10 (Contributor) commented Jan 11, 2021

TL;DR: Propagates pipeline options to the Python DirectRunner when running in multi_processing mode.

When running a test pipeline with the DirectRunner and direct_running_mode=multi_processing, I found that pipeline options were not set as expected in the worker processes, causing the pipeline to fail. Specifically, I needed filesystem-related options to be set in order to access a remote filesystem. It seems that the sdk_worker_main.py runner script expects the options to be populated into an environment variable (PIPELINE_OPTIONS) before it runs, which was not the case for the SubprocessSdkWorker.

It also seems that sdk_worker_main.py needs to register the loaded pipeline options via the global filesystems.FileSystems.set_options, which it wasn't doing before. This surprises me more, because I would have expected the omission to break other filesystems used through the various portable runners that go through this script. I don't have a great understanding of all the different paths through which the workers can be configured, though, so perhaps the options can also arrive another way (such as the gRPC control channels?). It would be great to get some guidance if my understanding is incorrect.
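For orientation, here is a minimal sketch of the worker-side flow described above. It is an illustration under the assumptions stated in the comments, not the actual Beam source, and the helper name is made up:

import json
import os

from apache_beam.io import filesystems
from apache_beam.options.pipeline_options import PipelineOptions


def _load_worker_pipeline_options():
  # PIPELINE_OPTIONS is assumed to hold a JSON dictionary of option values;
  # if the launcher never sets it, the worker ends up with empty options.
  raw = os.environ.get('PIPELINE_OPTIONS')
  options = (
      PipelineOptions.from_dictionary(json.loads(raw)) if raw else PipelineOptions())
  # Without this registration, filesystem-specific options (for example an S3
  # endpoint) never reach FileSystems.create()/open() in the worker process.
  filesystems.FileSystems.set_options(options)
  return options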




dandy10 (Contributor, Author) commented Jan 11, 2021

@pabloem it seems like you might have touched some nearby code; could you suggest who to add for review?

pabloem self-requested a review on January 11, 2021 20:51
dandy10 (Contributor, Author) commented Jan 14, 2021

I've looked at the failing tests and, as far as I can tell, they are unrelated to these changes.

dandy10 (Contributor, Author) commented Jan 16, 2021

I've fixed the problem. The issue was with the snippets tests, which were creating multiple PipelineOptions subclasses with conflicting/duplicated arguments. Calling PipelineOptions.get_all_options inspects the subclasses of the PipelineOptions class, and even though the temporary PipelineOptions classes had fallen out of scope, that had not yet been reflected in the subclass list. Calling gc.collect() seems to be the recommended way to force an update, so I've added that to the test tearDown.
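As a rough illustration of that tearDown change (the test class name below is a placeholder, not the actual snippets test class):

import gc
import unittest


class SnippetsTestExample(unittest.TestCase):

  def tearDown(self):
    # Force collection of the throwaway PipelineOptions subclasses defined
    # inside individual tests, so that later get_all_options() calls no longer
    # see their conflicting/duplicated arguments via __subclasses__().
    gc.collect()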

dandy10 force-pushed the direct-runner-pipeline-options branch from e1fdf87 to f3b15fe on January 16, 2021 13:45
codecov bot commented Jan 16, 2021

Codecov Report

Merging #13723 (f3b15fe) into master (d0b6fdf) will increase coverage by 0.02%.
The diff coverage is 88.23%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #13723      +/-   ##
==========================================
+ Coverage   82.74%   82.77%   +0.02%     
==========================================
  Files         466      466              
  Lines       57525    57540      +15     
==========================================
+ Hits        47600    47628      +28     
+ Misses       9925     9912      -13     
Impacted Files Coverage Δ
...nners/portability/fn_api_runner/worker_handlers.py 80.00% <ø> (ø)
...ache_beam/runners/portability/local_job_service.py 80.53% <33.33%> (-0.64%) ⬇️
...python/apache_beam/runners/direct/direct_runner.py 93.94% <100.00%> (+0.09%) ⬆️
...apache_beam/runners/portability/portable_runner.py 75.00% <100.00%> (+0.44%) ⬆️
...thon/apache_beam/runners/worker/sdk_worker_main.py 78.18% <100.00%> (+2.25%) ⬆️
...ks/python/apache_beam/runners/worker/sdk_worker.py 89.69% <0.00%> (-0.16%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.83% <0.00%> (+0.12%) ⬆️
sdks/python/apache_beam/dataframe/frames.py 91.88% <0.00%> (+0.41%) ⬆️
sdks/python/apache_beam/io/source_test_utils.py 88.73% <0.00%> (+0.45%) ⬆️
...runners/interactive/display/pcoll_visualization.py 85.86% <0.00%> (+0.52%) ⬆️
... and 5 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update d0b6fdf...f3b15fe.

dandy10 (Contributor, Author) commented Jan 16, 2021

retest this please

dandy10 (Contributor, Author) commented Jan 16, 2021

@pabloem is there a command for rerunning a specific unit test suite? I've run the failing test for macos-latest-py37 locally and it passes, so I suspect it is flaky (given that all the others pass).

ConverJens commented:

@dandy10 Great catch and work with this one! I believe that I've also stumbled upon this issue.

@dandy10 @pabloem

I'm trying to get S3 (MinIO) to work with TFX, and this issue is currently blocking me.

These are my supplied pipeline options:

'--direct_running_mode=multi_processing',
f'--direct_num_workers={NR_OF_CPUS}',
'--s3_endpoint_url=minio-service.kubeflow:9000',
f'--s3_access_key={ACCESS_KEY}',
f'--s3_secret_access_key={SECRET_ACCESS_KEY},
'--s3_verify=False'

and this is the error that I'm facing:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/iobase.py", line 1129, in process
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 196, in open_writer
    return FileBasedSinkWriter(self, writer_path)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 417, in __init__
    self.temp_handle = self.sink.open(temp_shard_path)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filebasedsink.py", line 138, in open
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/filesystems.py", line 229, in create
    return filesystem.create(path, mime_type, compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3filesystem.py", line 171, in create
    return self._path_open(path, 'wb', mime_type, compression_type)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3filesystem.py", line 151, in _path_open
    raw_file = s3io.S3IO(options=self._options).open(
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py", line 63, in __init__
    raise ValueError('Must provide one of client or options')
ValueError: Must provide one of client or options

@pabloem Any chance this could be reviewed and merged ASAP? This would be extremely helpful.

ConverJens commented:

@dandy10 I tried to use beam from this PR but I ended up with the same error still.

I simply cloned the repo, installed dependencies from build-requirements.txt and ran setup.py which completed successfully. Did I miss something or do I have to build anything else as well?

dandy10 (Contributor, Author) commented Jan 20, 2021

@ConverJens forgive the silly question, but when you say you cloned the repo, do you mean that you cloned my fork and checked out this branch locally? I've just successfully rerun a pipeline from this branch (installed into a clean virtual environment) that requires a custom s3_endpoint_url, using the direct runner with multi_processing, so I think it should work as is. Perhaps you could place a print statement in your sdk_worker_main.py to verify that the PIPELINE_OPTIONS environment variable is set and that the expected options are present?
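For example, a temporary line like the following near the top of sdk_worker_main's entry point (exact placement is up to you; this is only a debugging aid) would show whether the variable is populated:

import logging
import os

# Log the raw serialized options the worker subprocess receives, if any.
logging.warning('PIPELINE_OPTIONS in worker: %r', os.environ.get('PIPELINE_OPTIONS'))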

Also worth checking: do you have another version of apache_beam installed locally? The subprocess runs python -m apache_beam.runners.worker.sdk_worker_main, so perhaps that resolves to another version rather than the one with the fix.

ConverJens commented:

@dandy10 Sorry, I forgot to check out your branch from your fork. Pipeline options now seem to be passed on, but I still get an error.

I've passed the endpoint args, but Beam is still trying to connect to AWS rather than the specified endpoint. Do you know why that is?
My supplied args:

'--direct_running_mode=multi_processing',
f'--direct_num_workers={NR_OF_CPUS}',
'--s3_endpoint_url=minio-service.kubeflow:9000',
f'--s3_access_key={ACCESS_KEY}',
f'--s3_secret_access_key={SECRET_ACCESS_KEY},
'--s3_verify=False'

The error I'm getting:

INFO:apache_beam.runners.portability.local_job_service:Worker: severity: ERROR timestamp {   seconds: 1611236675   nanos: 354464530 } message: "Error in _start_upload while inserting file s3://pipelines/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1414/eval/beam-temp-data_tfrecord-178216b45bee11ebab6f6a446dded339/8bc9a559-3429-408f-b93d-e3a64174ec56.data_tfrecord.gz: Traceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 170, in _new_conn\n    (self._dns_host, self.port), self.timeout, **extra_kw\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py\", line 96, in create_connection\n    raise err\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py\", line 86, in create_connection\n    sock.connect(sa)\nsocket.timeout: timed out\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py\", line 317, in send\n    chunked=self._chunked(request.headers),\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 756, in urlopen\n    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/retry.py\", line 506, in increment\n    raise six.reraise(type(error), error, _stacktrace)\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/packages/six.py\", line 735, in reraise\n    raise value\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 706, in urlopen\n    chunked=chunked,\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 382, in _make_request\n    self._validate_conn(conn)\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 1010, in _validate_conn\n    conn.connect()\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 353, in connect\n    conn = self._new_conn()\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 177, in _new_conn\n    % (self.host, self.timeout),\nurllib3.exceptions.ConnectTimeoutError: (<botocore.awsrequest.AWSHTTPSConnection object at 0x7fd2ee57f950>, \'Connection to pipelines.s3.amazonaws.com timed out. 
(connect timeout=60)\')\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/clients/s3/boto3_client.py\", line 171, in create_multipart_upload\n    ContentType=request.mime_type)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 357, in _api_call\n    return self._make_api_call(operation_name, kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 663, in _make_api_call\n    operation_model, request_dict, request_context)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 682, in _make_request\n    return self._endpoint.make_request(operation_model, request_dict)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 102, in make_request\n    return self._send_request(request_dict, operation_model)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 137, in _send_request\n    success_response, exception):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 256, in _needs_retry\n    caught_exception=caught_exception, request_dict=request_dict)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 356, in emit\n    return self._emitter.emit(aliased_event_name, **kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 228, in emit\n    return self._emit(event_name, kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 211, in _emit\n    response = handler(**kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 183, in __call__\n    if self._checker(attempts, response, caught_exception):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 251, in __call__\n    caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 277, in _should_retry\n    return self._checker(attempt_number, response, caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 317, in __call__\n    caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 223, in __call__\n    attempt_number, caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 359, in _check_caught_exception\n    raise caught_exception\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 200, in _do_get_response\n    http_response = self._send(request)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 269, in _send\n    return self.http_session.send(request)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py\", line 341, in send\n    raise ConnectTimeoutError(endpoint_url=request.url, error=e)\nbotocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: \"
https://pipelines.s3.amazonaws.com/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1414/eval/beam-temp-data_tfrecord-178216b45bee11ebab6f6a446dded339/8bc9a559-3429-408f-b93d-e3a64174ec56.data_tfrecord.gz?uploads
\"\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py\", line 566, in _start_upload\n    response = self._client.create_multipart_upload(request)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/clients/s3/boto3_client.py\", line 174, in create_multipart_upload\n    message = e.response[\'Error\'][\'Message\']\nAttributeError: \'ConnectTimeoutError\' object has no attribute \'response\'\n" instruction_id: "bundle_39" transform_id: "WriteSplit[eval]/Write/Write/WriteImpl/WriteBundles" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py:572" thread: "Thread-14" 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connection.py", line 170, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py", line 96, in create_connection
    raise err
  File "/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py", line 86, in create_connection
    sock.connect(sa)
socket.timeout: timed out
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py", line 317, in send
    chunked=self._chunked(request.headers),
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/dist-packages/urllib3/util/retry.py", line 506, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/dist-packages/urllib3/packages/six.py", line 735, in reraise
    raise value
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py", line 1010, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connection.py", line 353, in connect
    conn = self._new_conn()
  File "/usr/local/lib/python3.7/dist-packages/urllib3/connection.py", line 177, in _new_conn
    % (self.host, self.timeout),
urllib3.exceptions.ConnectTimeoutError: (<botocore.awsrequest.AWSHTTPSConnection object at 0x7fd2ee57f950>, 'Connection to pipelines.s3.amazonaws.com timed out. (connect timeout=60)')

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 171, in create_multipart_upload
    ContentType=request.mime_type)
  File "/usr/local/lib/python3.7/dist-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/dist-packages/botocore/client.py", line 663, in _make_api_call
    operation_model, request_dict, request_context)
  File "/usr/local/lib/python3.7/dist-packages/botocore/client.py", line 682, in _make_request
    return self._endpoint.make_request(operation_model, request_dict)
  File "/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py", line 102, in make_request
    return self._send_request(request_dict, operation_model)
  File "/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py", line 137, in _send_request
    success_response, exception):
  File "/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py", line 256, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/usr/local/lib/python3.7/dist-packages/botocore/hooks.py", line 356, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/botocore/hooks.py", line 228, in emit
    return self._emit(event_name, kwargs)
  File "/usr/local/lib/python3.7/dist-packages/botocore/hooks.py", line 211, in _emit
    response = handler(**kwargs)
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 277, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py", line 200, in _do_get_response
    http_response = self._send(request)
  File "/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py", line 269, in _send
    return self.http_session.send(request)
  File "/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py", line 341, in send
    raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "https://pipelines.s3.amazonaws.com/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1414/eval/beam-temp-data_tfrecord-178216b45bee11ebab6f6a446dded339/8bc9a559-3429-408f-b93d-e3a64174ec56.data_tfrecord.gz?uploads"

Do I need to specify s3_region_name, tokens, etc. as well, even when I'm trying to access a local MinIO instance?

Which beam args are you testing with?

Any help would be appreciated!

dandy10 (Contributor, Author) commented Jan 21, 2021

@ConverJens no problem. Are you sure the endpoint URL has been passed correctly? It looks like it's still trying to connect to AWS: Connection to pipelines.s3.amazonaws.com timed out. (connect timeout=60) (or is that what minio-service.kubeflow resolves to within your cluster?).

I think there might be a typo in one of the options: what happens if you change --s3_access_key to --s3_access_key_id?

I'm just using s3_endpoint_url, s3_access_key_id, and s3_secret_access_key (not setting --s3_verify=False, although I don't think that would cause the issue you're seeing). I did hit some timeouts when testing if I didn't set the cert bundle correctly; maybe something similar is happening here? You can override the default with the AWS_CA_BUNDLE environment variable, for example as sketched below.
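A minimal example of that override, set before the pipeline (and its worker subprocesses) starts; the path is a placeholder and this only matters if the endpoint terminates TLS with a private CA:

import os

# Point botocore at a custom CA bundle; replace the path with your own.
os.environ['AWS_CA_BUNDLE'] = '/etc/ssl/certs/private-ca.pem'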

ConverJens commented:

@dandy10 I corrected my s3_access_key argument but still got the same error.

The weird thing is that I believe I have passed them on correctly, because if I switch to in_memory and one worker it works as expected and Beam writes to MinIO, so something seems slightly off.

I think there is a discrepancy in how the options are passed on to the filesystem itself. Looking at the first line of the error message I'm getting, it first says:

"Error in _start_upload while inserting file s3://pipelines/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1438/train/beam-temp-data_tfrecord-286f19285c8d11ebbb52a24bbfe454c5/319ecf64-85bb-46bd-afcd-a355144724a7.data_tfrecord.gz:

which is the correct endpoint. Later on the same line it says:

botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: \"https://pipelines.s3.amazonaws.com/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1438/train/beam-temp-data_tfrecord-286f19285c8d11ebbb52a24bbfe454c5/319ecf64-85bb-46bd-afcd-a355144724a7.data_tfrecord.gz?uploads

which seems to indicate that the S3 client is still trying to reach amazonaws.com, which I assume is the default.

Do you have any idea why that is?

Below is the full first line again.

INFO:apache_beam.runners.portability.local_job_service:Worker: severity: ERROR timestamp {   seconds: 1611304997   nanos: 298857688 } message: "Error in _start_upload while inserting file s3://pipelines/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1438/train/beam-temp-data_tfrecord-286f19285c8d11ebbb52a24bbfe454c5/319ecf64-85bb-46bd-afcd-a355144724a7.data_tfrecord.gz: Traceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 170, in _new_conn\n    (self._dns_host, self.port), self.timeout, **extra_kw\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py\", line 96, in create_connection\n    raise err\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/connection.py\", line 86, in create_connection\n    sock.connect(sa)\nsocket.timeout: timed out\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py\", line 317, in send\n    chunked=self._chunked(request.headers),\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 756, in urlopen\n    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/util/retry.py\", line 506, in increment\n    raise six.reraise(type(error), error, _stacktrace)\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/packages/six.py\", line 735, in reraise\n    raise value\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 706, in urlopen\n    chunked=chunked,\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 382, in _make_request\n    self._validate_conn(conn)\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connectionpool.py\", line 1010, in _validate_conn\n    conn.connect()\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 353, in connect\n    conn = self._new_conn()\n  File \"/usr/local/lib/python3.7/dist-packages/urllib3/connection.py\", line 177, in _new_conn\n    % (self.host, self.timeout),\nurllib3.exceptions.ConnectTimeoutError: (<botocore.awsrequest.AWSHTTPSConnection object at 0x7f69952bde50>, \'Connection to pipelines.s3.amazonaws.com timed out. 
(connect timeout=60)\')\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/clients/s3/boto3_client.py\", line 171, in create_multipart_upload\n    ContentType=request.mime_type)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 357, in _api_call\n    return self._make_api_call(operation_name, kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 663, in _make_api_call\n    operation_model, request_dict, request_context)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/client.py\", line 682, in _make_request\n    return self._endpoint.make_request(operation_model, request_dict)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 102, in make_request\n    return self._send_request(request_dict, operation_model)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 137, in _send_request\n    success_response, exception):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 256, in _needs_retry\n    caught_exception=caught_exception, request_dict=request_dict)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 356, in emit\n    return self._emitter.emit(aliased_event_name, **kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 228, in emit\n    return self._emit(event_name, kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/hooks.py\", line 211, in _emit\n    response = handler(**kwargs)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 183, in __call__\n    if self._checker(attempts, response, caught_exception):\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 251, in __call__\n    caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 277, in _should_retry\n    return self._checker(attempt_number, response, caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 317, in __call__\n    caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 223, in __call__\n    attempt_number, caught_exception)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/retryhandler.py\", line 359, in _check_caught_exception\n    raise caught_exception\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 200, in _do_get_response\n    http_response = self._send(request)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/endpoint.py\", line 269, in _send\n    return self.http_session.send(request)\n  File \"/usr/local/lib/python3.7/dist-packages/botocore/httpsession.py\", line 341, in send\n    raise ConnectTimeoutError(endpoint_url=request.url, error=e)\nbotocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: \"
https://pipelines.s3.amazonaws.com/tfx/trace_pipeline_e2e/FileBasedExampleGenWithDate/examples/1438/train/beam-temp-data_tfrecord-286f19285c8d11ebbb52a24bbfe454c5/319ecf64-85bb-46bd-afcd-a355144724a7.data_tfrecord.gz?uploads
\"\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py\", line 566, in _start_upload\n    response = self._client.create_multipart_upload(request)\n  File \"/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/clients/s3/boto3_client.py\", line 174, in create_multipart_upload\n    message = e.response[\'Error\'][\'Message\']\nAttributeError: \'ConnectTimeoutError\' object has no attribute \'response\'\n" instruction_id: "bundle_33" transform_id: "WriteSplit[train]/Write/Write/WriteImpl/WriteBundles" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py:572" thread: "Thread-14" 

dandy10 (Contributor, Author) commented Jan 22, 2021

@ConverJens can you add a log statement in the boto3_client.Client initialiser to verify that the parameters have made it all the way to that point? If they have, then that's all this PR aims to do, and it must be a different issue.

Another thing to verify: are you running your test locally with the apache_beam package built from my fork, or are you submitting a job to Kubeflow? If it's the latter, have you ensured that the worker pod running the job has the patched apache_beam installed?

ConverJens commented:

@dandy10

> @ConverJens can you add a log statement in the boto3_client.Client initialiser to verify that the parameters have made it all the way to that point? If they have, then that's all this PR aims to do, and it must be a different issue.

I will try that to verify the parameters and compare it to when using in_memory and see if there is a difference.

> Another thing to verify: are you running your test locally with the apache_beam package built from my fork, or are you submitting a job to Kubeflow? If it's the latter, have you ensured that the worker pod running the job has the patched apache_beam installed?

I'm using a custom image where I have installed your fork, so I'm positive that I'm running the correct Beam code.

I'll let you know next week when I have checked the params in the boto3_client.

ConverJens commented:

@dandy10
I added some logging in s3io.py just before the S3 client is initialised, as you suggested:

logging.info("boto3 options")
      logging.info(options)
      self.client = boto3_client.Client(options=options)

From the output it seems the pipeline options are indeed empty:

INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1611569401   nanos: 499072313 } message: "boto3 options" instruction_id: "bundle_33" transform_id: "WriteSplit[train]/Write/Write/WriteImpl/WriteBundles" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py:67" thread: "Thread-14" 
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1611569401   nanos: 499203681 } message: "PipelineOptions()" instruction_id: "bundle_33" transform_id: "WriteSplit[train]/Write/Write/WriteImpl/WriteBundles" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py:68" thread: "Thread-14" 

Any idea where to continue troubleshooting?

ConverJens commented:

@dandy10
I added an additional printout of options.get_all_options(), and all the s3 args are indeed empty:

INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1611570854   nanos: 805634498 } message: "{\'runner\': None, \'streaming\': False, \'beam_services\': {}, \'type_check_strictness\': \'DEFAULT_TO_ANY\', \'type_check_additional\': \'\', \'pipeline_type_check\': True, \'runtime_type_check\': False, \'performance_runtime_type_check\': False, \'direct_runner_use_stacked_bundle\': True, \'direct_runner_bundle_repeat\': 0, \'direct_num_workers\': 1, \'direct_running_mode\': \'in_memory\', \'dataflow_endpoint\': \'
https://dataflow.googleapis.com
\', \'project\': None, \'job_name\': None, \'staging_location\': None, \'temp_location\': None, \'region\': None, \'service_account_email\': None, \'no_auth\': False, \'template_location\': None, \'labels\': None, \'update\': False, \'transform_name_mapping\': None, \'enable_streaming_engine\': False, \'dataflow_kms_key\': None, \'flexrs_goal\': None, \'hdfs_host\': None, \'hdfs_port\': None, \'hdfs_user\': None, \'hdfs_full_urls\': False, \'num_workers\': None, \'max_num_workers\': None, \'autoscaling_algorithm\': None, \'machine_type\': None, \'disk_size_gb\': None, \'disk_type\': None, \'worker_region\': None, \'worker_zone\': None, \'zone\': None, \'network\': None, \'subnetwork\': None, \'worker_harness_container_image\': None, \'sdk_harness_container_image_overrides\': None, \'use_public_ips\': None, \'min_cpu_platform\': None, \'dataflow_worker_jar\': None, \'dataflow_job_file\': None, \'experiments\': None, \'number_of_worker_harness_threads\': None, \'profile_cpu\': False, \'profile_memory\': False, \'profile_location\': None, \'profile_sample_rate\': 1.0, \'requirements_file\': None, \'requirements_cache\': None, \'setup_file\': None, \'beam_plugins\': None, \'save_main_session\': False, \'sdk_location\': \'default\', \'extra_packages\': None, \'prebuild_sdk_container_engine\': None, \'prebuild_sdk_container_base_image\': None, \'docker_registry_push_url\': None, \'job_endpoint\': None, \'artifact_endpoint\': None, \'job_server_timeout\': 60, \'environment_type\': None, \'environment_config\': None, \'environment_options\': None, \'sdk_worker_parallelism\': 1, \'environment_cache_millis\': 0, \'output_executable_path\': None, \'artifacts_dir\': None, \'job_port\': 0, \'artifact_port\': 0, \'expansion_port\': 0, \'job_server_java_launcher\': \'java\', \'job_server_jvm_properties\': [], \'flink_master\': \'[auto]\', \'flink_version\': \'1.10\', \'flink_job_server_jar\': None, \'flink_submit_uber_jar\': False, \'spark_master_url\': \'local[4]\', \'spark_job_server_jar\': None, \'spark_submit_uber_jar\': False, \'spark_rest_url\': None, \'on_success_matcher\': None, \'dry_run\': False, \'wait_until_finish_duration\': None, \'pubsubRootUrl\': None, \'s3_access_key_id\': None, \'s3_secret_access_key\': None, \'s3_session_token\': None, \'s3_endpoint_url\': None, \'s3_region_name\': None, \'s3_api_version\': None, \'s3_verify\': None, \'s3_disable_ssl\': False}" instruction_id: "bundle_31" transform_id: "WriteSplit[eval]/Write/Write/WriteImpl/WriteBundles" log_location: "/usr/local/lib/python3.7/dist-packages/apache_beam/io/aws/s3io.py:70" thread: "Thread-14" 

ConverJens commented:

@dandy10
When using in_memory they are present:

INFO:root:s3io pipeline options: {'runner': None, 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 1, 'direct_running_mode': 'in_memory', 'dataflow_endpoint': '
https://dataflow.googleapis.com
', 'project': None, 'job_name': None, 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': ['tfx_executor=mponents-example_gen-custom_executors-parquet_executor-executor', 'tfx_py_version=3-7', 'tfx_runner=kfp', 'tfx_version=0-26-0'], 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': ['beam_fn_api'], 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': ['/tmp/tmp9d5hikqe/build/tfx/dist/tfx_ephemeral-0.26.0.tar.gz'], 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': None, 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'job_server_java_launcher': 'java', 'job_server_jvm_properties': [], 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': '<access_key>', 's3_secret_access_key': '<secret_key>', 's3_session_token': None, 's3_endpoint_url': 'http://minio-service.kubeflow:9000', 's3_region_name': None, 's3_api_version': None, 's3_verify': 'False--s3_disable_ssl=True', 's3_disable_ssl': False}

dandy10 (Contributor, Author) commented Jan 25, 2021

That's strange; I'm really not sure why it's not working for you. I think the quickest way to troubleshoot would be to add logging at the points this PR changes, to verify that you're going down the same path and see where exactly the options are lost. Could you log them as they are passed through the SwitchingDirectRunner and SubprocessSdkWorker classes? Something along the lines of the snippet below.
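(The exact insertion points inside SwitchingDirectRunner.run_pipeline and SubprocessSdkWorker are left to you; this only shows the shape of the temporary logging meant here, and options stands for whatever PipelineOptions object is in scope at that point.)

import logging

# Dump the non-default options visible at this point in the code.
logging.warning('pipeline options seen here: %s', options.get_all_options(drop_default=True))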

pabloem (Member) commented Jan 26, 2021

I'll start reviewing this tomorrow. Thanks!

dandy10 (Contributor, Author) commented Jan 29, 2021

@ConverJens did you make any progress on finding what the issue was?

@pabloem did you get a chance to look yet?

ConverJens commented:

@dandy10 No, not much, unfortunately. I added logging to both classes you suggested; from SubprocessSdkWorker I get empty pipeline options, but even stranger is that I get no output at all from my logs in SwitchingDirectRunner.

If that is indeed the case, it would explain why no options are present downstream, since they are exported in SwitchingDirectRunner.

Is there any other path to initialize the pipeline that doesn't go through SwitchingDirectRunner?

This is the first part of the Beam logs, where the subprocesses are initialized and each subprocess calls sdk_worker_main.

INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x7f8370065710> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x7f8370065830> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function eliminate_common_key_with_none at 0x7f83700659e0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x7f8370065a70> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7f8370065b00> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x7f8370065cb0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x7f8370065d40> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x7f8370065e60> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x7f8370065ef0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x7f8370065f80> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x7f8370066050> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7f8370066290> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x7f8370066200> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x7f8370066320> ====================
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 39917
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 45603
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 38319
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 35791
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7f84433a6910> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python -m apache_beam.runners.worker.sdk_worker_main')
WARNING:root:SubprocessSdkWorker
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7f8441b39950> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python -m apache_beam.runners.worker.sdk_worker_main')
WARNING:root:SubprocessSdkWorker
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7f8441b591d0> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python -m apache_beam.runners.worker.sdk_worker_main')
WARNING:root:SubprocessSdkWorker
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7f8441b59810> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python -m apache_beam.runners.worker.sdk_worker_main')
WARNING:root:SubprocessSdkWorker pipeline_options: {}
WARNING:root:SubprocessSdkWorker
WARNING:root:SubprocessSdkWorker pipeline_options: {}
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_WriteSplit[train]/Write/Write/WriteImpl/DoOnce/Impulse_27)+(ref_AppliedPTransform_WriteSplit[train]/Write/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:2957>)_28))+(ref_AppliedPTransform_WriteSplit[train]/Write/Write/WriteImpl/DoOnce/Map(decode)_30))+(ref_AppliedPTransform_WriteSplit[train]/Write/Write/WriteImpl/InitializeWrite_31))+(ref_PCollection_PCollection_17/Write))+(ref_PCollection_PCollection_18/Write)
WARNING:root:SubprocessSdkWorker pipeline_options: {}
WARNING:root:SubprocessSdkWorker pipeline_options: {}

ConverJens commented:

@dandy10 For reference, this is how the beam pipeline is initiated in TFX:

  def _make_beam_pipeline(self) -> beam_Pipeline:  # pytype: disable=invalid-annotation
    """Makes beam pipeline."""
    if not beam:
      raise Exception(
          'Apache Beam must be installed to use this functionality.')
    # pylint: disable=g-import-not-at-top
    from apache_beam.options.pipeline_options import DirectOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.runners.portability import fn_api_runner
    # pylint: enable=g-import-not-at-top
    pipeline_options = PipelineOptions(self._beam_pipeline_args)
    if pipeline_options.view_as(StandardOptions).runner:
      return beam.Pipeline(argv=self._beam_pipeline_args)

    # TODO(b/159468583): move this warning to Beam.
    direct_running_mode = pipeline_options.view_as(
        DirectOptions).direct_running_mode
    direct_num_workers = pipeline_options.view_as(
        DirectOptions).direct_num_workers
    if direct_running_mode == 'in_memory' and direct_num_workers != 1:
      absl.logging.warning(
          'If direct_num_workers is not equal to 1, direct_running_mode should '
          'be `multi_processing` or `multi_threading` instead of `in_memory` '
          'in order for it to have the desired worker parallelism effect.')

    return beam.Pipeline(
        options=pipeline_options, runner=fn_api_runner.FnApiRunner())

dandy10 (Contributor, Author) commented Feb 1, 2021

@ConverJens there's the problem. You're not specifying the runner type, so TFX defaults to going straight to the FnApiRunner (without the provision info that this PR populates). If you explicitly specify '--runner=DirectRunner' as an extra beam arg, I think it should solve the issue; see the sketch below.
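A sketch of what the corrected beam_pipeline_args could look like (the worker count and credentials are placeholders); with an explicit runner, TFX takes the beam.Pipeline(argv=...) branch above instead of falling through to a bare FnApiRunner:

beam_pipeline_args = [
    '--runner=DirectRunner',
    '--direct_running_mode=multi_processing',
    '--direct_num_workers=4',
    '--s3_endpoint_url=minio-service.kubeflow:9000',
    '--s3_access_key_id=<access_key>',
    '--s3_secret_access_key=<secret_key>',
]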

ConverJens commented:

@dandy10 That solved it! I was previously only specifying --direct_runner but actually setting --runner=DirectRunner did it. Thank you for your help and this great contribution! Consider this PR third party tested!

dandy10 (Contributor, Author) commented Feb 2, 2021

Excellent, glad to help, and thanks for testing!

pabloem changed the title from "Propagate pipeline options to direct runner" to "[BEAM-11736] Propagate pipeline options to direct runner" on Feb 2, 2021
pabloem (Member) commented Feb 2, 2021

This LGTM. Thank you for the contribution!

pabloem merged commit 5b105d4 into apache:master on Feb 2, 2021
masahitojp pushed a commit to masahitojp/beam that referenced this pull request Feb 3, 2021
…options to direct runner

* propagate pipeline options to direct runner

* delay new import

* fix import reference

* format fix

* remove mypy comment

* fix import order error

* only serialize known args

* fix snippets tests

Co-authored-by: John Doe <john@doe.org>