
Python CombineGlobally().with_fanout() causes duplicate combine results for sliding windows #20528

Open
damccorm opened this issue Jun 4, 2022 · 33 comments

Comments

@damccorm
Contributor

damccorm commented Jun 4, 2022

Not only is there more than one result per window, but the results for each window are duplicated as well.

Here is some code I made to reproduce the issue; just run it with and without *.with_fanout*.

If running with the Dataflow runner, add an appropriate *gs://path/* in *WriteToText*.

 


import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
  """Collects every input of a window into a single list."""

  def create_accumulator(self):
    return []

  def add_input(self, mutable_accumulator, element):
    return mutable_accumulator + [element]

  def merge_accumulators(self, accumulators):
    res = []
    for accu in accumulators:
      res = res + accu
    return res

  def extract_output(self, accumulator):
    return accumulator


p = beam.Pipeline()

(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    # 10-second windows starting every 5 seconds: each element lands in 2 windows.
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Drop .with_fanout(5) to get the expected, duplicate-free output.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))

p.run()

 

Imported from Jira BEAM-10617. Original Jira may contain additional context.
Reported by: leiyiz.

@kennknowles
Member

If this is still true, then it should be very easy to reproduce. @robertwb or @tvalentyn or @pabloem do you know?

@tvalentyn
Contributor

@BjornPrime, can you try to reproduce this?

@BjornPrime
Contributor

BjornPrime commented Oct 4, 2022

I've successfully reproduced the issue.

Compare with fanout:

[1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 5, 5, 6, 6, 7, 7, 8, 8]
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 1, 1, 2, 2, 3, 3, 4, 4]
[1, 1, 2, 2, 3, 3, 4, 4]
[5, 5, 6, 6, 7, 7, 8, 8, 8, 8]
[8, 8]

and without:

[1, 2, 3, 4, 5, 6, 7]
[1, 2, 3, 4]
[5, 6, 7, 8]
[8]
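As a cross-check, the output without fanout is exactly what sliding-window assignment predicts. The plain-Python sketch below (no Beam involved; `sliding_window_starts` and `expected_results` are hypothetical helpers, not Beam APIs) assumes SlidingWindows(10, 5) with the default zero offset and integer-second timestamps, and groups the reproduction's elements by window, one copy per window.

```python
from collections import defaultdict

# (value, timestamp in seconds) pairs from the reproduction above.
ELEMENTS = [
    (1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
    (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405),
]


def sliding_window_starts(ts, size, period):
    """Start of every window [start, start + size) that contains ts.

    Hypothetical helper: assumes epoch-aligned windows (offset 0).
    """
    last_start = ts - ts % period
    return list(range(last_start, ts - size, -period))


def expected_results(elements, size=10, period=5):
    """Group values by window, one copy per window, as a correct combine would."""
    windows = defaultdict(list)
    for value, ts in elements:
        for start in sliding_window_starts(ts, size, period):
            windows[(start, start + size)].append(value)
    return {w: sorted(vals) for w, vals in sorted(windows.items())}


for (start, end), values in expected_results(ELEMENTS).items():
    print(f"[{start}, {end}): {values}")
```

It prints the four lists [1, 2, 3, 4], [1, 2, 3, 4, 5, 6, 7], [5, 6, 7, 8] and [8], the same set as the run without fanout. Every element lands in exactly two windows, and a correct combine never repeats a value within a window.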

@tvalentyn
Contributor

Thanks, I'll add it to our interrupts tracker.

@TheNeuralBit
Member

I feel like the labels "P1" and "good first issue" should be mutually exclusive?

@tvalentyn
Contributor

It may be neither, actually.

@kennknowles
Member

It sounds like data loss, which should certainly be P1.

@tvalentyn
Contributor

tvalentyn commented Oct 7, 2022

Sure, we can investigate it as P1. P2 was based on the suspicion that this behavior has been present since the initial implementation.
As a next step, I think we should determine whether with_fanout, by the nature of its implementation, only works for particular classes of combiners/accumulators (for example, Top or Mean, where repeating each element twice doesn't affect the end result), or whether this is a bug that can and should be fixed.
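To make the question concrete, the plain-Python sketch below (no Beam involved; `duplicate_each` is a hypothetical helper) compares a few aggregations on the correct contents of the window holding 1..7, on the same values uniformly doubled, and on the first fanout output line reported above. Uniform doubling leaves mean and max unchanged but doubles sum and count. The reported fanout output, however, also contains 8, which belongs to no window containing 1, so even Mean would change.

```python
from statistics import mean


def duplicate_each(values, times=2):
    """Repeat every value `times` times, mimicking uniform duplication."""
    return [v for v in values for _ in range(times)]


# Correct output for the window holding 1..7 (from the run without fanout).
correct = [1, 2, 3, 4, 5, 6, 7]
# First output line reported above for the same pipeline with fanout.
with_fanout = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7,
               5, 5, 6, 6, 7, 7, 8, 8]
doubled = duplicate_each(correct)

for name, fn in [("mean", mean), ("max", max), ("sum", sum), ("count", len)]:
    print(f"{name:5}  correct={fn(correct)}  doubled={fn(doubled)}  "
          f"with_fanout={fn(with_fanout)}")
```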

@tvalentyn tvalentyn added P1 and removed P2 labels Oct 7, 2022
@kennknowles
Member

If we cannot quickly fix it, we should remove with_fanout until we can, or at least warn somehow, and add a note to the release notes for already released SDKs.

@tvalentyn
Contributor

Started working on this today; no PR yet.

@tvalentyn
Contributor

This re-duplicates the windows for sliding-window WindowFns. (It will do the wrong thing for sessions as well.)

This does not seem to reproduce with sessions. Passing test:

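  def test_combining_with_session_windows_and_fanout(self):
    # Imports are local so that the snippet is self-contained.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that
    from apache_beam.transforms import window
    from apache_beam.utils.timestamp import Timestamp

    class ListFn(beam.CombineFn):
      """Collects every input of a window into a single list."""
      def create_accumulator(self):
        return []

      def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

      def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
          res = res + accu
        return res

      def extract_output(self, accumulator):
        return accumulator

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with TestPipeline(options=options) as p:
      def has_expected_values(actual):
        from hamcrest.core import assert_that as hamcrest_assert
        from hamcrest.library.collection import contains_exactly

        # Sort within and across windows: fanout merges accumulators in an
        # arbitrary order, and output order across windows is not defined.
        ordered = sorted(sorted(values) for values in actual)
        hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7, 8]))

      result = (
          p
          | beam.Create([
              window.TimestampedValue(0, Timestamp(seconds=0)),
              window.TimestampedValue(1, Timestamp(seconds=1)),
              window.TimestampedValue(2, Timestamp(seconds=2)),
              window.TimestampedValue(3, Timestamp(seconds=3)),
              window.TimestampedValue(5, Timestamp(seconds=5)),
              window.TimestampedValue(6, Timestamp(seconds=6)),
              window.TimestampedValue(7, Timestamp(seconds=7)),
              window.TimestampedValue(8, Timestamp(seconds=8))])
          # Gap of 2 seconds: 0..3 and 5..8 form two separate sessions.
          | beam.WindowInto(window.Sessions(2))
          | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5))
      assert_that(result, has_expected_values)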

@kennknowles
Member

Yeah, it probably affects any WindowFn that assigns an element to multiple windows.
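The distinction can be illustrated in plain Python (no Beam involved; all helpers below are hypothetical, not Beam APIs). The sketch assumes epoch-aligned sliding windows and half-open session proto-windows [ts, ts + gap) that merge when they overlap. With SlidingWindows(10, 5) every element of the reproduction sits in two windows, while after session merging every element of the session test above sits in exactly one.

```python
def sliding_window_starts(ts, size, period):
    """Starts of the epoch-aligned sliding windows [start, start + size) containing ts."""
    last_start = ts - ts % period
    return list(range(last_start, ts - size, -period))


def merge_sessions(timestamps, gap):
    """Merge overlapping proto-windows [ts, ts + gap) into sessions."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
        else:
            sessions.append([ts, ts + gap])
    return [tuple(s) for s in sessions]


def sliding_counts(timestamps, size, period):
    """Number of sliding windows each element is assigned to."""
    return {ts: len(sliding_window_starts(ts, size, period)) for ts in timestamps}


def session_counts(timestamps, gap):
    """Number of merged sessions each element ends up in."""
    sessions = merge_sessions(timestamps, gap)
    return {ts: sum(start <= ts < end for start, end in sessions)
            for ts in timestamps}


# Timestamps from the issue's reproduction and from the session test above.
SLIDING_TS = [1596216396, 1596216397, 1596216398, 1596216399,
              1596216400, 1596216402, 1596216403, 1596216405]
SESSION_TS = [0, 1, 2, 3, 5, 6, 7, 8]

print(sliding_counts(SLIDING_TS, size=10, period=5))
print(merge_sessions(SESSION_TS, gap=2))
print(session_counts(SESSION_TS, gap=2))
```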

@tvalentyn
Contributor

OK, the problem is that we re-window to avoid stacked accumulating mode:

Can we re-window only if we detect accumulating mode? See: #23828

Alternatively, we could skip re-windowing if we detect sliding windows. Disabling with_fanout for sliding windows is also straightforward if we want to go that route for now.
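A hedged model of how re-windowing could produce the doubled values in the fanout output above: if a second WindowInto with the same sliding WindowFn processes an element once per window it already belongs to and reassigns it by timestamp each time (an assumption about the internals, not confirmed in this thread), every element lands in each of its windows twice. The plain-Python sketch below (hypothetical helpers, no Beam involved) counts copies per (window, element) pair with and without that second pass.

```python
from collections import Counter


def sliding_window_starts(ts, size=10, period=5):
    """Starts of the epoch-aligned sliding windows [start, start + size) containing ts."""
    last_start = ts - ts % period
    return list(range(last_start, ts - size, -period))


def window_once(timestamps):
    """Copies of each element per window after a single WindowInto."""
    return Counter((start, ts) for ts in timestamps
                   for start in sliding_window_starts(ts))


def rewindow_exploded(timestamps):
    """Hypothetical model of a second WindowInto on already-windowed values.

    Assumes the element is processed once per window it already occupies
    and is reassigned by its timestamp each time.
    """
    copies = Counter()
    for ts in timestamps:
        for _existing_window in sliding_window_starts(ts):
            for start in sliding_window_starts(ts):
                copies[(start, ts)] += 1
    return copies


TIMESTAMPS = [1596216396, 1596216397, 1596216398, 1596216399,
              1596216400, 1596216402, 1596216403, 1596216405]

print(sorted(set(window_once(TIMESTAMPS).values())))
print(sorted(set(rewindow_exploded(TIMESTAMPS).values())))
```

Under the same model, a WindowFn that assigns each element a single window, such as Sessions, would yield one copy, which is consistent with the session test passing. The model does not explain the extra values from neighbouring windows in the fanout output.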

@chamikaramj
Contributor

This is the only issue currently blocking the 2.43.0 release without an existing cherry-pick request.

Can we push this to the next release?

@tvalentyn
Contributor

I discussed the fix yesterday with Robert, and we will pursue a slightly different fix, which should be ready shortly. However, this bug has been there since the very first commit implementing with_fanout, so I wouldn't delay this specific release on it.

@chamikaramj
Contributor

Ack. Removing this from the milestone for now.

@tvalentyn
Contributor

The current fix caused an OOM in a customer's pipeline; I'm planning to revert it and investigate further.

@kennknowles
Member

Is it possible that the OOM is unavoidable because of the fix? This is a pretty serious data corruption problem, no? I suppose it is not a regression, but I would very much like 2.44.0 to have correct results in this situation.

@tvalentyn
Contributor

tvalentyn commented Dec 7, 2022

Is it possible that the OOM is unavoidable because of the fix?

It's unlikely.

This is a pretty serious data corruption problem, no? I suppose it is not a regression

That's correct; it has been there since with_fanout was added.

but I would very much like 2.44.0 to have correct results in this situation.

I am planning to come back to it next week when I am on a rotational duty again.

@kennknowles
Member

OK given this context I am going to remove the release milestone. We can document the range of releases for which it does not work.

@kennknowles
Member

Actually simply disabling it would be a reasonable way to protect user data and we should do that in the immediate term.

@kennknowles
Member

The mitigation has been cherry-picked, so I am removing it from the 2.44.0 milestone. The bug remains open.

@kennknowles
Member

If this is now disabled, is it perhaps P2?

@kennknowles
Member

If it isn't actively being worked on, I suggest downgrading and unassigning.

@tvalentyn tvalentyn added P2 and removed P1 labels Feb 13, 2023
@ee07dazn

Any progress on this?

@tvalentyn
Contributor

We had a tentative fix, but it caused a performance regression and was rolled back; since then, it has remained in the backlog.

@ee07dazn

I need the combiner because group-by is ineffective, and I need to fan it out further because it's streaming Kafka input at quite a high rate.

Are there any possible workaround approaches?

@tvalentyn
Contributor

tvalentyn commented Apr 28, 2023

Ack. This feedback helps with prioritization.

I'm not sure I can answer based on the limited information about the use case, but for experimentation purposes you could apply #23828 locally. These changes only matter at job submission.


8 participants