
[Bug]: AfterCount trigger doesn't work with Sessions window in tests on DirectRunner #27636

@olokshyn

Description

What happened?

Python SDK version: 2.48.0

It is not possible to fire the Sessions window early based on the number of elements in it using the AfterCount trigger. The window only fires when the watermark advances and closes it.

Related issue: #20813. It is marked as fixed, but the problem still reproduces for me.

The code snippet provided below is a test that:

  1. Defines a one-day-long Sessions window.
  2. Defines the trigger.AfterCount(1) trigger that is supposed to fire the window early on every element.
  3. Combines the results of the window using a custom combiner that just puts all results in a single list.
  4. Supplies three elements: two of them fall within a single session window, and the third sits in its own window.
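To see why the first two elements share a window, the Sessions merging rule can be sketched in plain Python. This is an illustrative re-implementation of the merging logic, not Beam's actual code; the variable names are made up, but the timestamps and the one-day gap match the test below:

```python
from datetime import datetime, timedelta, timezone

base = datetime(2023, 7, 7, 10, 30, tzinfo=timezone.utc)
gap = timedelta(days=1)  # the one-day session gap used in the test
stamps = [
    base,
    base + timedelta(minutes=10),
    base + timedelta(days=1, minutes=20),
]

# Sessions assigns each element a proto-window [t, t + gap) and merges
# overlapping windows; this loop mimics that merging rule.
sessions = []  # each entry: [start, end, elements]
for t in sorted(stamps):
    if sessions and t < sessions[-1][1]:  # overlaps the last open session
        sessions[-1][1] = max(sessions[-1][1], t + gap)
        sessions[-1][2].append(t)
    else:  # gap exceeded: start a new session
        sessions.append([t, t + gap, [t]])

print([len(s[2]) for s in sessions])  # [2, 1]: first session holds two elements
```

So the watermark-driven result groups elements 1 and 2 together; the point of this issue is that AfterCount(1) should have fired the window before that merge became visible in the output.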

Note that using the commented-out trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()) line instead produces the same result.

Expected: all three elements are processed separately, because the window fires after every single element:

[
    ("user1", [1]),
    ("user1", [2]),
    ("user1", [5]),
]

Actual: the first two elements are processed together, because they fall into the same session window, which only fires when the watermark advances past it:

[
    ("user1", [1, 2]),
    ("user1", [5]),
]

The test that reproduces the issue:

from datetime import datetime, timedelta, timezone
from typing import Iterable

import pytest
import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.window import Sessions, TimestampedValue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import ElementEvent, TestStream, WatermarkEvent
from apache_beam.testing.util import assert_that, equal_to


base_datetime = datetime(2023, 7, 7, 10, 30, 0, 0, timezone.utc)


@pytest.mark.parametrize(
    "events, expected",
    [
        (
            [
                WatermarkEvent(0),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 1),
                            base_datetime.timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent(base_datetime.timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 2),
                            (base_datetime + timedelta(minutes=10)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(minutes=10)).timestamp() + 5),
                ElementEvent(
                    [
                        TimestampedValue(
                            ("user1", 5),
                            (base_datetime + timedelta(days=1, minutes=20)).timestamp(),
                        ),
                    ]
                ),
                WatermarkEvent((base_datetime + timedelta(days=1, minutes=20)).timestamp() + 5),
            ],
            [
                ("user1", [1]),
                ("user1", [2]),
                ("user1", [5]),
            ],
        ),
    ],
)
def test_after_count_trigger(events, expected):
    with TestPipeline(options=PipelineOptions(allow_unsafe_triggers=True)) as p:
        test_stream = p | TestStream(events=events, output_tags={None})

        pcoll = (
            test_stream
            | "Window"
            >> beam.WindowInto(
                Sessions(int(timedelta(days=1).total_seconds())),
                # trigger=trigger.AfterEach(trigger.AfterCount(1), trigger.AfterWatermark()),
                trigger=trigger.AfterCount(1),
                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            )
            | "Combine" >> beam.CombinePerKey(CustomCombine())
        )
        pcoll | beam.Map(print)

        assert_that(pcoll, equal_to(expected))


class CustomCombine(beam.CombineFn):
    """Combines all inputs for a key into a single list."""

    def create_accumulator(self) -> list:
        return []

    def add_input(self, accumulator: list, input) -> list:
        return accumulator + [input]

    def merge_accumulators(self, accumulators: Iterable[list]) -> list:
        res = [x for acc in accumulators for x in acc]
        return res

    def extract_output(self, accumulator):
        return accumulator

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK

Metadata


Labels

P2 · bug · done & done (issue has been reviewed after it was closed, for verification, followups, etc.) · python · streaming (issues pertaining to streaming functionality)
