Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/dataobj/index/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
"loki.metastore-events": {0, 1, 2},
})

revokedProcessed := make(chan struct{}, 1)
trigger := make(chan struct{})
go func() {
<-trigger
builder.handlePartitionsRevoked(ctx, nil, map[string][]int32{
"loki.metastore-events": {1},
})
revokedProcessed <- struct{}{}
}()

// Trigger the revocation of a partition, but only after we've processed a couple of records.
Expand All @@ -102,6 +104,7 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
}

// Verify that the partition was revoked.
<-revokedProcessed
require.Equal(t, 2, len(builder.partitionStates))
require.Nil(t, builder.partitionStates[1])
}
Expand Down
102 changes: 51 additions & 51 deletions pkg/dataobj/index/calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@ import (
"io"
"runtime"
"sync"
"time"

"github.com/bits-and-blooms/bloom/v3"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/dataobj"
Expand All @@ -22,6 +19,33 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)

type logsIndexCalculation interface {
// Prepare is called before the first batch of logs is processed in order to initialize any state.
Prepare(ctx context.Context, section *dataobj.Section, stats logs.Stats) error
// ProcessBatch is called for each batch of logs records.
// Implementations can assume to have exclusive access to the builder via the calculation context. They must not retain references to it after the call returns.
ProcessBatch(ctx context.Context, context *logsCalculationContext, batch []logs.Record) error
// Flush is called after all logs in a section have been processed.
// Implementations can assume to have exclusive access to the builder via the calculation context. They must not retain references to it after the call returns.
Flush(ctx context.Context, context *logsCalculationContext) error
}

type logsCalculationContext struct {
tenantID string
objectPath string
sectionIdx int64
streamIDLookup map[int64]int64
builder *indexobj.Builder
}

// These steps are applied to all logs and are unique to a section
func getLogsCalculationSteps() []logsIndexCalculation {
return []logsIndexCalculation{
&streamStatisticsCalculation{},
&columnValuesCalculation{},
}
}

