[Bug] Possible Realtime consumer semaphore leak: orphaned RealtimeSegmentDataManager can block the successor segment indefinitely #18867

Description

@anuragrai16

Summary

For realtime tables whose ParallelSegmentConsumptionPolicy is DISALLOW_ALWAYS (e.g. partial-upsert, non-pauseless), each partition uses a single-permit consumer semaphore (ConsumerCoordinator, Semaphore(1)) so that only one segment per partition consumes at a time. The permit is acquired when a consumer thread starts and is released only through the offload path (doOffload → closeStreamConsumer → releaseConsumerSemaphore).
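The acquire-on-start, release-only-on-offload contract described above can be modelled in a few lines. This is a minimal sketch, not Pinot's code: `PartitionGateSketch`, `Holder`, `start` and `offload` are hypothetical names, and the timeout exists only so the demo returns instead of blocking as the real consumer does.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model, not Pinot's classes: one gate per partition, one holder per segment.
final class PartitionGateSketch {
  // Single permit: at most one segment of this partition may consume at a time.
  private final Semaphore semaphore = new Semaphore(1);

  /** Stand-in for a consuming segment's data manager. */
  final class Holder {
    private final AtomicBoolean acquired = new AtomicBoolean(false);

    // Called when the consumer thread starts. Returns false if the permit
    // is not obtained within the timeout (the real consumer would keep waiting).
    boolean start(long timeoutMs) {
      try {
        if (semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
          acquired.set(true);
          return true;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return false;
    }

    // Offload path: the only place the permit is returned.
    // The compare-and-set makes repeated calls harmless.
    void offload() {
      if (acquired.compareAndSet(true, false)) {
        semaphore.release();
      }
    }
  }

  Holder newHolder() {
    return new Holder();
  }

  int availablePermits() {
    return semaphore.availablePermits();
  }

  public static void main(String[] args) {
    PartitionGateSketch gate = new PartitionGateSketch();
    Holder s = gate.newHolder();
    Holder next = gate.newHolder();
    System.out.println("S started: " + s.start(10));
    System.out.println("S+1 started while S holds the permit: " + next.start(10));
    s.offload();
    s.offload(); // second call is a no-op thanks to the CAS
    System.out.println("S+1 started after S offloaded: " + next.start(10));
  }
}
```

The point of the model is that nothing other than `offload()` ever returns the permit, so a holder that is never offloaded blocks every later segment of the same partition.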

Under certain transition orderings, a RealtimeSegmentDataManager (RSDM) can be left running in memory, holding the semaphore, while Helix's IdealState shows its segment as OFFLINE. No state transition is then able to reach it, so the next segment blocks forever on ConsumerCoordinator.acquire() and the partition stops ingesting until the stuck segment is manually deleted. We have seen this in production a few times now, triggered by memory pressure on server nodes that caused ZK resets.

How it happens

  • T — Initial attempt fails during construction. An OFFLINE→CONSUMING for segment S throws while building the consuming segment (a transient resource error, e.g. a failed mmap/allocation). The constructor's catch block schedules a background "janitor" thread that sleeps ~30s and then calls segmentStoppedConsuming to ask the controller to mark S OFFLINE in IdealState.
  • T+1 — A retry succeeds. Before the janitor thread fires, a retried OFFLINE→CONSUMING for the same segment S succeeds. A new RSDM is registered and acquires the semaphore; consumption is healthy.
  • T+2 — The stale janitor fires. The janitor from the failed attempt still calls segmentStoppedConsuming for S. The controller flips IdealState for S to OFFLINE, even though a healthy consumer is now running.
  • T+3 — Cleanup is lost. Normally the server would receive CONSUMING→OFFLINE and offload (releasing the semaphore). If that cleanup transition is not delivered/applied — e.g. the participant's ZooKeeper session changes around this moment, destroying the session-scoped CurrentState before the transition is generated — the in-memory RSDM survives (the consumer thread and semaphore are not tied to the ZK session) while Helix considers S settled at OFFLINE. The consumer is now a "ghost": running, holding the semaphore, invisible to Helix.
  • Result. The successor segment S+1 gets OFFLINE→CONSUMING, but its consumer blocks indefinitely on acquire(). Ingestion for that partition stalls until S is manually deleted (OFFLINE→DROPPED), which offloads the in-memory RSDM and releases the semaphore.
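The ordering above can be replayed deterministically to see where the controller's view and the server's memory diverge. This is a single-threaded sketch under simplifying assumptions: `GhostConsumerTimeline` and all of its members are hypothetical, the IdealState is reduced to a map, and the 30s janitor delay is replaced by an explicit `fireJanitor()` call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Hypothetical single-threaded replay of the timeline; names do not match Pinot's classes.
final class GhostConsumerTimeline {
  final Map<String, String> idealState = new HashMap<>();     // controller's view: segment -> state
  final Map<String, Object> serverRegistry = new HashMap<>(); // server's in-memory managers
  final Semaphore partitionSemaphore = new Semaphore(1);
  Runnable pendingJanitor; // the delayed "stopped consuming" notification

  // T: construction fails. Nothing is registered, but a janitor is scheduled.
  void failedAttempt(String segment) {
    idealState.put(segment, "CONSUMING"); // the controller still expects S to consume
    pendingJanitor = () -> idealState.put(segment, "OFFLINE");
  }

  // T+1: the retry succeeds, takes the permit and registers a manager.
  boolean retry(String segment) {
    if (!partitionSemaphore.tryAcquire()) {
      return false;
    }
    serverRegistry.put(segment, new Object());
    return true;
  }

  // T+2: the stale janitor fires and flips the controller's view to OFFLINE.
  void fireJanitor() {
    if (pendingJanitor != null) {
      pendingJanitor.run();
      pendingJanitor = null;
    }
  }

  // T+3: CONSUMING->OFFLINE offloads only if the transition actually reaches the server.
  void consumingToOffline(String segment, boolean delivered) {
    if (delivered) {
      offload(segment);
    }
  }

  // Offload removes the manager and returns the permit (once).
  void offload(String segment) {
    if (serverRegistry.remove(segment) != null) {
      partitionSemaphore.release();
    }
  }

  // Probe only: checks whether a successor could take the permit right now.
  boolean successorCanConsume() {
    if (partitionSemaphore.tryAcquire()) {
      partitionSemaphore.release();
      return true;
    }
    return false;
  }

  // A ghost is OFFLINE in the controller's view but still registered on the server.
  boolean isGhost(String segment) {
    return "OFFLINE".equals(idealState.get(segment)) && serverRegistry.containsKey(segment);
  }

  public static void main(String[] args) {
    GhostConsumerTimeline t = new GhostConsumerTimeline();
    t.failedAttempt("S");             // T
    t.retry("S");                     // T+1
    t.fireJanitor();                  // T+2
    t.consumingToOffline("S", false); // T+3: transition lost
    System.out.println("ghost=" + t.isGhost("S") + " successorCanConsume=" + t.successorCanConsume());
    t.offload("S");                   // manual delete (OFFLINE->DROPPED) offloads the manager
    System.out.println("ghost=" + t.isGhost("S") + " successorCanConsume=" + t.successorCanConsume());
  }
}
```

Passing `true` to `consumingToOffline` models the normal case, in which the permit is returned and no ghost remains.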

A related, independent gap: even when an ERROR→OFFLINE transition is delivered, the server's onBecomeOfflineFromError() only logs — it does not offload the segment. So a semaphore held by a failed CONSUMING→ONLINE path can also leak there.

Impact

  • A single partition's realtime ingestion can stall indefinitely (successor segments never acquire the semaphore).
  • For upsert tables, the orphaned consumer can also retain primary-key ownership, which can cause missing/incorrect query results until the stuck segment is removed.
  • Recovery currently requires manual intervention (deleting the segment).

Proposed changes

Note: These fixes shrink the window in which the above can happen to near zero, but they do not eliminate it. A complete fix would be to introduce a reconciliation loop in the server to catch these ghost segments, or to override the reset() logic so that cleanup on a ZK session change is handled properly. Both are high-risk changes that require a design and review.

1. ERROR→OFFLINE cleanup

Make SegmentOnlineOfflineStateModel.onBecomeOfflineFromError() call offloadSegment(...) (wrapped in try/catch, not rethrowing), mirroring onBecomeOfflineFromConsuming().

Why this fixes it: offloadSegment() runs doOffload → closeStreamConsumer → releaseConsumerSemaphore, so any RSDM/semaphore left behind by a failed transition is cleaned up when the segment moves ERROR→OFFLINE. It is safe to call unconditionally: offloadSegment() is null-safe and releaseConsumerSemaphore() uses an idempotent CAS.
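The shape of the proposed handler might look roughly like the following. This is a sketch of the idea only: the `Offloader` interface, the in-memory `log` list and the method signature are assumptions made so the example is self-contained, and they do not reflect the real SegmentOnlineOfflineStateModel API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the state model; only the try/catch shape is the point.
final class ErrorToOfflineSketch {
  /** Assumed abstraction over whatever performs the offload for a segment. */
  interface Offloader {
    void offloadSegment(String segmentName) throws Exception;
  }

  private final Offloader offloader;
  final List<String> log = new ArrayList<>(); // stands in for a real logger

  ErrorToOfflineSketch(Offloader offloader) {
    this.offloader = offloader;
  }

  void onBecomeOfflineFromError(String segmentName) {
    log.add("ERROR->OFFLINE for " + segmentName);
    try {
      // Releases any consumer/semaphore left behind by the failed transition.
      offloader.offloadSegment(segmentName);
    } catch (Exception e) {
      // Per the proposal: record the failure but do not rethrow.
      log.add("offload failed for " + segmentName + ": " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    ErrorToOfflineSketch model = new ErrorToOfflineSketch(name -> {
      throw new Exception("simulated offload failure");
    });
    model.onBecomeOfflineFromError("S"); // completes normally despite the failure
    model.log.forEach(System.out::println);
  }
}
```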

2. Janitor thread guard before the 30s event fires

In the init-error janitor path (postStopConsumedMsg), before sending segmentStoppedConsuming, skip the notification if a segment data manager for the same segment is already registered and is not this (failed) instance — i.e. a retry has already succeeded.

Why this fixes it: a failed attempt never registers itself in the segment map, so a registered manager for the same segment name means a newer attempt is live; suppressing the stale "stopped consuming" prevents the wrongful IdealState→OFFLINE flip at its source.
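The guard reduces to an identity check against the segment map. The sketch below assumes a plain map keyed by segment name and a counter in place of the controller call; `JanitorGuardSketch`, `shouldNotifyStoppedConsuming` and the two-argument `postStopConsumedMsg` signature are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the janitor guard; the map and counter are illustrative only.
final class JanitorGuardSketch {
  // Stand-in for the server's segment name -> data manager registry.
  final Map<String, Object> segmentDataManagers = new ConcurrentHashMap<>();
  int notificationsSent = 0; // stands in for the segmentStoppedConsuming call

  // Notify only if no other manager is registered under the same segment name.
  boolean shouldNotifyStoppedConsuming(String segmentName, Object failedInstance) {
    Object registered = segmentDataManagers.get(segmentName);
    return registered == null || registered == failedInstance;
  }

  // Returns true if the notification was sent, false if it was suppressed.
  boolean postStopConsumedMsg(String segmentName, Object failedInstance) {
    if (!shouldNotifyStoppedConsuming(segmentName, failedInstance)) {
      return false; // a newer attempt is live; do not flip IdealState to OFFLINE
    }
    notificationsSent++;
    return true;
  }

  public static void main(String[] args) {
    JanitorGuardSketch server = new JanitorGuardSketch();
    Object failed = new Object();
    System.out.println("no retry yet, notify: " + server.postStopConsumedMsg("S", failed));
    server.segmentDataManagers.put("S", new Object()); // retry registered a new manager
    System.out.println("retry is live, notify: " + server.postStopConsumedMsg("S", failed));
  }
}
```

The check and the notification are not atomic, so a retry that registers between them is still possible; this is consistent with the note above that the change narrows the window rather than closing it.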

Metadata

    Labels

    bug (Something is not working as expected), ingestion (Related to data ingestion pipeline), real-time (Related to realtime table ingestion and serving)