Skip to content

[Prism] Fix an issue on pane info being overwritten by different bundles.#36188

Merged
shunping merged 6 commits into
apache:masterfrom
shunping:prism-pane-info-overwritten
Sep 19, 2025
Merged

[Prism] Fix an issue on pane info being overwritten by different bundles.#36188
shunping merged 6 commits into
apache:masterfrom
shunping:prism-pane-info-overwritten

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

@shunping shunping commented Sep 18, 2025

In prism, each stage state only stores the latest pane info.

type StateData struct {
Bag [][]byte
Multimap map[string][][]byte
Trigger map[Trigger]triggerState
Pane typex.PaneInfo
}

In the some scenarios, that's not enough. For example, when we have multiple injected bundles, and each call computeNextTriggeredPane, the latter pane will overwrite the previous one.

newElementCount: 1,
endOfWindowReached: endOfWindowReached,
}, &state)
if ready {
state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)
}
// Store the state as triggers may have changed it.
ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
// If we're ready, it's time to fire!

Here is the code to reproduce:

import logging

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions

logging.basicConfig(level=logging.INFO)

# prism runner option
options = PipelineOptions([
    "--environment_type=LOOPBACK",
    #"--runner=PrismRunner",
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8073",
    "--experiments=prism_enable_rtc",
    "--prism_log_level=info",
    "--allow_unsafe_triggers",
])

with beam.Pipeline(options=options) as p:
  result = (
      p | beam.Create([1, 2])
      | beam.WindowInto(
          GlobalWindows(),
          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING,
          allowed_lateness=0,
      )
      | beam.GroupBy()
      | beam.LogElements(
          level=logging.WARNING,
          with_timestamp=True,
          with_window=True,
          with_pane_info=True,
          use_epoch_time=True))

Running on the current code, we can see both pane info are the same for these two elements.

WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
WARNING:root:(BeamSchema_d621aa78_ce21_4a09_bcaf_45d143ced57e(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)

The correct output should be:

WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [2]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: False, last: False, timing: EARLY, index: 1, nonspeculative_index: -1)
WARNING:root:(BeamSchema_ef4d452e_e897_4dd8_9002_014e04413bc6(), [1]), timestamp=9223371950454, window(start=-9223372036855, end=9223371950454), pane_info=PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)

This PR fixes the above issue by saving pane info for bundles if needed.

@shunping shunping changed the title Fix an issue on pane info being overwritten by different bundles. [Prism] Fix an issue on pane info being overwritten by different bundles. Sep 18, 2025
@shunping shunping marked this pull request as ready for review September 18, 2025 04:03
@shunping
Copy link
Copy Markdown
Collaborator Author

Also ran Java validatesRunner locally and all tests passed.

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @jrmccluskey for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@lostluck
Copy link
Copy Markdown
Contributor

For a given post aggregation stage, only one bundle should process a given key+window at a time. So one pane should be enough...

@lostluck
Copy link
Copy Markdown
Contributor

Ah right but for triggers we have a global state... Hmmm. Todo to unify but otherwise this should be fine.

@shunping
Copy link
Copy Markdown
Collaborator Author

shunping commented Sep 18, 2025

What do you prefer to unify that? Alternatively, we can put bundlePanes into stateData.

type StateData struct {
Bag [][]byte
Multimap map[string][][]byte
Trigger map[Trigger]triggerState
Pane typex.PaneInfo
}
.

Given we already had that nested structure in stageState

state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey

@lostluck
Copy link
Copy Markdown
Contributor

What do you prefer to unify that? Alternatively, we can put bundlePanes into stateData.

type StateData struct {
Bag [][]byte
Multimap map[string][][]byte
Trigger map[Trigger]triggerState
Pane typex.PaneInfo
}

.
Given we already had that nested structure in stageState

state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey

Thinking this through...

A trigger fires, and we compute a pane, and we start a bundle for it. Bundle contains that key so we only have that bundle running with that key.
A trigger fires again. The bundle can't start since it uses the key. It needs the "subsequent" pane for that window+key.

But since this is post aggregation (Aggregations which is inherently stateful, but also never done SDK side), there's no real "stateful" restriction for the key+window on the output here, unless the post-GBK transform is stateful itself.

The pane needs to be preserved downstream of these sequences since the GBK is implemented as an outside of EM transform, so we can propagate it down to the post GBK transform...

But at no point, are we "forking" the pane lineage, since as a state, the pane is pretty moot.

Your implementation's idea is correct then I think.

state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey
stateTypeLen map[LinkID]func([]byte) int // map from state to a function that will produce the total length of a single value in bytes.
bundlesToInject []RunBundle // bundlesToInject are triggered bundles that will be injected by the watermark loop to avoid premature pipeline termination.
bundlePanes map[string]map[typex.Window]map[string]typex.PaneInfo // PanInfo snapshot for bundles, from BundleID -> window -> userKey
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bundlePanes map[string]map[typex.Window]map[string]typex.PaneInfo // PanInfo snapshot for bundles, from BundleID -> window -> userKey
bundlePanes map[string]map[typex.Window]map[string]typex.PaneInfo // PaneInfo snapshot for bundles, from BundleID -> window -> userKey

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if _, ok := ss.kind.(*aggregateStageKind); ok {
// For aggregate stage, buildEventTimeBundle may have saved bundle panes with empty string as key.
// Move it under the correct BundleID now.
ss.bundlePanes[bundID] = ss.bundlePanes[""]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably avoid the sneaky empty-string handling by returning a panesInBundle map from buildEventTimeBundle, like we are doing for holdsInBundle. Similar "refused bequests" to non-stateful stages needing to return an empty holds map or empty newKeys.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@github-actions github-actions Bot added python and removed python labels Sep 19, 2025
Comment thread sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go Outdated
@github-actions github-actions Bot added python and removed python labels Sep 19, 2025
@shunping shunping requested a review from lostluck September 19, 2025 04:42
Copy link
Copy Markdown
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
Thanks!

@shunping shunping merged commit da57e58 into apache:master Sep 19, 2025
110 of 115 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants