Skip to content

Commit

Permalink
Fix loki build
Browse files Browse the repository at this point in the history
Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk committed Aug 13, 2023
1 parent c3037de commit c904bf9
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 6 deletions.
28 changes: 26 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,14 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)

// NOTE(kavi): Hack
chunkFormat, err := i.LiveChunkFormat()
if err != nil {
return nil, err
}

s := newStream(chunkFormat, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)

// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
Expand Down Expand Up @@ -317,7 +324,8 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor

func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *stream {
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(ls), fp)
s := newStream(i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
ck, _ := i.LiveChunkFormat()
s := newStream(ck, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)

i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
Expand All @@ -327,6 +335,22 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *str
return s
}

// LiveChunkFormat returns the chunk format version for the schema period
// that is active right now (model.Now()). It fails if no schema period
// covers the current time or if that period has no valid chunk version.
//
// NOTE(kavi): Hack — the format is looked up per call; callers on hot
// paths (stream creation) may want to cache the result.
func (i *instance) LiveChunkFormat() (byte, error) {
	periodConfig, err := i.schemaconfig.SchemaForTime(model.Now())
	if err != nil {
		return 0, err
	}

	// ChunkVersion already returns (byte, error); no need for an
	// intermediate variable and a second explicit error check.
	return periodConfig.ChunkVersion()
}

// getOrCreateStream returns the stream or creates it.
// It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer),
// otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside.
Expand Down
8 changes: 6 additions & 2 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,12 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
{Labels: "{app=\"test\",job=\"varlogs2\"}", Entries: entries(5, currentTime.Add(12*time.Nanosecond))},
}

ck, _ := instance.LiveChunkFormat()

for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream, recordPool.GetRecord())
require.NoError(t, err)
chunk := newStream(cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
chunk := newStream(ck, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
Expand Down Expand Up @@ -546,9 +548,11 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
}
})
lbs := makeRandomLabels()

ck, _ := inst.LiveChunkFormat()
b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
inst.addTailersToNewStream(newStream(nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
inst.addTailersToNewStream(newStream(ck, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type stream struct {
streamRateCalculator *StreamRateCalculator

writeFailures *writefailures.Manager

// NOTE(kavi): Hack
chunkFormat byte
}

type chunkDesc struct {
Expand All @@ -91,7 +94,7 @@ type entryWithError struct {
e error
}

func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager) *stream {
func newStream(chunkFormat byte, cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, streamRateCalculator *StreamRateCalculator, metrics *ingesterMetrics, writeFailures *writefailures.Manager) *stream {
hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
return &stream{
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
Expand All @@ -108,6 +111,7 @@ func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.

unorderedWrites: unorderedWrites,
writeFailures: writeFailures,
chunkFormat: chunkFormat,
}
}

Expand Down Expand Up @@ -145,7 +149,7 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er
}

// NewChunk allocates a fresh in-memory chunk for this stream, built with
// the stream's chunk format and the ingester's configured encoding, head
// block type, block size, and target chunk size.
func (s *stream) NewChunk() *chunkenc.MemChunk {
	headType := headBlockType(s.unorderedWrites)
	return chunkenc.NewMemChunk(
		s.chunkFormat,
		s.cfg.parsedEncoding,
		headType,
		s.cfg.BlockSize,
		s.cfg.TargetChunkSize,
	)
}

func (s *stream) Push(
Expand Down

0 comments on commit c904bf9

Please sign in to comment.