Python Reshuffle holds elements #21591

Closed
damccorm opened this issue Jun 5, 2022 · 10 comments

@damccorm
Contributor

damccorm commented Jun 5, 2022

Python Reshuffle holds elements while the pipeline is running and likely releases them in a batch. In contrast, Java Reshuffle triggers on every element, as noted in its documentation:
"the trigger used with {@link Reshuffle} which triggers on every element and never buffers state."

Here is a working example:


import time

import apache_beam as beam

def test(p: beam.Pipeline):
  class SlowProcessFn(beam.DoFn):
    def process(self, element):
      # Simulate a slow step: one element every half second.
      time.sleep(0.5)
      yield element

  result = (p
    | beam.Create(range(100))
    | beam.ParDo(SlowProcessFn())
    | beam.Reshuffle() # HERE
    | beam.Map(lambda x: print(x, time.time())))
  return result

Tested on the local runner and the Flink runner (1.14): the elements are printed after 50 secs. With Reshuffle commented out, an element is printed every half second.
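One way to tell the two behaviors apart from the printed `(element, time.time())` pairs is to look at the gaps between arrival times. The following is a minimal sketch in plain Python, not part of Beam; `arrival_gaps`, `looks_batched`, and the `tolerance` threshold are hypothetical names and values chosen for illustration.

```python
from typing import List, Sequence


def arrival_gaps(timestamps: Sequence[float]) -> List[float]:
    """Return the gaps between consecutive arrival times, in sorted order."""
    ordered = sorted(timestamps)
    return [later - earlier for earlier, later in zip(ordered, ordered[1:])]


def looks_batched(timestamps: Sequence[float],
                  expected_interval: float,
                  tolerance: float = 0.5) -> bool:
    """Heuristic (an assumption, not a Beam API): report True when most gaps
    are much shorter than the interval at which elements were produced."""
    gaps = arrival_gaps(timestamps)
    if not gaps:
        return False
    short = sum(1 for gap in gaps if gap < expected_interval * tolerance)
    return short > len(gaps) / 2


# Elements produced every 0.5 s but all printed around t = 50 s: held, then released together.
held = [50.000, 50.001, 50.002, 50.003]
# Elements printed as they are produced.
steady = [0.5, 1.0, 1.5, 2.0]
print(looks_batched(held, expected_interval=0.5))
print(looks_batched(steady, expected_interval=0.5))
```

For the test above, `expected_interval` would be the 0.5 second sleep in `SlowProcessFn`.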

This behavior causes problems when a downstream PTransform involves a time-sensitive operation, such as receiving a list of updated files from the input and reading them with the filebasedsource.ReadAllFiles transform. Because there is a Reshuffle in ReadAll, the actual read will be blocked.

Imported from Jira BEAM-14497. Original Jira may contain additional context.
Reported by: yihu.

@kennknowles
Member

@Abacn will you be taking this one? Which runner are you discussing? I believe that the runner controls this, so it is not specific to the SDK. But if the Python SDK is putting in a trigger that does not cause the reshuffle, this could still be a runner thing since the runner should maybe be looking at the composite transform. I don't have current context on this one.

@Abacn
Contributor

Abacn commented Jun 10, 2022

Sorry, there was a typo; it should be "the elements are not printed after 50 secs", i.e. it seems Reshuffle always holds elements until the stage is finished. Tested, and it seems this behavior exists in all runners based on the portable runner: the Python direct runner (which also calls fn_runner now) and Flink.

@kennknowles
Member

Just to clarify: because you say "until the stage is finished" I think of batch processing, since stages do not finish in streaming. If it is batch, then this is expected behavior. In streaming, it is expected that a reshuffle would immediately emit elements as each bundle is committed.
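The distinction can be illustrated with a toy model in plain Python generators. This is only a sketch of the two expectations described above, not how Beam or any runner implements Reshuffle; `source`, `batch_reshuffle`, `streaming_reshuffle`, and `bundle_size` are hypothetical names introduced for illustration.

```python
from typing import Callable, Iterable, Iterator, List


def source(n: int, log: List[str]) -> Iterator[int]:
    """Upstream step: records when each element is processed."""
    for i in range(n):
        log.append(f"process {i}")
        yield i


def batch_reshuffle(elements: Iterable[int]) -> Iterator[int]:
    """Batch-like model: the upstream stage must finish before anything is emitted."""
    yield from list(elements)


def streaming_reshuffle(elements: Iterable[int], bundle_size: int) -> Iterator[int]:
    """Streaming-like model: elements are emitted each time a bundle is committed."""
    bundle: List[int] = []
    for element in elements:
        bundle.append(element)
        if len(bundle) == bundle_size:
            yield from bundle
            bundle = []
    yield from bundle  # emit any trailing partial bundle


def run(stage: Callable[[Iterable[int]], Iterator[int]]) -> List[str]:
    """Drive four elements through a stage and return the interleaved event log."""
    log: List[str] = []
    for element in stage(source(4, log)):
        log.append(f"emit {element}")
    return log


print(run(batch_reshuffle))
print(run(lambda elements: streaming_reshuffle(elements, bundle_size=2)))
```

In the batch-like model every "process" entry precedes the first "emit"; in the streaming-like model the two interleave at bundle boundaries.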

@Abacn
Contributor

Abacn commented Jun 10, 2022

> Just to clarify: because you say "until the stage is finished" I think of batch processing, since stages do not finish in streaming. If it is batch, then this is expected behavior. In streaming, it is expected that a reshuffle would immediately emit elements as each bundle is committed.

In my case it is streaming. I've written a pretty simple test:

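import logging
import time

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

_LOGGER = logging.getLogger(__name__)

def print_time(x):
  import datetime
  _LOGGER.warning(datetime.datetime.fromtimestamp(x))

# p is an existing beam.Pipeline (streaming in this test).
result = (p
  | PeriodicImpulse(start_timestamp=time.time(), fire_interval=1.0)
  | beam.Reshuffle() # HERE
  | beam.Map(print_time))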

No output (warning log) is seen with the direct runner, portable runner, or Flink runner running locally; I also tested on the Dataflow runner, and there the output can be seen.

@Abacn
Contributor

Abacn commented Jun 10, 2022

.take-issue

@kennknowles
Member

I see that you also fixed #21606. That could be a source of this problem too.

@Abacn
Contributor

Abacn commented Jun 16, 2022

> I see that you also fixed #21606. That could be a source of this problem too.

Thanks for the follow-up. With that fix in, I tested with a local Flink cluster and it no longer holds elements, but with the direct runner (without specifying --runner) Reshuffle still holds elements. This is likely because streaming is currently not fully supported when running locally.

@kennknowles
Member

Yea, that was going to be my next guess. Since you said it impacted Flink I looked for other causes. But if it is just the Python direct runner, streaming support is most likely the issue. I think @robertwb and @pabloem are the people I would expect to know about what is expected to work and what is not implemented yet.

@tvalentyn
Contributor

The default (FnApi) direct runner doesn't fully support streaming: https://issues.apache.org/jira/browse/BEAM-7514.
Using --runner BundleBasedDirectRunner --streaming also didn't work, due to something similar to #21103.

@tvalentyn
Contributor

Overall, I think Reshuffle itself can semantically hold elements, as @kennknowles mentions in #21591 (comment). I suggest we close this as WAI (working as intended) and track runner improvements as necessary.
