
Watermark can become unavailable for executors while it's updated with new values #18426

@kennknowles

Description


The watermark is updated by the driver like so:


blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
blockManager.putSingle(WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true);

However, these two calls are neither synchronous nor atomic. If an executor requests the watermark values after the old block has been removed but before the new values have been put, it may receive null as a response, in which case it defaults the watermark to negative infinity. This can lead to erroneous results.
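To make the window concrete, here is a minimal sketch that replays the problematic interleaving deterministically. It assumes a ConcurrentHashMap as a stand-in for the block store and a single long as the watermark; WatermarkRaceDemo, readWatermark and the two replay methods are hypothetical names, and this is not Spark's BlockManager or Beam's actual code. The contrast method only illustrates what readers would see from a store that replaces a value in one step; it does not claim BlockManager offers such an operation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the driver-side update; NOT Spark's BlockManager API.
// A ConcurrentHashMap stands in for the block store, and an executor read that
// lands between the two driver calls is replayed step by step (no real threads).
final class WatermarkRaceDemo {
  // Hypothetical block id mirroring WATERMARKS_BLOCK_ID from the issue.
  static final String WATERMARKS_BLOCK_ID = "watermarks";
  // Stand-in for the "negative infinity" watermark executors fall back to.
  static final long NEGATIVE_INFINITY = Long.MIN_VALUE;

  private final Map<String, Long> store = new ConcurrentHashMap<>();

  private WatermarkRaceDemo(long initial) {
    store.put(WATERMARKS_BLOCK_ID, initial);
  }

  // Executor-side read: a missing block (null) silently becomes negative infinity.
  private long readWatermark() {
    Long value = store.get(WATERMARKS_BLOCK_ID);
    return value == null ? NEGATIVE_INFINITY : value;
  }

  // Replays: driver removes, executor reads, driver puts. Returns what the executor saw.
  static long readBetweenRemoveAndPut(long oldValue, long newValue) {
    WatermarkRaceDemo demo = new WatermarkRaceDemo(oldValue);
    demo.store.remove(WATERMARKS_BLOCK_ID);        // analogous to removeBlock(...)
    long seen = demo.readWatermark();              // executor request lands in the gap
    demo.store.put(WATERMARKS_BLOCK_ID, newValue); // analogous to putSingle(...)
    return seen;
  }

  // Contrast: a single in-place replace leaves no gap, so a reader sees
  // either the old or the new value. Returns {before, after}.
  static long[] readAroundSingleReplace(long oldValue, long newValue) {
    WatermarkRaceDemo demo = new WatermarkRaceDemo(oldValue);
    long before = demo.readWatermark();
    demo.store.put(WATERMARKS_BLOCK_ID, newValue); // key is never absent
    long after = demo.readWatermark();
    return new long[] {before, after};
  }
}
```

Calling readBetweenRemoveAndPut(100L, 200L) yields Long.MIN_VALUE even though the watermark only ever moved forward from 100 to 200, which is the regression the issue describes.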

To work around this in tests, a workaround that assumes a single-JVM setting is used. In that setting the watermark values are stored in a static member accessible to both the driver and the executors, bypassing the BlockManager#putSingle(...) and BlockManager#removeBlock(...) APIs.
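A minimal sketch of such a static holder, assuming watermarks keyed by a hypothetical integer source id. StaticWatermarkHolder, update, get and clear are illustrative names, not the class the Beam tests actually use. Publishing an immutable snapshot through an AtomicReference means a reader sees either the previous map or the new one, never an absent value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-JVM test workaround: watermarks live in a
// static field that the driver and executors (threads in the same JVM) share.
final class StaticWatermarkHolder {
  // Shared by every thread in this JVM; starts as an empty map rather than null.
  private static final AtomicReference<Map<Integer, Long>> WATERMARKS =
      new AtomicReference<>(Collections.<Integer, Long>emptyMap());

  private StaticWatermarkHolder() {}

  // Driver side: publish a defensive, immutable copy in a single atomic step.
  static void update(Map<Integer, Long> newValues) {
    WATERMARKS.set(Collections.unmodifiableMap(new HashMap<>(newValues)));
  }

  // Executor side: read the latest published snapshot.
  static Map<Integer, Long> get() {
    return WATERMARKS.get();
  }

  // Test helper: reset shared state between test cases.
  static void clear() {
    WATERMARKS.set(Collections.<Integer, Long>emptyMap());
  }
}
```

This only works because the driver and executors share one JVM, as in local-mode tests. On a real cluster each executor process has its own copy of the static field, so this does not replace the BlockManager path.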

Imported from Jira BEAM-2789. Original Jira may contain additional context.
Reported by: staslev.