// Calculator is used to calculate the indexes for a logs object and write them to the builder.
// It reads data from the logs object in order to build bloom filters and per-section stream metadata.
type Calculator struct {
Expand Down Expand Up @@ -138,14 +162,6 @@ func (c *Calculator) processStreamsSection(ctx context.Context, section *dataobj
// processLogsSection reads information from the logs section in order to build index information in the c.indexobjBuilder.
func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64, streamIDLookup map[int64]int64) error {
logsBuf := make([]logs.Record, 8192)
type logInfo struct {
objectPath string
sectionIdx int64
streamID int64
timestamp time.Time
length int64
}
logsInfo := make([]logInfo, len(logsBuf))

logsSection, err := logs.Open(ctx, section)
if err != nil {
Expand All @@ -160,22 +176,24 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
return fmt.Errorf("failed to read log section stats: %w", err)
}

columnBloomBuilders := make(map[string]*bloom.BloomFilter)
columnIndexes := make(map[string]int64)
for _, column := range stats.Columns {
logsType, _ := logs.ParseColumnType(column.Type)
if logsType != logs.ColumnTypeMetadata {
continue
calculationContext := &logsCalculationContext{
tenantID: tenantID,
objectPath: objectPath,
sectionIdx: sectionIdx,
streamIDLookup: streamIDLookup,
builder: c.indexobjBuilder,
}

calculationSteps := getLogsCalculationSteps()

for _, calculation := range calculationSteps {
if err := calculation.Prepare(ctx, section, stats); err != nil {
return fmt.Errorf("failed to prepare calculation: %w", err)
}
columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
columnIndexes[column.Name] = column.ColumnIndex
}

// Read the whole logs section to extract all the column values.
cnt := 0
// TODO(benclive): Switch to a columnar reader instead of row based
// This is also likely to be more performant, especially if we don't need to read the whole log line.
// Note: the source object would need a new column storing just the length to avoid reading the log line itself.
cnt := 0
rowReader := logs.NewRowReader(logsSection)
for {
n, err := rowReader.Read(ctx, logsBuf)
Expand All @@ -186,43 +204,25 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
break
}

for i, log := range logsBuf[:n] {
cnt++
log.Metadata.Range(func(md labels.Label) {
columnBloomBuilders[md.Name].Add([]byte(md.Value))
})
logsInfo[i].objectPath = objectPath
logsInfo[i].sectionIdx = sectionIdx
logsInfo[i].streamID = log.StreamID
logsInfo[i].timestamp = log.Timestamp
logsInfo[i].length = int64(len(log.Line))
}

// Lock the mutex once per read for perf reasons.
cnt += n
c.builderMtx.Lock()
for _, log := range logsInfo[:n] {
err = c.indexobjBuilder.ObserveLogLine(tenantID, log.objectPath, log.sectionIdx, log.streamID, streamIDLookup[log.streamID], log.timestamp, log.length)
if err != nil {
for _, calculation := range calculationSteps {
if err := calculation.ProcessBatch(ctx, calculationContext, logsBuf[:n]); err != nil {
c.builderMtx.Unlock()
return fmt.Errorf("failed to observe log line: %w", err)
return fmt.Errorf("failed to process batch: %w", err)
}
}
c.builderMtx.Unlock()
}

// Write the indexes (bloom filters) to the new index object.
for columnName, bloom := range columnBloomBuilders {
bloomBytes, err := bloom.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal bloom filter: %w", err)
}
c.builderMtx.Lock()
err = c.indexobjBuilder.AppendColumnIndex(tenantID, objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes)
c.builderMtx.Unlock()
if err != nil {
return fmt.Errorf("failed to append column index: %w", err)
c.builderMtx.Lock()
for _, calculation := range calculationSteps {
if err := calculation.Flush(ctx, calculationContext); err != nil {
c.builderMtx.Unlock()
return fmt.Errorf("failed to flush calculation results: %w", err)
}
}
c.builderMtx.Unlock()

level.Info(sectionLogger).Log("msg", "finished processing logs section", "rowsProcessed", cnt)
return nil
Expand Down
55 changes: 55 additions & 0 deletions pkg/dataobj/index/column_values.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package index

import (
"context"
"fmt"

"github.com/bits-and-blooms/bloom/v3"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

type columnValuesCalculation struct {
columnBloomBuilders map[string]*bloom.BloomFilter
columnIndexes map[string]int64
}

func (c *columnValuesCalculation) Prepare(_ context.Context, _ *dataobj.Section, stats logs.Stats) error {
c.columnBloomBuilders = make(map[string]*bloom.BloomFilter)
c.columnIndexes = make(map[string]int64)

for _, column := range stats.Columns {
logsType, _ := logs.ParseColumnType(column.Type)
if logsType != logs.ColumnTypeMetadata {
continue
}
c.columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
c.columnIndexes[column.Name] = column.ColumnIndex
}
return nil
}

func (c *columnValuesCalculation) ProcessBatch(_ context.Context, _ *logsCalculationContext, batch []logs.Record) error {
for _, log := range batch {
log.Metadata.Range(func(md labels.Label) {
c.columnBloomBuilders[md.Name].Add([]byte(md.Value))
})
}
return nil
}

func (c *columnValuesCalculation) Flush(_ context.Context, context *logsCalculationContext) error {
for columnName, bloom := range c.columnBloomBuilders {
bloomBytes, err := bloom.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal bloom filter: %w", err)
}
err = context.builder.AppendColumnIndex(context.tenantID, context.objectPath, context.sectionIdx, columnName, c.columnIndexes[columnName], bloomBytes)
if err != nil {
return fmt.Errorf("failed to append column index: %w", err)
}
}
return nil
}
29 changes: 29 additions & 0 deletions pkg/dataobj/index/stream_statistics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package index

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

type streamStatisticsCalculation struct{}

func (c *streamStatisticsCalculation) Prepare(_ context.Context, _ *dataobj.Section, _ logs.Stats) error {
return nil
}

func (c *streamStatisticsCalculation) ProcessBatch(_ context.Context, context *logsCalculationContext, batch []logs.Record) error {
for _, log := range batch {
err := context.builder.ObserveLogLine(context.tenantID, context.objectPath, context.sectionIdx, log.StreamID, context.streamIDLookup[log.StreamID], log.Timestamp, int64(len(log.Line)))
if err != nil {
return fmt.Errorf("failed to observe log line: %w", err)
}
}
return nil
}

func (c *streamStatisticsCalculation) Flush(_ context.Context, _ *logsCalculationContext) error {
return nil
}
Loading