
Commit a1fe7ac

[Enhancement] Allow block-builder to operate over empty partitions (#5581)
If a partition is empty during a consumption cycle, it is no longer dropped from tracking: a previously empty partition may start receiving data later, so the block-builder marks it as seen and re-checks it on the next cycle.
1 parent 875be0e commit a1fe7ac

File tree: 3 files changed, +173 −30 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -52,6 +52,7 @@ Additionally the `compaction_tenant_backoff_total` metric has been renamed to `c
 * [ENHANCEMENT] Make block ordering deterministic [#5411](https://github.com/grafana/tempo/pull/5411) (@rajiv-singh)
 * [ENHANCEMENT] Improve exemplar selection in quantile_over_time() [#5278](https://github.com/grafana/tempo/pull/5278) (@zalegrala)
 * [ENHANCEMENT] Add live store to jsonnet lib [#5591](https://github.com/grafana/tempo/pull/5591) [#5606](https://github.com/grafana/tempo/pull/5606) [#5609](https://github.com/grafana/tempo/pull/5609) (@mapno)
+* [ENHANCEMENT] Allow block-builder to operate over empty partitions [#5581](https://github.com/grafana/tempo/pull/5581) (@ruslan-mikhailov)
 * [BUGFIX] Correctly apply trace idle period in ingesters and add the concept of trace live period. [#5346](https://github.com/grafana/tempo/pull/5346/files) (@joe-elliott)
 * [BUGFIX] Fix invalid YAML output from /status/runtime_config endpoint by adding document separator. [#5146](https://github.com/grafana/tempo/issues/5146)
 * [BUGFIX] Fix race condition between compaction provider and backend-scheduler [#5409](https://github.com/grafana/tempo/pull/5409) (@zalegrala)

modules/blockbuilder/blockbuilder.go

Lines changed: 33 additions & 11 deletions

@@ -33,6 +33,9 @@ const (
 	ConsumerGroup = "block-builder"
 	pollTimeout   = 2 * time.Second
 	cutTime       = 10 * time.Second
+	emptyPartitionEndOffset = 0  // partition has no records
+	commitOffsetAtEnd       = -1 // offset is at the end of partition
+	commitOffsetAtStart     = -2 // offset is at the start of partition
 )
 
 var (
@@ -113,21 +116,35 @@ type BlockBuilder struct {
 type partitionState struct {
 	// Partition number
 	partition int32
-	// Start and end offset
-	commitOffset, endOffset int64
+	// commitOffset is the last committed consumer offset for this partition
+	// it is maintained per consumer group
+	commitOffset int64
+	// endOffset is the latest offset for this partition
+	// it represents the last message written by producers
+	endOffset int64
 	// Last committed record timestamp
 	lastRecordTs time.Time
 }
 
 func (p partitionState) getStartOffset() kgo.Offset {
-	if p.commitOffset >= 0 {
+	if p.commitOffset > commitOffsetAtEnd {
 		return kgo.NewOffset().At(p.commitOffset)
 	}
+	// If commit offset is AtEnd (-1), it nevertheless will consume from the start.
+	// This is a workaround for franz-go and default Kafka behaviour:
+	// in case consumer is new and has no committed offsets, it will start consuming from the end,
+	// while for block builder, it should consume from the earliest record.
+	// The workaround is dirty and can break the consumer if it starts returning AtEnd (-1) for
+	// already running consumer.
+	// TODO: replace the workaround with proper new consumer offset initialization
+	// if p.commitOffset == commitOffsetAtEnd {
+	// 	return kgo.NewOffset().AtEnd()
+	// }
 	return kgo.NewOffset().AtStart()
 }
 
 func (p partitionState) hasRecords() bool {
-	return p.endOffset >= -1
+	return p.endOffset > emptyPartitionEndOffset
 }
 
 func New(
@@ -265,7 +282,12 @@ func (b *BlockBuilder) consume(ctx context.Context) (time.Duration, error) {
 
 	// First iteration over all the assigned partitions to get their current lag in time
 	for i, p := range ps {
-		if !p.hasRecords() { // No records, we can skip the partition
+		if !p.hasRecords() { // No records, skip for the first iteration
+			// We treat the partition as updated through now,
+			// and will check it again after ConsumeCycleDuration has elapsed
+			ps[i].lastRecordTs = time.Now()
+			ps[i].commitOffset = 0 // always start at beginning
+			level.Info(b.logger).Log("msg", "partition has no records", "partition", p.partition)
 			continue
 		}
 		lastRecordTs, commitOffset, err := b.consumePartition(ctx, p)
@@ -359,7 +381,7 @@ outer:
 			break
 		}
 		metricFetchErrors.WithLabelValues(partLabel).Inc()
-		return time.Time{}, -1, err
+		return time.Time{}, commitOffsetAtEnd, err
 	}
 
 	if fetches.Empty() {
@@ -410,7 +432,7 @@ outer:
 
 		err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer)
 		if err != nil {
-			return time.Time{}, -1, err
+			return time.Time{}, commitOffsetAtEnd, err
 		}
 
 		processedRecords++
@@ -443,21 +465,21 @@ outer:
 		span.AddEvent("no data")
 		// No data means we are caught up
 		ingest.SetPartitionLagSeconds(group, ps.partition, 0)
-		return time.Time{}, -1, nil
+		return time.Time{}, commitOffsetAtEnd, nil
 	}
 
 	// Record lag at the end of the consumption
 	ingest.SetPartitionLagSeconds(group, ps.partition, time.Since(lastRec.Timestamp))
 
 	err = writer.flush(ctx, b.reader, b.writer, b.compactor)
 	if err != nil {
-		return time.Time{}, -1, err
+		return time.Time{}, commitOffsetAtEnd, err
 	}
 
 	offset := kadm.NewOffsetFromRecord(lastRec)
 	err = b.commitOffset(ctx, offset, group, ps.partition)
 	if err != nil {
-		return time.Time{}, -1, err
+		return time.Time{}, commitOffsetAtEnd, err
 	}
 
 	level.Info(b.logger).Log(
@@ -575,7 +597,7 @@ func (b *BlockBuilder) getPartitionOffsets(ctx context.Context, partitionIDs []i
 func (b *BlockBuilder) getPartitionState(partition int32, commits kadm.OffsetResponses, endsOffsets kadm.ListedOffsets) partitionState {
 	var (
 		topic = b.cfg.IngestStorageConfig.Kafka.Topic
-		ps    = partitionState{partition: partition, commitOffset: -1, endOffset: -2}
+		ps    = partitionState{partition: partition, commitOffset: commitOffsetAtEnd, endOffset: emptyPartitionEndOffset}
	)
 
 	lastCommit, found := commits.Lookup(topic, partition)
modules/blockbuilder/blockbuilder_test.go

Lines changed: 139 additions & 19 deletions

@@ -64,7 +64,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
 	})
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 
 	// Wait for record to be consumed and committed.
 	require.Eventually(t, func() bool {
@@ -246,7 +246,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
 	})
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 
 	// Wait for record to be consumed and committed.
 	require.Eventually(t, func() bool {
@@ -385,7 +385,7 @@ func TestBlockbuilder_retries_on_retriable_commit_error(t *testing.T) {
 	logger := test.NewTestingLogger(t)
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 	lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
 
 	b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
@@ -443,7 +443,7 @@ func TestBlockbuilder_retries_on_commit_error(t *testing.T) {
 	logger := test.NewTestingLogger(t)
 
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
-	producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 	lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
 
 	b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
@@ -490,7 +490,7 @@ func TestBlockbuilder_noDoubleConsumption(t *testing.T) {
 	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
 
 	// Send a single record
-	producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 	lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
 
 	// Create the block builder
@@ -510,7 +510,7 @@ func TestBlockbuilder_noDoubleConsumption(t *testing.T) {
 	requireLastCommitEquals(t, ctx, client, lastRecordOffset+1)
 
 	// Send another record
-	newRecords := sendReq(t, ctx, client, util.FakeTenantID)
+	newRecords := sendReq(ctx, t, client, util.FakeTenantID)
 	newRecordOffset := newRecords[len(newRecords)-1].Offset
 
 	// Wait for the new record to be consumed and committed
@@ -539,7 +539,7 @@ func TestBlockBuilder_honor_maxBytesPerCycle(t *testing.T) {
 		{
 			name:             "Limited to 1 bytes per cycle",
 			maxBytesPerCycle: 1,
-			expectedCommits:  1,
+			expectedCommits:  2,
 			expectedWrites:   2,
 		},
 		{
@@ -587,8 +587,8 @@ func TestBlockBuilder_honor_maxBytesPerCycle(t *testing.T) {
 
 			client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
 			// We send two records with a size less than 30KB
-			sendReq(t, ctx, client, util.FakeTenantID)
-			producedRecords := sendReq(t, ctx, client, util.FakeTenantID)
+			sendReq(ctx, t, client, util.FakeTenantID)
+			producedRecords := sendReq(ctx, t, client, util.FakeTenantID)
 
 			require.Eventually(t, func() bool {
 				return kafkaCommits.Load() == tc.expectedCommits
@@ -731,8 +731,8 @@ func TestBlockbuilder_marksOldBlocksCompacted(t *testing.T) {
 		badTenantID     = "2"
 		producedRecords []*kgo.Record
 	)
-	producedRecords = append(producedRecords, sendReq(t, ctx, client, goodTenantID)...)
-	producedRecords = append(producedRecords, sendReq(t, ctx, client, badTenantID)...)
+	producedRecords = append(producedRecords, sendReq(ctx, t, client, goodTenantID)...)
+	producedRecords = append(producedRecords, sendReq(ctx, t, client, badTenantID)...)
 	lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
 
 	// Simulate failures on the first cycle
@@ -824,9 +824,13 @@ func TestBlockbuilder_gracefulShutdown(t *testing.T) {
 	store := newStore(ctx, t)
 	cfg := blockbuilderConfig(t, address, []int32{0}) // Fix: Properly specify partition
 
+	// Send initial traces to ensure the partition has records
+	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
+	sendReq(ctx, t, client, util.FakeTenantID)
+
 	// Start sending traces in the background
 	go func() {
-		sendTracesFor(t, ctx, newKafkaClient(t, cfg.IngestStorageConfig.Kafka), 60*time.Second, time.Second)
+		sendTracesFor(t, ctx, client, 60*time.Second, time.Second)
 	}()
 
 	b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
@@ -1004,15 +1008,30 @@ func countFlushedTraces(store storage.Store) int {
 	return count
 }
 
+type reqOpts struct {
+	partition int32
+	time      time.Time
+	tenantID  string
+}
+
+func (r *reqOpts) applyDefaults() {
+	if r.tenantID == "" {
+		r.tenantID = util.FakeTenantID
+	}
+	if r.time.IsZero() {
+		r.time = time.Now()
+	}
+}
+
 // nolint: revive
-func sendReq(t testing.TB, ctx context.Context, client *kgo.Client, tenantID string) []*kgo.Record {
+func sendReqWithOpts(ctx context.Context, t testing.TB, client *kgo.Client, opts reqOpts) []*kgo.Record {
 	traceID := generateTraceID(t)
+	opts.applyDefaults()
 
-	now := time.Now()
-	startTime := uint64(now.UnixNano())
-	endTime := uint64(now.Add(time.Second).UnixNano())
+	startTime := uint64(opts.time.UnixNano())
+	endTime := uint64(opts.time.Add(time.Second).UnixNano())
 	req := test.MakePushBytesRequest(t, 10, traceID, startTime, endTime)
-	records, err := ingest.Encode(0, tenantID, req, 1_000_000)
+	records, err := ingest.Encode(opts.partition, opts.tenantID, req, 1_000_000)
 	require.NoError(t, err)
 
 	res := client.ProduceSync(ctx, records...)
@@ -1021,6 +1040,10 @@ func sendReq(t testing.TB, ctx context.Context, client *kgo.Client, tenantID str
 	return records
 }
 
+func sendReq(ctx context.Context, t testing.TB, client *kgo.Client, tenantID string) []*kgo.Record {
+	return sendReqWithOpts(ctx, t, client, reqOpts{partition: 0, time: time.Now(), tenantID: tenantID})
+}
+
 // nolint: revive,unparam
 func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, interval time.Duration) []*kgo.Record {
 	ticker := time.NewTicker(interval)
@@ -1038,7 +1061,7 @@ func sendTracesFor(t *testing.T, ctx context.Context, client *kgo.Client, dur, i
 		case <-timer.C: // Exit the function when the timer is done
 			return producedRecords
 		case <-ticker.C:
-			records := sendReq(t, ctx, client, util.FakeTenantID)
+			records := sendReq(ctx, t, client, util.FakeTenantID)
 			producedRecords = append(producedRecords, records...)
 		}
 	}
@@ -1112,7 +1135,7 @@ func BenchmarkBlockBuilder(b *testing.B) {
 			b.StopTimer()
 			size := 0
 			for i := 0; i < 1000; i++ {
-				for _, r := range sendReq(b, ctx, client, util.FakeTenantID) {
+				for _, r := range sendReq(ctx, b, client, util.FakeTenantID) {
 					size += len(r.Value)
 				}
 			}
@@ -1124,3 +1147,100 @@ func BenchmarkBlockBuilder(b *testing.B) {
 		b.SetBytes(int64(size))
 	}
 }
+
+type slowStore struct {
+	storage.Store
+	wait chan struct{}
+}
+
+func (s *slowStore) WriteBlock(ctx context.Context, block tempodb.WriteableBlock) error {
+	s.wait <- struct{}{} // send a signal to a goroutine
+	<-s.wait             // wait for the signal from the goroutine
+	return s.Store.WriteBlock(ctx, block)
+}
+
+// lock waits for the signal from WriteBlock, locking the operation
+func (s *slowStore) lock() {
+	<-s.wait
+}
+
+// unlock sends a signal to WriteBlock, unlocking the operation
+func (s *slowStore) unlock() {
+	s.wait <- struct{}{}
+}
+
+// TestBlockbuilder_twoPartitions_secondEmpty verifies correct handling of two Kafka
+// partitions where the second partition is initially empty and receives data later.
+// It uses a channel-gated store to step consumption, injects records between consume
+// cycles (sleeping longer than ConsumeCycleDuration), and asserts that:
+//   - both partitions are assigned,
+//   - three blocks are flushed (p0 initial, p0 later, p1 later), and
+//   - committed offsets equal the number of sent records.
+// The test is tightly coupled to the blockbuilder implementation. If it gets stuck
+// on the channel, it is advised to debug the consume cycle step by step.
+func TestBlockbuilder_twoPartitions_secondEmpty(t *testing.T) {
+	ctx, cancel := context.WithCancelCause(context.Background())
+	t.Cleanup(func() { cancel(errors.New("test done")) })
+	reqTime := time.Now().Add(-1 * time.Minute) // to be sure it won't be filtered out by cycle duration check
+
+	// Create a Kafka cluster with 2 partitions
+	_, address := testkafka.CreateCluster(t, 2, testTopic)
+
+	// Setup block-builder
+	ch := make(chan struct{})
+	store := &slowStore{Store: newStore(ctx, t), wait: ch}
+	cfg := blockbuilderConfig(t, address, []int32{0, 1})
+	cfg.ConsumeCycleDuration = time.Second
+	partitionRing := newPartitionRingReaderWithPartitions(map[int32]ring.PartitionDesc{
+		0: {Id: 0, State: ring.PartitionActive},
+		1: {Id: 1, State: ring.PartitionActive},
+	})
+
+	client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
+	// First, produce to partition 0
+	sendReqWithOpts(ctx, t, client, reqOpts{partition: 0, time: reqTime, tenantID: util.FakeTenantID})
+
+	// And only then create the block builder
+	b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store)
+	require.NoError(t, err)
+
+	// Verify the builder is listening to both partitions
+	parts := b.getAssignedPartitions()
+	require.ElementsMatch(t, []int32{0, 1}, parts)
+
+	require.NoError(t, services.StartAndAwaitRunning(ctx, b))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
+	})
+
+	// after initial consumption, add more records
+	store.lock()
+	sendReqWithOpts(ctx, t, client, reqOpts{partition: 0, time: reqTime, tenantID: util.FakeTenantID})
+	store.unlock()
+
+	// after processing the first partition, add more records to the second partition
+	store.lock()
+	sendReqWithOpts(ctx, t, client, reqOpts{partition: 1, time: reqTime, tenantID: util.FakeTenantID})
+	store.unlock()
+
+	// wait for the second partition to finish
+	store.lock()
+	store.unlock()
+
+	// Wait for the blocks to be flushed (one block per consumePartition call)
+	require.Eventually(t, func() bool {
+		return len(store.BlockMetas(util.FakeTenantID)) == 3 && countFlushedTraces(store) == 3
+	}, 20*time.Second, time.Second)
+
+	// Verify offsets
+	offsets, err := kadm.NewClient(client).FetchOffsetsForTopics(ctx, testConsumerGroup, testTopic)
+	require.NoError(t, err)
+	for partition, expectedOffset := range map[int32]int64{
+		0: 2,
+		1: 1,
+	} {
+		offset, ok := offsets.Lookup(testTopic, partition)
+		require.True(t, ok, "partition %d should have a committed offset", partition)
+		require.Equal(t, expectedOffset, offset.At)
+	}
+}
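
The new test depends on the unbuffered-channel handshake inside slowStore to park the block-builder at each WriteBlock so records can be injected between consume cycles. The following standalone sketch is not repository code; only the lock/unlock handshake mirrors slowStore, and the gate type, writeBlock, and the loop are invented for illustration.

// handshake_sketch.go: a stripped-down illustration of the slowStore gating
// pattern. The worker announces it has reached the write, the controller holds
// it there, and a second send on the same unbuffered channel releases it.
package main

import (
	"fmt"
	"sync"
)

type gate struct{ wait chan struct{} }

// writeBlock plays the role of slowStore.WriteBlock: announce, then block until released.
func (g *gate) writeBlock(id int) {
	g.wait <- struct{}{} // signal that a write is about to happen
	<-g.wait             // wait to be released
	fmt.Printf("block %d written\n", id)
}

func (g *gate) lock()   { <-g.wait }            // wait until the worker reaches writeBlock
func (g *gate) unlock() { g.wait <- struct{}{} } // let the held write proceed

func main() {
	g := &gate{wait: make(chan struct{})}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stands in for the block-builder's consume loop
		defer wg.Done()
		for id := 1; id <= 2; id++ {
			g.writeBlock(id)
		}
	}()

	g.lock()                      // worker is parked inside the first write
	fmt.Println("inject records") // state can be changed safely between cycles here
	g.unlock()                    // release the first write

	g.lock() // park and release the second write the same way
	g.unlock()

	wg.Wait()
}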
