diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go index f75936015ad1b..c2ce8509361f0 100644 --- a/pkg/dataobj/index/builder_test.go +++ b/pkg/dataobj/index/builder_test.go @@ -77,12 +77,14 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { "loki.metastore-events": {0, 1, 2}, }) + revokedProcessed := make(chan struct{}, 1) trigger := make(chan struct{}) go func() { <-trigger builder.handlePartitionsRevoked(ctx, nil, map[string][]int32{ "loki.metastore-events": {1}, }) + revokedProcessed <- struct{}{} }() // Trigger the revocation of a partition, but only after we've processed a couple of records. @@ -102,6 +104,7 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { } // Verify that the partition was revoked. + <-revokedProcessed require.Equal(t, 2, len(builder.partitionStates)) require.Nil(t, builder.partitionStates[1]) } diff --git a/pkg/dataobj/index/calculate.go b/pkg/dataobj/index/calculate.go index 89809e341e95d..97a1733b3c5b4 100644 --- a/pkg/dataobj/index/calculate.go +++ b/pkg/dataobj/index/calculate.go @@ -7,12 +7,9 @@ import ( "io" "runtime" "sync" - "time" - "github.com/bits-and-blooms/bloom/v3" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/prometheus/model/labels" "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/dataobj" @@ -22,6 +19,33 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" ) +type logsIndexCalculation interface { + // Prepare is called before the first batch of logs is processed in order to initialize any state. + Prepare(ctx context.Context, section *dataobj.Section, stats logs.Stats) error + // ProcessBatch is called for each batch of logs records. + // Implementations can assume to have exclusive access to the builder via the calculation context. They must not retain references to it after the call returns. + ProcessBatch(ctx context.Context, context *logsCalculationContext, batch []logs.Record) error + // Flush is called after all logs in a section have been processed. + // Implementations can assume to have exclusive access to the builder via the calculation context. They must not retain references to it after the call returns. + Flush(ctx context.Context, context *logsCalculationContext) error +} + +type logsCalculationContext struct { + tenantID string + objectPath string + sectionIdx int64 + streamIDLookup map[int64]int64 + builder *indexobj.Builder +} + +// These steps are applied to all logs and are unique to a section +func getLogsCalculationSteps() []logsIndexCalculation { + return []logsIndexCalculation{ + &streamStatisticsCalculation{}, + &columnValuesCalculation{}, + } +} + // Calculator is used to calculate the indexes for a logs object and write them to the builder. // It reads data from the logs object in order to build bloom filters and per-section stream metadata. type Calculator struct { @@ -138,14 +162,6 @@ func (c *Calculator) processStreamsSection(ctx context.Context, section *dataobj // processLogsSection reads information from the logs section in order to build index information in the c.indexobjBuilder. func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64, streamIDLookup map[int64]int64) error { logsBuf := make([]logs.Record, 8192) - type logInfo struct { - objectPath string - sectionIdx int64 - streamID int64 - timestamp time.Time - length int64 - } - logsInfo := make([]logInfo, len(logsBuf)) logsSection, err := logs.Open(ctx, section) if err != nil { @@ -160,22 +176,24 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L return fmt.Errorf("failed to read log section stats: %w", err) } - columnBloomBuilders := make(map[string]*bloom.BloomFilter) - columnIndexes := make(map[string]int64) - for _, column := range stats.Columns { - logsType, _ := logs.ParseColumnType(column.Type) - if logsType != logs.ColumnTypeMetadata { - continue + calculationContext := &logsCalculationContext{ + tenantID: tenantID, + objectPath: objectPath, + sectionIdx: sectionIdx, + streamIDLookup: streamIDLookup, + builder: c.indexobjBuilder, + } + + calculationSteps := getLogsCalculationSteps() + + for _, calculation := range calculationSteps { + if err := calculation.Prepare(ctx, section, stats); err != nil { + return fmt.Errorf("failed to prepare calculation: %w", err) } - columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0) - columnIndexes[column.Name] = column.ColumnIndex } - // Read the whole logs section to extract all the column values. - cnt := 0 // TODO(benclive): Switch to a columnar reader instead of row based - // This is also likely to be more performant, especially if we don't need to read the whole log line. - // Note: the source object would need a new column storing just the length to avoid reading the log line itself. + cnt := 0 rowReader := logs.NewRowReader(logsSection) for { n, err := rowReader.Read(ctx, logsBuf) @@ -186,43 +204,25 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L break } - for i, log := range logsBuf[:n] { - cnt++ - log.Metadata.Range(func(md labels.Label) { - columnBloomBuilders[md.Name].Add([]byte(md.Value)) - }) - logsInfo[i].objectPath = objectPath - logsInfo[i].sectionIdx = sectionIdx - logsInfo[i].streamID = log.StreamID - logsInfo[i].timestamp = log.Timestamp - logsInfo[i].length = int64(len(log.Line)) - } - - // Lock the mutex once per read for perf reasons. + cnt += n c.builderMtx.Lock() - for _, log := range logsInfo[:n] { - err = c.indexobjBuilder.ObserveLogLine(tenantID, log.objectPath, log.sectionIdx, log.streamID, streamIDLookup[log.streamID], log.timestamp, log.length) - if err != nil { + for _, calculation := range calculationSteps { + if err := calculation.ProcessBatch(ctx, calculationContext, logsBuf[:n]); err != nil { c.builderMtx.Unlock() - return fmt.Errorf("failed to observe log line: %w", err) + return fmt.Errorf("failed to process batch: %w", err) } } c.builderMtx.Unlock() } - // Write the indexes (bloom filters) to the new index object. - for columnName, bloom := range columnBloomBuilders { - bloomBytes, err := bloom.MarshalBinary() - if err != nil { - return fmt.Errorf("failed to marshal bloom filter: %w", err) - } - c.builderMtx.Lock() - err = c.indexobjBuilder.AppendColumnIndex(tenantID, objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes) - c.builderMtx.Unlock() - if err != nil { - return fmt.Errorf("failed to append column index: %w", err) + c.builderMtx.Lock() + for _, calculation := range calculationSteps { + if err := calculation.Flush(ctx, calculationContext); err != nil { + c.builderMtx.Unlock() + return fmt.Errorf("failed to flush calculation results: %w", err) } } + c.builderMtx.Unlock() level.Info(sectionLogger).Log("msg", "finished processing logs section", "rowsProcessed", cnt) return nil diff --git a/pkg/dataobj/index/column_values.go b/pkg/dataobj/index/column_values.go new file mode 100644 index 0000000000000..e3fda403e6ec5 --- /dev/null +++ b/pkg/dataobj/index/column_values.go @@ -0,0 +1,55 @@ +package index + +import ( + "context" + "fmt" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" +) + +type columnValuesCalculation struct { + columnBloomBuilders map[string]*bloom.BloomFilter + columnIndexes map[string]int64 +} + +func (c *columnValuesCalculation) Prepare(_ context.Context, _ *dataobj.Section, stats logs.Stats) error { + c.columnBloomBuilders = make(map[string]*bloom.BloomFilter) + c.columnIndexes = make(map[string]int64) + + for _, column := range stats.Columns { + logsType, _ := logs.ParseColumnType(column.Type) + if logsType != logs.ColumnTypeMetadata { + continue + } + c.columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0) + c.columnIndexes[column.Name] = column.ColumnIndex + } + return nil +} + +func (c *columnValuesCalculation) ProcessBatch(_ context.Context, _ *logsCalculationContext, batch []logs.Record) error { + for _, log := range batch { + log.Metadata.Range(func(md labels.Label) { + c.columnBloomBuilders[md.Name].Add([]byte(md.Value)) + }) + } + return nil +} + +func (c *columnValuesCalculation) Flush(_ context.Context, context *logsCalculationContext) error { + for columnName, bloom := range c.columnBloomBuilders { + bloomBytes, err := bloom.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal bloom filter: %w", err) + } + err = context.builder.AppendColumnIndex(context.tenantID, context.objectPath, context.sectionIdx, columnName, c.columnIndexes[columnName], bloomBytes) + if err != nil { + return fmt.Errorf("failed to append column index: %w", err) + } + } + return nil +} diff --git a/pkg/dataobj/index/stream_statistics.go b/pkg/dataobj/index/stream_statistics.go new file mode 100644 index 0000000000000..346e673b424cd --- /dev/null +++ b/pkg/dataobj/index/stream_statistics.go @@ -0,0 +1,29 @@ +package index + +import ( + "context" + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" +) + +type streamStatisticsCalculation struct{} + +func (c *streamStatisticsCalculation) Prepare(_ context.Context, _ *dataobj.Section, _ logs.Stats) error { + return nil +} + +func (c *streamStatisticsCalculation) ProcessBatch(_ context.Context, context *logsCalculationContext, batch []logs.Record) error { + for _, log := range batch { + err := context.builder.ObserveLogLine(context.tenantID, context.objectPath, context.sectionIdx, log.StreamID, context.streamIDLookup[log.StreamID], log.Timestamp, int64(len(log.Line))) + if err != nil { + return fmt.Errorf("failed to observe log line: %w", err) + } + } + return nil +} + +func (c *streamStatisticsCalculation) Flush(_ context.Context, _ *logsCalculationContext) error { + return nil +}