Skip to content

Commit

Permalink
Fix kafka_franz input batching period
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Oct 3, 2023
1 parent 82891ff commit 01a817e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ All notable changes to this project will be documented in this file.

## Unreleased

## 4.22.0 - 2023-10-03

### Added

- The `-e/--env-file` cli flag for importing environment variable files now supports glob patterns.
Expand All @@ -23,6 +25,7 @@ All notable changes to this project will be documented in this file.
- The `sqs` input now periodically refreshes the visibility timeout of messages that take a significant amount of time to process.
- The `ts_add_iso8601` and `ts_sub_iso8601` bloblang methods now return the correct error for certain invalid durations.
- The `discord` output no longer ignores structured message fields containing underscores.
- Fixed an issue where the `kafka_franz` input was ignoring batching periods and stalling.

### Changed

Expand Down
6 changes: 5 additions & 1 deletion internal/batch/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func (p *Batcher) UntilNext() time.Duration {
if p.period <= 0 {
return -1
}
return time.Until(p.lastBatch.Add(p.period))
tUntil := time.Until(p.lastBatch.Add(p.period))
if tUntil < 0 {
tUntil = 0
}
return tUntil
}

//------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/input_kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (p *partitionTracker) loop() {
return
}

if sendBatch, _ := p.batcher.Flush(closeAtLeisureCtx); len(sendBatch) == 0 {
if sendBatch, _ = p.batcher.Flush(closeAtLeisureCtx); len(sendBatch) == 0 {
return
}
sendRecord = p.topBatchRecord
Expand Down

0 comments on commit 01a817e

Please sign in to comment.