
Add replication factor to block meta [1 of 3] #3628

Merged · 10 commits · May 20, 2024
4 changes: 2 additions & 2 deletions modules/generator/processor/localblocks/livetraces.go
@@ -59,11 +59,11 @@ func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) b
return true
}

-func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
+func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace {
res := []*liveTrace{}

for k, tr := range l.traces {
-if tr.timestamp.Before(idleSince) {
+if tr.timestamp.Before(idleSince) || immediate {
res = append(res, tr)
delete(l.traces, k)
}
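Note (not part of the diff): the new boolean replaces the zero-time workaround previously used at the call site. When immediate is true, every remaining live trace is cut regardless of its timestamp. A minimal sketch of the two call paths, assuming lt is a populated *liveTraces:

// Normal path: cut only traces that have been idle since the cutoff.
idle := lt.CutIdle(time.Now().Add(-30*time.Second), false)

// Forced path (e.g. shutdown or a final flush): cut everything now.
all := lt.CutIdle(time.Time{}, true)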
14 changes: 8 additions & 6 deletions modules/generator/processor/localblocks/processor.go
@@ -577,11 +577,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error {
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))

since := time.Now().Add(-p.Cfg.TraceIdlePeriod)
-if immediate {
-since = time.Time{}
-}
-
-tracesToCut := p.liveTraces.CutIdle(since)
+tracesToCut := p.liveTraces.CutIdle(since, immediate)

p.liveTracesMtx.Unlock()

@@ -634,7 +630,13 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
}

func (p *Processor) resetHeadBlock() error {
-block, err := p.wal.NewBlockWithDedicatedColumns(uuid.New(), p.tenant, model.CurrentEncoding, p.overrides.DedicatedColumns(p.tenant))
+meta := &backend.BlockMeta{
+BlockID: uuid.New(),
+TenantID: p.tenant,
+DedicatedColumns: p.overrides.DedicatedColumns(p.tenant),
+ReplicationFactor: 1,
+}
+block, err := p.wal.NewBlock(meta, model.CurrentEncoding)
if err != nil {
return err
}
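Reviewer note: the generator's local blocks exist on a single instance only, so the meta pins ReplicationFactor to 1, while the ingester call site below leaves the field unset and therefore keeps the RF3 default. The consolidated constructor used here presumably has roughly this shape (our reading; NewBlockWithDedicatedColumns is folded away across this PR series):

// Assumed signature: the caller now owns building the BlockMeta,
// including replication factor and dedicated columns.
func (w *WAL) NewBlock(meta *backend.BlockMeta, dataEncoding string) (common.WALBlock, error)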
52 changes: 50 additions & 2 deletions modules/generator/processor/localblocks/processor_test.go
@@ -6,15 +6,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

type mockOverrides struct{}
@@ -153,3 +152,52 @@ func TestProcessorDoesNotRace(t *testing.T) {
wg.Wait()
p.Shutdown(ctx)
}

+func TestReplicationFactor(t *testing.T) {
+wal, err := wal.New(&wal.Config{
+Filepath: t.TempDir(),
+Version: encoding.DefaultEncoding().Version(),
+})
+require.NoError(t, err)
+
+cfg := Config{
+FlushCheckPeriod: time.Minute,
+TraceIdlePeriod: time.Minute,
+CompleteBlockTimeout: time.Minute,
+Block: &common.BlockConfig{
+BloomShardSizeBytes: 100_000,
+BloomFP: 0.05,
+Version: encoding.DefaultEncoding().Version(),
+},
+Metrics: MetricsConfig{
+ConcurrentBlocks: 10,
+TimeOverlapCutoff: 0.2,
+},
+FilterServerSpans: false,
+}
+
+p, err := New(cfg, "fake", wal, &mockOverrides{})
+require.NoError(t, err)
+
+tr := test.MakeTrace(10, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+p.PushSpans(context.TODO(), &tempopb.PushSpansRequest{
+Batches: tr.Batches,
+})
+
+require.NoError(t, p.cutIdleTraces(true))
+verifyReplicationFactor(t, p.headBlock)
+
+require.NoError(t, p.cutBlocks(true))
+for _, b := range p.walBlocks {
+verifyReplicationFactor(t, b)
+}
+
+require.NoError(t, p.completeBlock())
+for _, b := range p.completeBlocks {
+verifyReplicationFactor(t, b)
+}
+}
+
+func verifyReplicationFactor(t *testing.T, b common.BackendBlock) {
+require.Equal(t, 1, int(b.BlockMeta().ReplicationFactor))
+}
7 changes: 6 additions & 1 deletion modules/ingester/instance.go
@@ -521,7 +521,12 @@ func (i *instance) resetHeadBlock() error {

dedicatedColumns := i.getDedicatedColumns()

-newHeadBlock, err := i.writer.WAL().NewBlockWithDedicatedColumns(uuid.New(), i.instanceID, model.CurrentEncoding, dedicatedColumns)
+meta := &backend.BlockMeta{
+BlockID: uuid.New(),
+TenantID: i.instanceID,
+DedicatedColumns: dedicatedColumns,
+}
+newHeadBlock, err := i.writer.WAL().NewBlock(meta, model.CurrentEncoding)
if err != nil {
return err
}
3 changes: 3 additions & 0 deletions tempodb/backend/block_meta.go
@@ -126,6 +126,9 @@ type BlockMeta struct {
FooterSize uint32 `json:"footerSize"`
// DedicatedColumns configuration for attributes (used by vParquet3)
DedicatedColumns DedicatedColumns `json:"dedicatedColumns,omitempty"`
+// ReplicationFactor is the number of times the data written in this block has been replicated.
+// It is left unset (0) when the block was written with the default replication factor of 3 (RF3).
+ReplicationFactor uint32 `json:"replicationFactor,omitempty"`
}

// DedicatedColumn contains the configuration for a single attribute with the given name that should
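For illustration (our own sketch, not code from this PR): because of the omitempty tag, a default-RF3 block serializes with no replicationFactor field at all, and readers are expected to treat 0 as RF3. The effectiveRF helper below is hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down stand-in for backend.BlockMeta, using the same JSON tag.
type blockMeta struct {
	ReplicationFactor uint32 `json:"replicationFactor,omitempty"`
}

// effectiveRF is a hypothetical helper: unset (0) means the default RF3.
func effectiveRF(m blockMeta) uint32 {
	if m.ReplicationFactor == 0 {
		return 3
	}
	return m.ReplicationFactor
}

func main() {
	rf3, _ := json.Marshal(blockMeta{})                     // prints {}
	rf1, _ := json.Marshal(blockMeta{ReplicationFactor: 1}) // prints {"replicationFactor":1}
	fmt.Println(string(rf3), effectiveRF(blockMeta{}))
	fmt.Println(string(rf1), effectiveRF(blockMeta{ReplicationFactor: 1}))
}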
8 changes: 4 additions & 4 deletions tempodb/compaction_block_selector.go
@@ -78,25 +78,25 @@ func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRan
// inside active window.
// Group by compaction level and window.
// Choose lowest compaction level and most recent windows first.
entry.group = fmt.Sprintf("A-%v-%016X", b.CompactionLevel, age)
entry.group = fmt.Sprintf("A-%v-%016X-%v", b.CompactionLevel, age, b.ReplicationFactor)

// Within group choose smallest blocks first.
// update after parquet: we want to make sure blocks of the same version end up together
// update after vParquet3: we want to make sure blocks of the same dedicated columns end up together
entry.order = fmt.Sprintf("%016X-%v-%016X", entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash())

-entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, b.CompactionLevel, w)
+entry.hash = fmt.Sprintf("%v-%v-%v-%v", b.TenantID, b.CompactionLevel, w, b.ReplicationFactor)
} else {
// outside active window.
// Group by window only. Choose most recent windows first.
entry.group = fmt.Sprintf("B-%016X", age)
entry.group = fmt.Sprintf("B-%016X-%v", age, b.ReplicationFactor)

// Within group choose lowest compaction level and smallest blocks first.
// update after parquet: we want to make sure blocks of the same version end up together
// update after vParquet3: we want to make sure blocks of the same dedicated columns end up together
entry.order = fmt.Sprintf("%v-%016X-%v-%016X", b.CompactionLevel, entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash())

-entry.hash = fmt.Sprintf("%v-%v", b.TenantID, w)
+entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, w, b.ReplicationFactor)
}

twbs.entries = append(twbs.entries, entry)
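For illustration (our own sketch): carrying the replication factor into the group and hash keys keeps RF1 generator blocks and default-RF3 (unset) ingester blocks in separate compaction jobs, even when they share tenant, compaction level, and time window:

package main

import "fmt"

func main() {
	const (
		level = 1
		age   = int64(0x2A) // stand-in for the window-derived age the selector computes
	)
	// RF 1 (generator local block) vs. RF 0 (unset, i.e. the RF3 default).
	for _, rf := range []uint32{1, 0} {
		fmt.Println(fmt.Sprintf("A-%v-%016X-%v", level, age, rf))
	}
	// Output:
	// A-1-000000000000002A-1
	// A-1-000000000000002A-0
}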