@bvarghese1
Contributor

What is the purpose of the change

  • Non-time over windows use multiple states to manage data.
  • Each state defines its TTL independently, so the states associated with the same key can expire out of sync, which leads to a NullPointerException.
  • To fix this, register a cleanup timer and use it to expire all state associated with a key when the timer fires. This guarantees that all states for a key expire together.
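
The failure mode described above can be illustrated with a small plain-Java model. This is only a sketch under assumptions: `TtlMap`, `danglingIds`, and the choice of which state outlives which are hypothetical and do not reproduce the operator's actual state layout. It assumes one state holds a list of row ids that is rewritten on every record (so its TTL keeps being refreshed), while a second state holds one entry per id whose TTL starts at that entry's single write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IndependentTtlSketch {

    /** Hypothetical stand-in for a state with TTL: each entry expires ttlMs after its last write. */
    static final class TtlMap<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Long> lastWrite = new HashMap<>();
        private final long ttlMs;

        TtlMap(long ttlMs) {
            this.ttlMs = ttlMs;
        }

        void put(K key, V value, long now) {
            values.put(key, value);
            lastWrite.put(key, now);
        }

        /** Returns null once the entry is at least ttlMs old, like a read of expired state. */
        V get(K key, long now) {
            Long written = lastWrite.get(key);
            if (written == null || now - written >= ttlMs) {
                return null;
            }
            return values.get(key);
        }
    }

    /** Returns the ids that the id list still references but whose values have expired. */
    static List<Long> danglingIds(long ttlMs, long[] arrivalTimes, long readTime) {
        TtlMap<String, List<Long>> idListState = new TtlMap<>(ttlMs);
        TtlMap<Long, String> valueMapState = new TtlMap<>(ttlMs);

        for (int i = 0; i < arrivalTimes.length; i++) {
            long now = arrivalTimes[i];
            List<Long> ids = idListState.get("ids", now);
            if (ids == null) {
                ids = new ArrayList<>();
            }
            ids.add((long) i);
            // Rewriting the whole list refreshes its TTL on every record.
            idListState.put("ids", ids, now);
            // Each value entry is written once, so its TTL is never refreshed.
            valueMapState.put((long) i, "row-" + i, now);
        }

        List<Long> dangling = new ArrayList<>();
        List<Long> ids = idListState.get("ids", readTime);
        if (ids != null) {
            for (Long id : ids) {
                if (valueMapState.get(id, readTime) == null) {
                    dangling.add(id);
                }
            }
        }
        return dangling;
    }
}
```

With a TTL of 10 and records arriving at times 0, 6, and 12, the id list is refreshed on every write and still references id 0 at time 12, while the value entry for id 0 has already expired. Dereferencing such a missing value is the kind of access that would surface as a NullPointerException.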

Brief change log

  • Use KeyedProcessFunctionWithCleanupState
  • Register cleanup timer during processElement() for each key
  • When the timer fires in onTimer(), clean up all associated state for the same key
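
The pattern in this change log can be sketched without any Flink dependency. The class below is a simplified model, not the code in this pull request: `CleanupTimerSketch`, `advanceTo`, and the map-based "states" are hypothetical stand-ins, and a single retention time is assumed. It shows each record pushing the key's cleanup deadline forward and one timer callback clearing every state for that key at once.

```java
import java.util.HashMap;
import java.util.Map;

final class CleanupTimerSketch {
    // Hypothetical stand-ins for two of the operator's keyed states.
    private final Map<String, Long> idState = new HashMap<>();
    private final Map<String, Map<Long, String>> valueMapState = new HashMap<>();
    // One pending cleanup deadline per key, standing in for the cleanup timer.
    private final Map<String, Long> cleanupTimers = new HashMap<>();
    private final long retentionMs;

    CleanupTimerSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Stores the row and re-registers the key's cleanup timer relative to the current time. */
    void processElement(String key, String row, long now) {
        long id = idState.getOrDefault(key, 0L);
        valueMapState.computeIfAbsent(key, k -> new HashMap<>()).put(id, row);
        idState.put(key, id + 1);
        cleanupTimers.put(key, now + retentionMs);
    }

    /** Simulates the passage of time: fires and removes every timer whose deadline has passed. */
    void advanceTo(long now) {
        cleanupTimers.entrySet().removeIf(timer -> {
            if (timer.getValue() <= now) {
                onTimer(timer.getKey());
                return true;
            }
            return false;
        });
    }

    /** Clears all state for the key in one step, so no state can outlive another. */
    private void onTimer(String key) {
        idState.remove(key);
        valueMapState.remove(key);
    }

    boolean hasAnyState(String key) {
        return idState.containsKey(key) || valueMapState.containsKey(key);
    }

    boolean hasAllState(String key) {
        return idState.containsKey(key) && valueMapState.containsKey(key);
    }
}
```

In this model a key is always in one of two conditions: every state is present, or none is. With a retention of 10 and records at times 0 and 6, nothing is cleared at time 12 because the deadline has moved to 16; at time 16 both states are removed together.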

Verifying this change

This change added tests and can be verified as follows:

  • Updated the tests that validate how records are emitted after state cleanup using TestHarness

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Jan 28, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@bvarghese1 force-pushed the fix_over_window_state_ttl branch from f3d9505 to ade3f38 on January 28, 2026 21:51

// Initialize state to maintain id counter
idStateDescriptor = new ValueStateDescriptor<Long>("idState", Long.class);
if (ttlConfig.isEnabled()) {
Contributor

In addition to the fix, could we add a validation check that raises an error when a state TTL is defined on a non-time over window, so that the user is not led to think the TTL is valid in this case?

Contributor

@fhueske left a comment

Thanks for the fix @bvarghese1!
Looks mostly good, left a few minor comments.

Thanks, Fabian

Comment on lines +294 to +295
cleanupState(idState, valueMapState, accMapState, sortedListState);
resetAndCleanupAggFuncs();
Contributor

I think we should check that the firing timer is actually a cleanup timer, even though this operator does not register any other timers. This makes the code a bit safer and the intent clearer.

  • KeyedProcessFunctionWithCleanupState.isProcessingTimeTimer()
  • KeyedProcessFunctionWithCleanupState.needToCleanupState()
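
A guard of the kind suggested here could look roughly like the following. This is a dependency-free sketch: the `TimeDomain` enum, the field names, and the method bodies are local, hypothetical stand-ins that borrow the names of the two helpers listed above, and the assumed semantics (a timer counts as a cleanup timer only if it is a processing-time timer whose timestamp equals the last registered cleanup time) are an assumption rather than a statement about the real base class.

```java
final class TimerGuardSketch {
    /** Local stand-in for the time domain of a firing timer. */
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    // Timestamp of the last registered cleanup timer, or null if none is pending.
    private Long cleanupTime;
    private boolean stateCleared;

    void registerCleanupTimer(long timestamp) {
        cleanupTime = timestamp;
    }

    static boolean isProcessingTimeTimer(TimeDomain domain) {
        return domain == TimeDomain.PROCESSING_TIME;
    }

    /** True only if the firing timestamp matches the last registered cleanup time. */
    boolean needToCleanupState(long timestamp) {
        return cleanupTime != null && cleanupTime == timestamp;
    }

    /** Clears state only for the expected cleanup timer; returns whether cleanup ran. */
    boolean onTimer(long timestamp, TimeDomain domain) {
        if (isProcessingTimeTimer(domain) && needToCleanupState(timestamp)) {
            stateCleared = true; // stands in for clearing all keyed state
            cleanupTime = null;
            return true;
        }
        return false; // stale timer or a timer of another kind: leave state untouched
    }

    boolean isStateCleared() {
        return stateCleared;
    }
}
```

With a cleanup timer registered for time 16, a timer firing at time 10 or an event-time timer at 16 leaves the state alone, and only the processing-time timer at 16 triggers the cleanup.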

generatedSortKeyComparator.newInstance(
getRuntimeContext().getUserCodeClassLoader());

StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
Contributor

Can this be removed (including the import of the class and the static function)?

}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowData> out)
Contributor

Could you add a comment explaining that we are cleaning up state due to state TTL?
