
[Task]: Enable setting a timeout for Python TestPipeline #29646

Open
@Abacn


What needs to happen?

Currently, TestPipeline waits indefinitely for the pipeline to finish:

state = result.wait_until_finish()

When the test times out, nothing useful is printed: only a pytest timeout message and the stack trace at the point where the test was interrupted (e.g. https://github.com/apache/beam/runs/19275621816):

Failed: Timeout >4500.0s
self = <apache_beam.transforms.ptransform_test.PTransformTest testMethod=test_flatten_one_single_pcollection>

    @pytest.mark.it_validatesrunner
    def test_flatten_one_single_pcollection(self):
>     with TestPipeline() as pipeline:
...
        while thread.is_alive():
>         time.sleep(5.0)
E         Failed: Timeout >4500.0s

However, the Dataflow runner's wait_until_finish() (DataflowPipelineResult.wait_until_finish) already supports a duration argument:

def wait_until_finish(self, duration=None):

and if the job has not reached a terminal state, it reports the console URL containing the job ID, so one can find the Dataflow job to investigate:

assert duration or terminated, (
    'Job did not reach to a terminal state after waiting indefinitely. '
    '{}'.format(consoleUrl))
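A minimal sketch of using that duration parameter directly wherever a test holds the pipeline result itself. It assumes (per the base PipelineResult docstring) that duration is measured in milliseconds, and that the call returns the current, non-terminal state when it stops waiting, as the Dataflow result does. FakeResult and wait_with_timeout are hypothetical names, not Beam APIs; FakeResult only imitates a runner result (including a job_id() method like the Dataflow one) so the example runs without Beam or GCP.

```python
import time

# State names modelled on Beam's PipelineState; treating these as terminal
# is an assumption mirroring PipelineState.is_terminal().
TERMINAL_STATES = {"STOPPED", "DONE", "FAILED", "CANCELLED", "UPDATED", "DRAINED"}


class FakeResult:
    """Hypothetical stand-in for a runner's PipelineResult."""

    def __init__(self, job_id, runtime_secs):
        self._job_id = job_id
        self.state = "RUNNING"
        self._finish_at = time.monotonic() + runtime_secs

    def job_id(self):
        return self._job_id

    def wait_until_finish(self, duration=None):
        # duration is in milliseconds; None means wait indefinitely.
        deadline = None if duration is None else time.monotonic() + duration / 1000.0
        while time.monotonic() < self._finish_at:
            if deadline is not None and time.monotonic() >= deadline:
                return self.state  # gave up waiting; job is still running
            time.sleep(0.01)
        self.state = "DONE"
        return self.state


def wait_with_timeout(result, timeout_secs):
    """Waits at most timeout_secs and raises with the job ID if not finished."""
    state = result.wait_until_finish(duration=int(timeout_secs * 1000))
    if state not in TERMINAL_STATES:
        raise TimeoutError(
            f"Job {result.job_id()} still {state} after {timeout_secs}s")
    return state


print(wait_with_timeout(FakeResult("job-1", 0.05), timeout_secs=1))  # DONE
try:
    wait_with_timeout(FakeResult("job-2", 10), timeout_secs=0.1)
except TimeoutError as e:
    print(e)  # Job job-2 still RUNNING after 0.1s
```

The failure message carries the job ID, which is exactly the information the pytest timeout above loses.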

TestPipeline should be able to use this functionality, for example:

with TestPipeline(timeout=600.0) as p:
   ...
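The proposed behaviour could be sketched as a context manager that converts the timeout from seconds to the millisecond duration that wait_until_finish takes, and raises with the job ID if the job is still running afterwards. TimeoutTestPipeline and StubResult are hypothetical stand-ins, not Beam classes; the seconds-to-milliseconds conversion assumes the base PipelineResult convention, and a real change would presumably be wired into TestPipeline's own run/wait logic instead.

```python
# Hypothetical sketch only: not the Beam TestPipeline API.
TERMINAL = {"STOPPED", "DONE", "FAILED", "CANCELLED", "UPDATED", "DRAINED"}


class StubResult:
    """Hypothetical result whose wait returns a fixed state immediately."""

    def __init__(self, state, job_id="stub-job"):
        self._state = state
        self._job_id = job_id
        self.last_duration = "unset"

    def job_id(self):
        return self._job_id

    def wait_until_finish(self, duration=None):
        self.last_duration = duration  # record what the pipeline passed in
        return self._state


class TimeoutTestPipeline:
    """timeout is in seconds; None keeps today's wait-indefinitely behaviour."""

    def __init__(self, result, timeout=None):
        self._result = result  # stand-in for what run() would return
        self.timeout = timeout

    def run(self):
        return self._result

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            return False  # don't mask errors raised while building the pipeline
        result = self.run()
        duration = None if self.timeout is None else int(self.timeout * 1000)
        state = result.wait_until_finish(duration=duration)
        if state not in TERMINAL:
            raise TimeoutError(
                f"Job {result.job_id()} not finished after {self.timeout}s "
                f"(state {state})")
        return False  # failure-state checks (e.g. FAILED) omitted for brevity


slow = StubResult("RUNNING", job_id="slow-job")
try:
    with TimeoutTestPipeline(slow, timeout=1.5):
        pass
except TimeoutError as e:
    print(e)  # Job slow-job not finished after 1.5s (state RUNNING)
```

Raising only when no exception is already in flight keeps pipeline-construction errors visible; a fuller version would also fail on unsuccessful terminal states such as FAILED.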

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
