Scenario:
I want to count how many A and B events I receive in a given 30 window. I require every pane to contain all event types (A and B) with their corresponding counters, which is why I am using Combine.globally.
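The requirement above (every pane lists every event type, even one with no events) can be illustrated in plain Java, independent of Beam. This is only a sketch and not the code from the gist: `EventCounts`, `EVENT_TYPES` and `countPane` are hypothetical names, and the fixed list of types is an assumption based on the two types mentioned here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventCounts {
    // Assumption: the set of event types is known up front (only A and B here).
    static final List<String> EVENT_TYPES = Arrays.asList("A", "B");

    // Counts the events of one pane. Every known type is pre-filled with 0,
    // so a type that received no events still appears in the result.
    static Map<String, Long> countPane(Iterable<String> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (String type : EVENT_TYPES) {
            counts.put(type, 0L);
        }
        for (String event : events) {
            counts.merge(event, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // One A and one B in the pane.
        System.out.println("AFTER COMBINE: " + countPane(Arrays.asList("A", "B")));
        // Only A in the pane: B is still reported, with a count of 0.
        System.out.println("AFTER COMBINE: " + countPane(Arrays.asList("A")));
    }
}
```

Counting per key alone would omit a type that saw no events in the pane, which is presumably why a single global combine per window is used in the job.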
The calculation logic works fine; the problem is with writing files. The files are not written.
For debugging purposes I created some transformations (Simulate ApplyShardLabel, Simulate GroupIntoShards, etc.) that mimic the logic implemented by WriteFiles.
If you push the strings "A" and "B" to the Kinesis stream, I see the following System.out output from the job:
AFTER COMBINE: {A=1, B=1}
According to my test transformations I should also see:
AFTER COMBINE: {A=1, B=1}
Simulating ApplyShardLabel
Simulating finalizing writer: KV{null, [KV{0, [{A=1, B=1}]}]}
I am using DirectRunner and Beam 2.0.0. When I switch to Beam 2.1.0, I see the expected debug output and the files are written out.
I think there is an issue with support for the AfterSynchronizedProcessingTime trigger.
I cannot replicate the issue when using TestStream.
The test code can be found at
https://gist.github.com/pbartoszek/9dd58c4fcfc5171eafba3520cb3040fa
Imported from Jira BEAM-3152. Original Jira may contain additional context.
Reported by: pawelbartoszek.