
Dataflow Job stuck on Draining for long time #24438

Closed
BjornPrime opened this issue Nov 30, 2022 · 4 comments
Labels: bug, core, done & done, P2, python

Comments

@BjornPrime
Contributor

What happened?

A user reported a streaming pipeline getting stuck while draining.

The issue was reproduced with the following pipeline:

import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.periodicsequence import PeriodicImpulse


def run(argv=None):
  options = PipelineOptions()
  pipeline = beam.Pipeline(options=options)
  _ = (
      pipeline
      # Emit one element per second for 15 seconds.
      | PeriodicImpulse(
          start_timestamp=time.time(), stop_timestamp=time.time() + 15,
          fire_interval=1, apply_windowing=False)
      # Re-window into the global window, firing after every 4 elements.
      | beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(4)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.combiners.Top.Largest(1)
      | beam.ParDo(lambda x: logging.error('element: %s', x))
  )
  pipeline.run().wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The pipeline fails on both Dataflow and Flink.

Example (Flink portable runner):

./flink-1.11.6/bin/start-cluster.sh

docker run --net=host apache/beam_flink1.11_job_server:latest

python -m pimpulse_small --runner PortableRunner --job_endpoint="localhost:8099" --environment_type="LOOPBACK" --streaming
...
merge_accumulators
    assert result_heap is not None and holds_comparables is not None
RuntimeError: AssertionError [while running 'Largest(1)/Top(1)/CombineGlobally(TopCombineFn)/CombinePerKey/Combine/ParDo(CombineValuesDoFn)']
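
The same failure shows up on Dataflow with an invocation along these lines (the project, region, and bucket values below are placeholders, not the exact flags from the original run):

python -m pimpulse_small --runner DataflowRunner --project <your-project> --region us-central1 --temp_location gs://<your-bucket>/tmp --streaming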

Issue Priority

Priority: 2

Issue Component

Component: sdk-py-core

@Abacn
Contributor

Abacn commented Mar 3, 2023

Is this still a problem after #23765?

@tvalentyn
Contributor

I don't know; I'd suggest trying it out and reporting back here.

@liferoad
Collaborator

I tested the above code using both DirectRunner and DataflowRunner, and both work fine, so #23765 fixed the assert issue. I suggest we close this issue and track the remaining work in #24683 (handling inputs from multiple windows simultaneously).
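
For anyone re-verifying, a local run along these lines completes without the assertion (a sketch; the exact flags used for the test above weren't recorded):

python -m pimpulse_small --runner DirectRunner --streaming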

@tvalentyn added the done & done label on May 15, 2023
@tvalentyn
Contributor

Thanks for testing.

github-actions bot added this to the 2.48.0 Release milestone on May 15, 2023