From 48562af78b92ea43bdd29a378714e59f259f320a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 26 Nov 2025 21:08:55 +0100 Subject: [PATCH 1/2] chore(dataobj): Use common base config for indexobj and logsobj builders Use the same config struct for both the indexobj builder and the logsobj builder, since they are configured identically, only with different default values. --- docs/sources/shared/configuration.md | 24 +++-- pkg/dataobj/config/config.go | 3 + pkg/dataobj/consumer/logsobj/builder.go | 51 ++++++++--- pkg/dataobj/consumer/logsobj/builder_test.go | 15 ++-- .../consumer/partition_processor_test.go | 14 +-- pkg/dataobj/index/builder.go | 2 +- pkg/dataobj/index/builder_test.go | 25 +++--- pkg/dataobj/index/calculate_test.go | 14 +-- pkg/dataobj/index/config.go | 30 +++++-- pkg/dataobj/index/indexobj/builder.go | 89 +------------------ pkg/dataobj/index/indexobj/builder_metrics.go | 3 +- pkg/dataobj/index/indexobj/builder_test.go | 3 +- pkg/dataobj/metastore/object_test.go | 16 ++-- pkg/dataobj/metastore/toc_writer.go | 3 +- pkg/dataobj/metastore/toc_writer_test.go | 5 +- .../internal/executor/dataobjscan_test.go | 14 +-- pkg/engine/internal/util/objtest/objtest.go | 4 +- pkg/logql/bench/store_dataobj.go | 17 ++-- tools/dataobj-sort/main.go | 14 +-- 19 files changed, 164 insertions(+), 182 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 925bc72133453..798b01baef905 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1431,29 +1431,35 @@ dataobj: [topic: | default = ""] index: - # The size of the target page to use for the index object builder. + # The target maximum amount of uncompressed data to hold in data pages (for + # columnar sections). Uncompressed size is used for consistent I/O and + # planning. 
# CLI flag: -dataobj-index-builder.target-page-size [target_page_size: | default = 128KiB] - # The maximum row count for pages to use for the index builder. A value of 0 - # means no limit. + # The maximum row count for pages to use for the data object builder. A + # value of 0 means no limit. # CLI flag: -dataobj-index-builder.max-page-rows [max_page_rows: | default = 0] - # The size of the target object to use for the index object builder. - # CLI flag: -dataobj-index-builder.target-object-size + # The target maximum size of the encoded object and all of its encoded + # sections (after compression), to limit memory usage of a builder. + # CLI flag: -dataobj-index-builder.target-builder-memory-limit [target_object_size: | default = 64MiB] - # Configures a maximum size for sections, for sections that support it. + # The target maximum amount of uncompressed data to hold in sections, for + # sections that support being limited by size. Uncompressed size is used for + # consistent I/O and planning. # CLI flag: -dataobj-index-builder.target-section-size [target_section_size: | default = 16MiB] - # The size of the buffer to use for sorting logs. + # The size of logs to buffer in memory before adding into columnar builders, + # used to reduce CPU load of sorting. # CLI flag: -dataobj-index-builder.buffer-size [buffer_size: | default = 2MiB] - # The maximum number of stripes to merge into a section at once. Must be - # greater than 1. + # The maximum number of log section stripes to merge into a section at once. + # Must be greater than 1. 
# CLI flag: -dataobj-index-builder.section-stripe-merge-limit [section_stripe_merge_limit: | default = 2] diff --git a/pkg/dataobj/config/config.go b/pkg/dataobj/config/config.go index ea89a9e86f047..a4231bc4e1083 100644 --- a/pkg/dataobj/config/config.go +++ b/pkg/dataobj/config/config.go @@ -43,6 +43,9 @@ func (cfg *Config) Validate() error { if err := cfg.Consumer.Validate(); err != nil { return err } + if err := cfg.Index.Validate(); err != nil { + return err + } if err := cfg.Metastore.Validate(); err != nil { return err } diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index 2ffda9b2be0ab..2cb0de3b8af8a 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -38,8 +38,8 @@ const ( sortTimestampDESC = "timestamp-desc" ) -// BuilderConfig configures a [Builder]. -type BuilderConfig struct { +// BuilderBaseConfig configures a data object builder. +type BuilderBaseConfig struct { // TargetPageSize configures a target size for encoded pages within the data // object. TargetPageSize accounts for encoding, but not for compression. TargetPageSize flagext.Bytes `yaml:"target_page_size"` @@ -74,30 +74,20 @@ type BuilderConfig struct { // values of MergeSize trade off lower memory overhead for higher time spent // merging. SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` - - // DataobjSortOrder defines the order in which the rows of the logs sections are sorted. - // They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC]. - DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"` } // RegisterFlagsWithPrefix registers flags with the given prefix. 
-func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - _ = cfg.TargetPageSize.Set("2MB") - _ = cfg.TargetObjectSize.Set("1GB") - _ = cfg.BufferSize.Set("16MB") - _ = cfg.TargetSectionSize.Set("128MB") - +func (cfg *BuilderBaseConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The target maximum amount of uncompressed data to hold in data pages (for columnar sections). Uncompressed size is used for consistent I/O and planning.") f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the data object builder. A value of 0 means no limit.") f.Var(&cfg.TargetObjectSize, prefix+"target-builder-memory-limit", "The target maximum size of the encoded object and all of its encoded sections (after compression), to limit memory usage of a builder.") f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.") f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.") - f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).") } // Validate validates the BuilderConfig. 
-func (cfg *BuilderConfig) Validate() error { +func (cfg *BuilderBaseConfig) Validate() error { var errs []error if cfg.TargetPageSize <= 0 { @@ -122,9 +112,42 @@ func (cfg *BuilderConfig) Validate() error { errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1")) } + return errors.Join(errs...) +} + +// BuilderConfig configures a [Builder]. +type BuilderConfig struct { + BuilderBaseConfig `yaml:",inline"` + + // DataobjSortOrder defines the order in which the rows of the logs sections are sorted. + // They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC]. + DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"` +} + +// RegisterFlagsWithPrefix registers flags with the given prefix. +func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + // Set defaults for base builder configuration + _ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB") + _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB") + _ = cfg.BuilderBaseConfig.BufferSize.Set("16MB") + _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB") + cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) + + f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).") +} + +// Validate validates the BuilderConfig. +func (cfg *BuilderConfig) Validate() error { + var errs []error + + if err := cfg.BuilderBaseConfig.Validate(); err != nil { + errs = append(errs, err) + } + if cfg.DataobjSortOrder == "" { cfg.DataobjSortOrder = sortStreamASC // default to [streamID ASC, timestamp DESC] sorting } + if cfg.DataobjSortOrder != sortStreamASC && cfg.DataobjSortOrder != sortTimestampDESC { errs = append(errs, fmt.Errorf("invalid dataobj sort order. 
must be one of `stream-asc` or `timestamp-desc`, got: %s", cfg.DataobjSortOrder)) } diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index a76ac1e1285cf..3866376fc3ba8 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -21,13 +21,14 @@ import ( ) var testBuilderConfig = BuilderConfig{ - TargetPageSize: 2048, - TargetObjectSize: 1 << 20, // 1 MiB - TargetSectionSize: 8 << 10, // 8 KiB - - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, - DataobjSortOrder: sortTimestampDESC, + BuilderBaseConfig: BuilderBaseConfig{ + TargetPageSize: 2048, + TargetObjectSize: 1 << 20, // 1 MiB + TargetSectionSize: 8 << 10, // 8 KiB + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, + DataobjSortOrder: sortTimestampDESC, } func TestBuilder(t *testing.T) { diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/partition_processor_test.go index cc0eaf5668e45..d80f7eb8615a8 100644 --- a/pkg/dataobj/consumer/partition_processor_test.go +++ b/pkg/dataobj/consumer/partition_processor_test.go @@ -21,12 +21,14 @@ import ( ) var testBuilderConfig = logsobj.BuilderConfig{ - TargetPageSize: 2048, - MaxPageRows: 10, - TargetObjectSize: 1 << 22, // 4 MiB - TargetSectionSize: 1 << 22, // 4 MiB - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2048, + MaxPageRows: 10, + TargetObjectSize: 1 << 22, // 4 MiB + TargetSectionSize: 1 << 22, // 4 MiB + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, } func TestPartitionProcessor_Flush(t *testing.T) { diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go index b81bca354cb0e..2cef796de7a95 100644 --- a/pkg/dataobj/index/builder.go +++ b/pkg/dataobj/index/builder.go @@ -135,7 +135,7 @@ func NewIndexBuilder( } // Create index building dependencies - builder, err := indexobj.NewBuilder(cfg.BuilderConfig, 
scratchStore) + builder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore) if err != nil { return nil, fmt.Errorf("failed to create index builder: %w", err) } diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go index 21dddc40b6746..93d91e3e68658 100644 --- a/pkg/dataobj/index/builder_test.go +++ b/pkg/dataobj/index/builder_test.go @@ -17,7 +17,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" - "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/kafka" @@ -25,7 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" ) -var testBuilderConfig = indexobj.BuilderConfig{ +var testBuilderConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 128 * 1024, TargetObjectSize: 4 * 1024 * 1024, TargetSectionSize: 2 * 1024 * 1024, @@ -53,8 +52,8 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { // Create a builder with mocks for dependencies builder, err := NewIndexBuilder( Config{ - BuilderConfig: testBuilderConfig, - EventsPerIndex: 1, + BuilderBaseConfig: testBuilderConfig, + EventsPerIndex: 1, }, metastore.Config{}, kafka.Config{}, @@ -121,8 +120,8 @@ func TestIndexBuilder(t *testing.T) { p, err := NewIndexBuilder( Config{ - BuilderConfig: testBuilderConfig, - EventsPerIndex: 3, + BuilderBaseConfig: testBuilderConfig, + EventsPerIndex: 3, }, metastore.Config{}, kafka.Config{}, @@ -231,13 +230,13 @@ func (m *mockKafkaClient) Close() {} func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) { candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 128 * 1024, - TargetObjectSize: 4 * 1024 * 1024, - TargetSectionSize: 2 * 1024 * 1024, - - BufferSize: 4 * 1024 * 1024, - SectionStripeMergeLimit: 2, - + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 128 * 
1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + BufferSize: 4 * 1024 * 1024, + SectionStripeMergeLimit: 2, + }, DataobjSortOrder: "stream-asc", }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/index/calculate_test.go b/pkg/dataobj/index/calculate_test.go index 6f99d4a16fd15..108cc0397e26b 100644 --- a/pkg/dataobj/index/calculate_test.go +++ b/pkg/dataobj/index/calculate_test.go @@ -24,7 +24,7 @@ import ( "github.com/grafana/loki/pkg/push" ) -var testCalculatorConfig = indexobj.BuilderConfig{ +var testCalculatorConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 2048, TargetObjectSize: 1 << 22, // 4 MiB BufferSize: 2048 * 8, @@ -39,11 +39,13 @@ func createTestLogObject(t *testing.T, tenants int) *dataobj.Object { t.Helper() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 2048, - TargetObjectSize: 1 << 22, - TargetSectionSize: 1 << 21, - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2048, + TargetObjectSize: 1 << 22, + TargetSectionSize: 1 << 21, + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/index/config.go b/pkg/dataobj/index/config.go index abf7a00f189ae..592a3959ddf68 100644 --- a/pkg/dataobj/index/config.go +++ b/pkg/dataobj/index/config.go @@ -1,17 +1,18 @@ package index import ( + "errors" "flag" "time" - "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" ) type Config struct { - indexobj.BuilderConfig `yaml:",inline"` - EventsPerIndex int `yaml:"events_per_index" experimental:"true"` - FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"` - MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"` + logsobj.BuilderBaseConfig `yaml:",inline"` + EventsPerIndex int `yaml:"events_per_index" experimental:"true"` + FlushInterval time.Duration `yaml:"flush_interval" 
experimental:"true"` + MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -19,8 +20,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) + // Set defaults for base builder configuration + _ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB") + _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB") + _ = cfg.BuilderBaseConfig.BufferSize.Set("2MB") + _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB") + cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) + f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "Experimental: How often to check for stale partitions to flush") f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Experimental: Maximum time to wait before flushing buffered events") } + +// Validate validates the Config. +func (cfg *Config) Validate() error { + var errs []error + + if err := cfg.BuilderBaseConfig.Validate(); err != nil { + errs = append(errs, err) + } + + return errors.Join(errs...) 
+} diff --git a/pkg/dataobj/index/indexobj/builder.go b/pkg/dataobj/index/indexobj/builder.go index 09daefbb6cc33..892cf772ef504 100644 --- a/pkg/dataobj/index/indexobj/builder.go +++ b/pkg/dataobj/index/indexobj/builder.go @@ -4,17 +4,16 @@ package indexobj import ( "context" "errors" - "flag" "fmt" "io" "time" - "github.com/grafana/dskit/flagext" lru "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" @@ -29,88 +28,6 @@ var ( ErrBuilderEmpty = errors.New("builder empty") ) -// BuilderConfig configures a [Builder]. -type BuilderConfig struct { - // TargetPageSize configures a target size for encoded pages within the data - // object. TargetPageSize accounts for encoding, but not for compression. - TargetPageSize flagext.Bytes `yaml:"target_page_size"` - - // MaxPageRows configures a maximum row count for encoded pages within the data - // object. If set to 0 or negative number, the page size will not be limited by a - // row count. - MaxPageRows int `yaml:"max_page_rows"` - - // TODO(rfratto): We need an additional parameter for TargetMetadataSize, as - // metadata payloads can't be split and must be downloaded in a single - // request. - // - // At the moment, we don't have a good mechanism for implementing a metadata - // size limit (we need to support some form of section splitting or column - // combinations), so the option is omitted for now. - - // TargetObjectSize configures a target size for data objects. - TargetObjectSize flagext.Bytes `yaml:"target_object_size"` - - // TargetSectionSize configures the maximum size of data in a section. 
Sections - // which support this parameter will place overflow data into new sections of - // the same type. - TargetSectionSize flagext.Bytes `yaml:"target_section_size"` - - // BufferSize configures the size of the buffer used to accumulate - // uncompressed logs in memory prior to sorting. - BufferSize flagext.Bytes `yaml:"buffer_size"` - - // SectionStripeMergeLimit configures the number of stripes to merge at once when - // flushing stripes into a section. MergeSize must be larger than 1. Lower - // values of MergeSize trade off lower memory overhead for higher time spent - // merging. - SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` -} - -// RegisterFlagsWithPrefix registers flags with the given prefix. -func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - _ = cfg.TargetPageSize.Set("128KB") - _ = cfg.TargetObjectSize.Set("64MB") - _ = cfg.BufferSize.Set("2MB") - _ = cfg.TargetSectionSize.Set("16MB") - - f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the index object builder.") - f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the index builder. A value of 0 means no limit.") - f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the index object builder.") - f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") - f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") - f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of stripes to merge into a section at once. Must be greater than 1.") -} - -// Validate validates the BuilderConfig. 
-func (cfg *BuilderConfig) Validate() error { - var errs []error - - if cfg.TargetPageSize <= 0 { - errs = append(errs, errors.New("TargetPageSize must be greater than 0")) - } else if cfg.TargetPageSize >= cfg.TargetObjectSize { - errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize")) - } - - if cfg.TargetObjectSize <= 0 { - errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) - } - - if cfg.BufferSize <= 0 { - errs = append(errs, errors.New("BufferSize must be greater than 0")) - } - - if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { - errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) - } - - if cfg.SectionStripeMergeLimit < 2 { - errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1")) - } - - return errors.Join(errs...) -} - // A Builder constructs a logs-oriented data object from a set of incoming // log data. Log data is appended by calling [LogBuilder.Append]. A complete // data object is constructed by by calling [LogBuilder.Flush]. @@ -118,7 +35,7 @@ func (cfg *BuilderConfig) Validate() error { // Methods on Builder are not goroutine-safe; callers are responsible for // synchronization. type Builder struct { - cfg BuilderConfig + cfg logsobj.BuilderBaseConfig metrics *builderMetrics labelCache *lru.Cache[string, labels.Labels] @@ -147,7 +64,7 @@ const ( // NewBuilder creates a new [Builder] which stores log-oriented data objects. // // NewBuilder returns an error if the provided config is invalid. 
-func NewBuilder(cfg BuilderConfig, scratchStore scratch.Store) (*Builder, error) { +func NewBuilder(cfg logsobj.BuilderBaseConfig, scratchStore scratch.Store) (*Builder, error) { if err := cfg.Validate(); err != nil { return nil, err } diff --git a/pkg/dataobj/index/indexobj/builder_metrics.go b/pkg/dataobj/index/indexobj/builder_metrics.go index 349363d99d3d3..b875d3979d220 100644 --- a/pkg/dataobj/index/indexobj/builder_metrics.go +++ b/pkg/dataobj/index/indexobj/builder_metrics.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" @@ -116,7 +117,7 @@ func newBuilderMetrics() *builderMetrics { } // ObserveConfig updates config metrics based on the provided [BuilderConfig]. -func (m *builderMetrics) ObserveConfig(cfg BuilderConfig) { +func (m *builderMetrics) ObserveConfig(cfg logsobj.BuilderBaseConfig) { m.targetPageSize.Set(float64(cfg.TargetPageSize)) m.targetObjectSize.Set(float64(cfg.TargetObjectSize)) } diff --git a/pkg/dataobj/index/indexobj/builder_test.go b/pkg/dataobj/index/indexobj/builder_test.go index 8271171e2d7f4..737aeb384a9bb 100644 --- a/pkg/dataobj/index/indexobj/builder_test.go +++ b/pkg/dataobj/index/indexobj/builder_test.go @@ -9,13 +9,14 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" ) -var testBuilderConfig = BuilderConfig{ +var testBuilderConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 2048, 
TargetObjectSize: 1 << 22, // 4 MiB TargetSectionSize: 1 << 21, // 2 MiB diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index 3413fd710964d..fada6796c3f74 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -216,7 +216,7 @@ func TestValuesEmptyMatcher(t *testing.T) { func TestSectionsForStreamMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tenantID) - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 1024 * 1024, TargetObjectSize: 10 * 1024 * 1024, TargetSectionSize: 128, @@ -363,7 +363,7 @@ func TestSectionsForStreamMatchers(t *testing.T) { func TestSectionsForPredicateMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tenantID) - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 1024 * 1024, TargetObjectSize: 10 * 1024 * 1024, TargetSectionSize: 128, @@ -503,11 +503,13 @@ func newTestDataBuilder(t *testing.T) *testDataBuilder { bucket := objstore.NewInMemBucket() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 1024 * 1024, // 1MB - TargetObjectSize: 10 * 1024 * 1024, // 10MB - TargetSectionSize: 1024 * 1024, // 1MB - BufferSize: 1024 * 1024, // 1MB - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 1024 * 1024, // 1MB + TargetObjectSize: 10 * 1024 * 1024, // 10MB + TargetSectionSize: 1024 * 1024, // 1MB + BufferSize: 1024 * 1024, // 1MB + SectionStripeMergeLimit: 2, + }, }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/metastore/toc_writer.go b/pkg/dataobj/metastore/toc_writer.go index 34aabf1f1119e..20024b2c628a0 100644 --- a/pkg/dataobj/metastore/toc_writer.go +++ b/pkg/dataobj/metastore/toc_writer.go @@ -16,13 +16,14 @@ import ( "github.com/thanos-io/objstore" 
"github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" ) // Define our own builder config for the Table Of Contents object because they are smaller than logs objects. -var tocBuilderCfg = indexobj.BuilderConfig{ +var tocBuilderCfg = logsobj.BuilderBaseConfig{ TargetObjectSize: 32 * 1024 * 1024, TargetPageSize: 4 * 1024 * 1024, BufferSize: 32 * 1024 * 1024, // 8x page size diff --git a/pkg/dataobj/metastore/toc_writer_test.go b/pkg/dataobj/metastore/toc_writer_test.go index df658527e85d9..4da4dfb0e1f6b 100644 --- a/pkg/dataobj/metastore/toc_writer_test.go +++ b/pkg/dataobj/metastore/toc_writer_test.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" ) @@ -21,7 +22,7 @@ import ( func TestTableOfContentsWriter(t *testing.T) { t.Run("append new top-level object to new metastore", func(t *testing.T) { tenantID := "test" - tocBuilder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + tocBuilder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: tocBuilderCfg.TargetPageSize, TargetObjectSize: tocBuilderCfg.TargetObjectSize, TargetSectionSize: tocBuilderCfg.TargetSectionSize, @@ -53,7 +54,7 @@ func TestTableOfContentsWriter(t *testing.T) { t.Run("append default to new top-level metastore v1", func(t *testing.T) { tenantID := "test" - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: tocBuilderCfg.TargetPageSize, TargetObjectSize: tocBuilderCfg.TargetObjectSize, TargetSectionSize: 
tocBuilderCfg.TargetSectionSize, diff --git a/pkg/engine/internal/executor/dataobjscan_test.go b/pkg/engine/internal/executor/dataobjscan_test.go index d78572c72ba34..8192fd1aa27e8 100644 --- a/pkg/engine/internal/executor/dataobjscan_test.go +++ b/pkg/engine/internal/executor/dataobjscan_test.go @@ -333,12 +333,14 @@ func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object { t.Helper() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 8_000, - TargetObjectSize: math.MaxInt, - TargetSectionSize: 32_000, - BufferSize: 8_000, - SectionStripeMergeLimit: 2, - DataobjSortOrder: "timestamp-desc", + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 8_000, + TargetObjectSize: math.MaxInt, + TargetSectionSize: 32_000, + BufferSize: 8_000, + SectionStripeMergeLimit: 2, + }, + DataobjSortOrder: "timestamp-desc", }, nil) require.NoError(t, err) diff --git a/pkg/engine/internal/util/objtest/objtest.go b/pkg/engine/internal/util/objtest/objtest.go index 51d30f4acc439..9dd140a7ede49 100644 --- a/pkg/engine/internal/util/objtest/objtest.go +++ b/pkg/engine/internal/util/objtest/objtest.go @@ -134,9 +134,9 @@ func (b *Builder) Close() { } func (b *Builder) buildIndex(ctx context.Context) error { - var builderConfig indexobj.BuilderConfig + var builderConfig logsobj.BuilderConfig builderConfig.RegisterFlagsWithPrefix("", flag.NewFlagSet("", flag.PanicOnError)) // Acquire defaults - indexBuilder, err := indexobj.NewBuilder(builderConfig, nil) + indexBuilder, err := indexobj.NewBuilder(builderConfig.BuilderBaseConfig, nil) if err != nil { return fmt.Errorf("creating logs builder: %w", err) } diff --git a/pkg/logql/bench/store_dataobj.go b/pkg/logql/bench/store_dataobj.go index 00cca8f7a84bd..ad5d168ff96ec 100644 --- a/pkg/logql/bench/store_dataobj.go +++ b/pkg/logql/bench/store_dataobj.go @@ -74,13 +74,14 @@ func NewDataObjStore(dir, tenant string) (*DataObjStore, error) { } builder, err := 
logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 2 * 1024 * 1024, // 2MB - MaxPageRows: 1000, - TargetObjectSize: 128 * 1024 * 1024, // 128MB - TargetSectionSize: 16 * 1024 * 1024, // 16MB - BufferSize: 16 * 1024 * 1024, // 16MB - - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2 * 1024 * 1024, // 2MB + MaxPageRows: 1000, + TargetObjectSize: 128 * 1024 * 1024, // 128MB + TargetSectionSize: 16 * 1024 * 1024, // 16MB + BufferSize: 16 * 1024 * 1024, // 16MB + SectionStripeMergeLimit: 2, + }, }, nil) if err != nil { return nil, fmt.Errorf("failed to create builder: %w", err) @@ -204,7 +205,7 @@ func (s *DataObjStore) buildIndex() error { return nil } - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 128 * 1024, // 128KB TargetObjectSize: 128 * 1024 * 1024, // 128MB TargetSectionSize: 16 * 1024 * 1024, // 16MB diff --git a/tools/dataobj-sort/main.go b/tools/dataobj-sort/main.go index b16da0078fef1..8bfa41d01bff4 100644 --- a/tools/dataobj-sort/main.go +++ b/tools/dataobj-sort/main.go @@ -40,12 +40,14 @@ func main() { } cfg := logsobj.BuilderConfig{ - TargetPageSize: 64 << 10, - MaxPageRows: 1000, - TargetObjectSize: 512 << 20, - TargetSectionSize: 512 << 20, - BufferSize: 16 << 20, - SectionStripeMergeLimit: 8, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 64 << 10, + MaxPageRows: 1000, + TargetObjectSize: 512 << 20, + TargetSectionSize: 512 << 20, + BufferSize: 16 << 20, + SectionStripeMergeLimit: 8, + }, } scr, err := scratch.NewFilesystem(gokitlog.NewNopLogger(), os.TempDir()) if err != nil { From 92b6fe0d2c90e4efa2fff98ddc3884e6ef0a1027 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 28 Nov 2025 08:34:19 +0100 Subject: [PATCH 2/2] fixup: Comments from code review --- docs/sources/shared/configuration.md | 6 +++--- pkg/dataobj/consumer/logsobj/builder.go | 2 +- 2 files changed, 4 
insertions(+), 4 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 798b01baef905..57e72dcd5c227 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1211,7 +1211,7 @@ dataobj: # CLI flag: -dataobj-consumer.buffer-size [buffer_size: | default = 16MiB] - # The maximum number of log section stripes to merge into a section at + # The maximum number of dataobj section stripes to merge into a section at # once. Must be greater than 1. # CLI flag: -dataobj-consumer.section-stripe-merge-limit [section_stripe_merge_limit: | default = 2] @@ -1458,8 +1458,8 @@ dataobj: # CLI flag: -dataobj-index-builder.buffer-size [buffer_size: | default = 2MiB] - # The maximum number of log section stripes to merge into a section at once. - # Must be greater than 1. + # The maximum number of dataobj section stripes to merge into a section at + # once. Must be greater than 1. # CLI flag: -dataobj-index-builder.section-stripe-merge-limit [section_stripe_merge_limit: | default = 2] diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index 2cb0de3b8af8a..6bf3d38c21982 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -83,7 +83,7 @@ func (cfg *BuilderBaseConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Fla f.Var(&cfg.TargetObjectSize, prefix+"target-builder-memory-limit", "The target maximum size of the encoded object and all of its encoded sections (after compression), to limit memory usage of a builder.") f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. 
Uncompressed size is used for consistent I/O and planning.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.") - f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.") + f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of dataobj section stripes to merge into a section at once. Must be greater than 1.") } // Validate validates the BuilderConfig.