diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 925bc72133453..57e72dcd5c227 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1211,7 +1211,7 @@ dataobj: # CLI flag: -dataobj-consumer.buffer-size [buffer_size: | default = 16MiB] - # The maximum number of log section stripes to merge into a section at + # The maximum number of dataobj section stripes to merge into a section at # once. Must be greater than 1. # CLI flag: -dataobj-consumer.section-stripe-merge-limit [section_stripe_merge_limit: | default = 2] @@ -1431,29 +1431,35 @@ dataobj: [topic: | default = ""] index: - # The size of the target page to use for the index object builder. + # The target maximum amount of uncompressed data to hold in data pages (for + # columnar sections). Uncompressed size is used for consistent I/O and + # planning. # CLI flag: -dataobj-index-builder.target-page-size [target_page_size: | default = 128KiB] - # The maximum row count for pages to use for the index builder. A value of 0 - # means no limit. + # The maximum row count for pages to use for the data object builder. A + # value of 0 means no limit. # CLI flag: -dataobj-index-builder.max-page-rows [max_page_rows: | default = 0] - # The size of the target object to use for the index object builder. - # CLI flag: -dataobj-index-builder.target-object-size + # The target maximum size of the encoded object and all of its encoded + # sections (after compression), to limit memory usage of a builder. + # CLI flag: -dataobj-index-builder.target-builder-memory-limit [target_object_size: | default = 64MiB] - # Configures a maximum size for sections, for sections that support it. + # The target maximum amount of uncompressed data to hold in sections, for + # sections that support being limited by size. Uncompressed size is used for + # consistent I/O and planning. # CLI flag: -dataobj-index-builder.target-section-size [target_section_size: | default = 16MiB] - # The size of the buffer to use for sorting logs. + # The size of logs to buffer in memory before adding into columnar builders, + # used to reduce CPU load of sorting. # CLI flag: -dataobj-index-builder.buffer-size [buffer_size: | default = 2MiB] - # The maximum number of stripes to merge into a section at once. Must be - # greater than 1. + # The maximum number of dataobj section stripes to merge into a section at + # once. Must be greater than 1. # CLI flag: -dataobj-index-builder.section-stripe-merge-limit [section_stripe_merge_limit: | default = 2] diff --git a/pkg/dataobj/config/config.go b/pkg/dataobj/config/config.go index ea89a9e86f047..a4231bc4e1083 100644 --- a/pkg/dataobj/config/config.go +++ b/pkg/dataobj/config/config.go @@ -43,6 +43,9 @@ func (cfg *Config) Validate() error { if err := cfg.Consumer.Validate(); err != nil { return err } + if err := cfg.Index.Validate(); err != nil { + return err + } if err := cfg.Metastore.Validate(); err != nil { return err } diff --git a/pkg/dataobj/consumer/logsobj/builder.go b/pkg/dataobj/consumer/logsobj/builder.go index 2ffda9b2be0ab..6bf3d38c21982 100644 --- a/pkg/dataobj/consumer/logsobj/builder.go +++ b/pkg/dataobj/consumer/logsobj/builder.go @@ -38,8 +38,8 @@ const ( sortTimestampDESC = "timestamp-desc" ) -// BuilderConfig configures a [Builder]. -type BuilderConfig struct { +// BuilderBaseConfig configures a data object builder. +type BuilderBaseConfig struct { // TargetPageSize configures a target size for encoded pages within the data // object. TargetPageSize accounts for encoding, but not for compression. TargetPageSize flagext.Bytes `yaml:"target_page_size"` @@ -74,30 +74,20 @@ type BuilderConfig struct { // values of MergeSize trade off lower memory overhead for higher time spent // merging. SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` - - // DataobjSortOrder defines the order in which the rows of the logs sections are sorted. - // They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC]. - DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"` } // RegisterFlagsWithPrefix registers flags with the given prefix. -func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - _ = cfg.TargetPageSize.Set("2MB") - _ = cfg.TargetObjectSize.Set("1GB") - _ = cfg.BufferSize.Set("16MB") - _ = cfg.TargetSectionSize.Set("128MB") - +func (cfg *BuilderBaseConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The target maximum amount of uncompressed data to hold in data pages (for columnar sections). Uncompressed size is used for consistent I/O and planning.") f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the data object builder. A value of 0 means no limit.") f.Var(&cfg.TargetObjectSize, prefix+"target-builder-memory-limit", "The target maximum size of the encoded object and all of its encoded sections (after compression), to limit memory usage of a builder.") f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.") - f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.") - f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).") + f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of dataobj section stripes to merge into a section at once. Must be greater than 1.") } // Validate validates the BuilderConfig. -func (cfg *BuilderConfig) Validate() error { +func (cfg *BuilderBaseConfig) Validate() error { var errs []error if cfg.TargetPageSize <= 0 { @@ -122,9 +112,42 @@ func (cfg *BuilderConfig) Validate() error { errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1")) } + return errors.Join(errs...) +} + +// BuilderConfig configures a [Builder]. +type BuilderConfig struct { + BuilderBaseConfig `yaml:",inline"` + + // DataobjSortOrder defines the order in which the rows of the logs sections are sorted. + // They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC]. + DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"` +} + +// RegisterFlagsWithPrefix registers flags with the given prefix. +func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + // Set defaults for base builder configuration + _ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB") + _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB") + _ = cfg.BuilderBaseConfig.BufferSize.Set("16MB") + _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB") + cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) + + f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).") +} + +// Validate validates the BuilderConfig. +func (cfg *BuilderConfig) Validate() error { + var errs []error + + if err := cfg.BuilderBaseConfig.Validate(); err != nil { + errs = append(errs, err) + } + if cfg.DataobjSortOrder == "" { cfg.DataobjSortOrder = sortStreamASC // default to [streamID ASC, timestamp DESC] sorting } + if cfg.DataobjSortOrder != sortStreamASC && cfg.DataobjSortOrder != sortTimestampDESC { errs = append(errs, fmt.Errorf("invalid dataobj sort order. must be one of `stream-asc` or `timestamp-desc`, got: %s", cfg.DataobjSortOrder)) } diff --git a/pkg/dataobj/consumer/logsobj/builder_test.go b/pkg/dataobj/consumer/logsobj/builder_test.go index a76ac1e1285cf..3866376fc3ba8 100644 --- a/pkg/dataobj/consumer/logsobj/builder_test.go +++ b/pkg/dataobj/consumer/logsobj/builder_test.go @@ -21,13 +21,14 @@ import ( ) var testBuilderConfig = BuilderConfig{ - TargetPageSize: 2048, - TargetObjectSize: 1 << 20, // 1 MiB - TargetSectionSize: 8 << 10, // 8 KiB - - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, - DataobjSortOrder: sortTimestampDESC, + BuilderBaseConfig: BuilderBaseConfig{ + TargetPageSize: 2048, + TargetObjectSize: 1 << 20, // 1 MiB + TargetSectionSize: 8 << 10, // 8 KiB + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, + DataobjSortOrder: sortTimestampDESC, } func TestBuilder(t *testing.T) { diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/partition_processor_test.go index cc0eaf5668e45..d80f7eb8615a8 100644 --- a/pkg/dataobj/consumer/partition_processor_test.go +++ b/pkg/dataobj/consumer/partition_processor_test.go @@ -21,12 +21,14 @@ import ( ) var testBuilderConfig = logsobj.BuilderConfig{ - TargetPageSize: 2048, - MaxPageRows: 10, - TargetObjectSize: 1 << 22, // 4 MiB - TargetSectionSize: 1 << 22, // 4 MiB - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2048, + MaxPageRows: 10, + TargetObjectSize: 1 << 22, // 4 MiB + TargetSectionSize: 1 << 22, // 4 MiB + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, } func TestPartitionProcessor_Flush(t *testing.T) { diff --git a/pkg/dataobj/index/builder.go b/pkg/dataobj/index/builder.go index b81bca354cb0e..2cef796de7a95 100644 --- a/pkg/dataobj/index/builder.go +++ b/pkg/dataobj/index/builder.go @@ -135,7 +135,7 @@ func NewIndexBuilder( } // Create index building dependencies - builder, err := indexobj.NewBuilder(cfg.BuilderConfig, scratchStore) + builder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore) if err != nil { return nil, fmt.Errorf("failed to create index builder: %w", err) } diff --git a/pkg/dataobj/index/builder_test.go b/pkg/dataobj/index/builder_test.go index 21dddc40b6746..93d91e3e68658 100644 --- a/pkg/dataobj/index/builder_test.go +++ b/pkg/dataobj/index/builder_test.go @@ -17,7 +17,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" - "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/kafka" @@ -25,7 +24,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" ) -var testBuilderConfig = indexobj.BuilderConfig{ +var testBuilderConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 128 * 1024, TargetObjectSize: 4 * 1024 * 1024, TargetSectionSize: 2 * 1024 * 1024, @@ -53,8 +52,8 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) { // Create a builder with mocks for dependencies builder, err := NewIndexBuilder( Config{ - BuilderConfig: testBuilderConfig, - EventsPerIndex: 1, + BuilderBaseConfig: testBuilderConfig, + EventsPerIndex: 1, }, metastore.Config{}, kafka.Config{}, @@ -121,8 +120,8 @@ func TestIndexBuilder(t *testing.T) { p, err := NewIndexBuilder( Config{ - BuilderConfig: testBuilderConfig, - EventsPerIndex: 3, + BuilderBaseConfig: testBuilderConfig, + EventsPerIndex: 3, }, metastore.Config{}, kafka.Config{}, @@ -231,13 +230,13 @@ func (m *mockKafkaClient) Close() {} func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) { candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 128 * 1024, - TargetObjectSize: 4 * 1024 * 1024, - TargetSectionSize: 2 * 1024 * 1024, - - BufferSize: 4 * 1024 * 1024, - SectionStripeMergeLimit: 2, - + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 128 * 1024, + TargetObjectSize: 4 * 1024 * 1024, + TargetSectionSize: 2 * 1024 * 1024, + BufferSize: 4 * 1024 * 1024, + SectionStripeMergeLimit: 2, + }, DataobjSortOrder: "stream-asc", }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/index/calculate_test.go b/pkg/dataobj/index/calculate_test.go index 6f99d4a16fd15..108cc0397e26b 100644 --- a/pkg/dataobj/index/calculate_test.go +++ b/pkg/dataobj/index/calculate_test.go @@ -24,7 +24,7 @@ import ( "github.com/grafana/loki/pkg/push" ) -var testCalculatorConfig = indexobj.BuilderConfig{ +var testCalculatorConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 2048, TargetObjectSize: 1 << 22, // 4 MiB BufferSize: 2048 * 8, @@ -39,11 +39,13 @@ func createTestLogObject(t *testing.T, tenants int) *dataobj.Object { t.Helper() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 2048, - TargetObjectSize: 1 << 22, - TargetSectionSize: 1 << 21, - BufferSize: 2048 * 8, - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2048, + TargetObjectSize: 1 << 22, + TargetSectionSize: 1 << 21, + BufferSize: 2048 * 8, + SectionStripeMergeLimit: 2, + }, }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/index/config.go b/pkg/dataobj/index/config.go index abf7a00f189ae..592a3959ddf68 100644 --- a/pkg/dataobj/index/config.go +++ b/pkg/dataobj/index/config.go @@ -1,17 +1,18 @@ package index import ( + "errors" "flag" "time" - "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" ) type Config struct { - indexobj.BuilderConfig `yaml:",inline"` - EventsPerIndex int `yaml:"events_per_index" experimental:"true"` - FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"` - MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"` + logsobj.BuilderBaseConfig `yaml:",inline"` + EventsPerIndex int `yaml:"events_per_index" experimental:"true"` + FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"` + MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -19,8 +20,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) + // Set defaults for base builder configuration + _ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB") + _ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB") + _ = cfg.BuilderBaseConfig.BufferSize.Set("2MB") + _ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB") + cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f) + f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index") f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "Experimental: How often to check for stale partitions to flush") f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Experimental: Maximum time to wait before flushing buffered events") } + +// Validate validates the BuilderConfig. +func (cfg *Config) Validate() error { + var errs []error + + if err := cfg.BuilderBaseConfig.Validate(); err != nil { + errs = append(errs, err) + } + + return errors.Join(errs...) +} diff --git a/pkg/dataobj/index/indexobj/builder.go b/pkg/dataobj/index/indexobj/builder.go index 09daefbb6cc33..892cf772ef504 100644 --- a/pkg/dataobj/index/indexobj/builder.go +++ b/pkg/dataobj/index/indexobj/builder.go @@ -4,17 +4,16 @@ package indexobj import ( "context" "errors" - "flag" "fmt" "io" "time" - "github.com/grafana/dskit/flagext" lru "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" @@ -29,88 +28,6 @@ var ( ErrBuilderEmpty = errors.New("builder empty") ) -// BuilderConfig configures a [Builder]. -type BuilderConfig struct { - // TargetPageSize configures a target size for encoded pages within the data - // object. TargetPageSize accounts for encoding, but not for compression. - TargetPageSize flagext.Bytes `yaml:"target_page_size"` - - // MaxPageRows configures a maximum row count for encoded pages within the data - // object. If set to 0 or negative number, the page size will not be limited by a - // row count. - MaxPageRows int `yaml:"max_page_rows"` - - // TODO(rfratto): We need an additional parameter for TargetMetadataSize, as - // metadata payloads can't be split and must be downloaded in a single - // request. - // - // At the moment, we don't have a good mechanism for implementing a metadata - // size limit (we need to support some form of section splitting or column - // combinations), so the option is omitted for now. - - // TargetObjectSize configures a target size for data objects. - TargetObjectSize flagext.Bytes `yaml:"target_object_size"` - - // TargetSectionSize configures the maximum size of data in a section. Sections - // which support this parameter will place overflow data into new sections of - // the same type. - TargetSectionSize flagext.Bytes `yaml:"target_section_size"` - - // BufferSize configures the size of the buffer used to accumulate - // uncompressed logs in memory prior to sorting. - BufferSize flagext.Bytes `yaml:"buffer_size"` - - // SectionStripeMergeLimit configures the number of stripes to merge at once when - // flushing stripes into a section. MergeSize must be larger than 1. Lower - // values of MergeSize trade off lower memory overhead for higher time spent - // merging. - SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"` -} - -// RegisterFlagsWithPrefix registers flags with the given prefix. -func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - _ = cfg.TargetPageSize.Set("128KB") - _ = cfg.TargetObjectSize.Set("64MB") - _ = cfg.BufferSize.Set("2MB") - _ = cfg.TargetSectionSize.Set("16MB") - - f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the index object builder.") - f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the index builder. A value of 0 means no limit.") - f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the index object builder.") - f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") - f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") - f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of stripes to merge into a section at once. Must be greater than 1.") -} - -// Validate validates the BuilderConfig. -func (cfg *BuilderConfig) Validate() error { - var errs []error - - if cfg.TargetPageSize <= 0 { - errs = append(errs, errors.New("TargetPageSize must be greater than 0")) - } else if cfg.TargetPageSize >= cfg.TargetObjectSize { - errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize")) - } - - if cfg.TargetObjectSize <= 0 { - errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) - } - - if cfg.BufferSize <= 0 { - errs = append(errs, errors.New("BufferSize must be greater than 0")) - } - - if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { - errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) - } - - if cfg.SectionStripeMergeLimit < 2 { - errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1")) - } - - return errors.Join(errs...) -} - // A Builder constructs a logs-oriented data object from a set of incoming // log data. Log data is appended by calling [LogBuilder.Append]. A complete // data object is constructed by by calling [LogBuilder.Flush]. @@ -118,7 +35,7 @@ func (cfg *BuilderConfig) Validate() error { // Methods on Builder are not goroutine-safe; callers are responsible for // synchronization. type Builder struct { - cfg BuilderConfig + cfg logsobj.BuilderBaseConfig metrics *builderMetrics labelCache *lru.Cache[string, labels.Labels] @@ -147,7 +64,7 @@ const ( // NewBuilder creates a new [Builder] which stores log-oriented data objects. // // NewBuilder returns an error if the provided config is invalid. -func NewBuilder(cfg BuilderConfig, scratchStore scratch.Store) (*Builder, error) { +func NewBuilder(cfg logsobj.BuilderBaseConfig, scratchStore scratch.Store) (*Builder, error) { if err := cfg.Validate(); err != nil { return nil, err } diff --git a/pkg/dataobj/index/indexobj/builder_metrics.go b/pkg/dataobj/index/indexobj/builder_metrics.go index 349363d99d3d3..b875d3979d220 100644 --- a/pkg/dataobj/index/indexobj/builder_metrics.go +++ b/pkg/dataobj/index/indexobj/builder_metrics.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" @@ -116,7 +117,7 @@ func newBuilderMetrics() *builderMetrics { } // ObserveConfig updates config metrics based on the provided [BuilderConfig]. -func (m *builderMetrics) ObserveConfig(cfg BuilderConfig) { +func (m *builderMetrics) ObserveConfig(cfg logsobj.BuilderBaseConfig) { m.targetPageSize.Set(float64(cfg.TargetPageSize)) m.targetObjectSize.Set(float64(cfg.TargetObjectSize)) } diff --git a/pkg/dataobj/index/indexobj/builder_test.go b/pkg/dataobj/index/indexobj/builder_test.go index 8271171e2d7f4..737aeb384a9bb 100644 --- a/pkg/dataobj/index/indexobj/builder_test.go +++ b/pkg/dataobj/index/indexobj/builder_test.go @@ -9,13 +9,14 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/pointers" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" ) -var testBuilderConfig = BuilderConfig{ +var testBuilderConfig = logsobj.BuilderBaseConfig{ TargetPageSize: 2048, TargetObjectSize: 1 << 22, // 4 MiB TargetSectionSize: 1 << 21, // 2 MiB diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index 3413fd710964d..fada6796c3f74 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -216,7 +216,7 @@ func TestValuesEmptyMatcher(t *testing.T) { func TestSectionsForStreamMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tenantID) - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 1024 * 1024, TargetObjectSize: 10 * 1024 * 1024, TargetSectionSize: 128, @@ -363,7 +363,7 @@ func TestSectionsForStreamMatchers(t *testing.T) { func TestSectionsForPredicateMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tenantID) - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 1024 * 1024, TargetObjectSize: 10 * 1024 * 1024, TargetSectionSize: 128, @@ -503,11 +503,13 @@ func newTestDataBuilder(t *testing.T) *testDataBuilder { bucket := objstore.NewInMemBucket() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 1024 * 1024, // 1MB - TargetObjectSize: 10 * 1024 * 1024, // 10MB - TargetSectionSize: 1024 * 1024, // 1MB - BufferSize: 1024 * 1024, // 1MB - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 1024 * 1024, // 1MB + TargetObjectSize: 10 * 1024 * 1024, // 10MB + TargetSectionSize: 1024 * 1024, // 1MB + BufferSize: 1024 * 1024, // 1MB + SectionStripeMergeLimit: 2, + }, }, nil) require.NoError(t, err) diff --git a/pkg/dataobj/metastore/toc_writer.go b/pkg/dataobj/metastore/toc_writer.go index 34aabf1f1119e..20024b2c628a0 100644 --- a/pkg/dataobj/metastore/toc_writer.go +++ b/pkg/dataobj/metastore/toc_writer.go @@ -16,13 +16,14 @@ import ( "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" "github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers" ) // Define our own builder config for the Table Of Contents object because they are smaller than logs objects. -var tocBuilderCfg = indexobj.BuilderConfig{ +var tocBuilderCfg = logsobj.BuilderBaseConfig{ TargetObjectSize: 32 * 1024 * 1024, TargetPageSize: 4 * 1024 * 1024, BufferSize: 32 * 1024 * 1024, // 8x page size diff --git a/pkg/dataobj/metastore/toc_writer_test.go b/pkg/dataobj/metastore/toc_writer_test.go index df658527e85d9..4da4dfb0e1f6b 100644 --- a/pkg/dataobj/metastore/toc_writer_test.go +++ b/pkg/dataobj/metastore/toc_writer_test.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy" ) @@ -21,7 +22,7 @@ import ( func TestTableOfContentsWriter(t *testing.T) { t.Run("append new top-level object to new metastore", func(t *testing.T) { tenantID := "test" - tocBuilder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + tocBuilder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: tocBuilderCfg.TargetPageSize, TargetObjectSize: tocBuilderCfg.TargetObjectSize, TargetSectionSize: tocBuilderCfg.TargetSectionSize, @@ -53,7 +54,7 @@ func TestTableOfContentsWriter(t *testing.T) { t.Run("append default to new top-level metastore v1", func(t *testing.T) { tenantID := "test" - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: tocBuilderCfg.TargetPageSize, TargetObjectSize: tocBuilderCfg.TargetObjectSize, TargetSectionSize: tocBuilderCfg.TargetSectionSize, diff --git a/pkg/engine/internal/executor/dataobjscan_test.go b/pkg/engine/internal/executor/dataobjscan_test.go index d78572c72ba34..8192fd1aa27e8 100644 --- a/pkg/engine/internal/executor/dataobjscan_test.go +++ b/pkg/engine/internal/executor/dataobjscan_test.go @@ -333,12 +333,14 @@ func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object { t.Helper() builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 8_000, - TargetObjectSize: math.MaxInt, - TargetSectionSize: 32_000, - BufferSize: 8_000, - SectionStripeMergeLimit: 2, - DataobjSortOrder: "timestamp-desc", + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 8_000, + TargetObjectSize: math.MaxInt, + TargetSectionSize: 32_000, + BufferSize: 8_000, + SectionStripeMergeLimit: 2, + }, + DataobjSortOrder: "timestamp-desc", }, nil) require.NoError(t, err) diff --git a/pkg/engine/internal/util/objtest/objtest.go b/pkg/engine/internal/util/objtest/objtest.go index 51d30f4acc439..9dd140a7ede49 100644 --- a/pkg/engine/internal/util/objtest/objtest.go +++ b/pkg/engine/internal/util/objtest/objtest.go @@ -134,9 +134,9 @@ func (b *Builder) Close() { } func (b *Builder) buildIndex(ctx context.Context) error { - var builderConfig indexobj.BuilderConfig + var builderConfig logsobj.BuilderConfig builderConfig.RegisterFlagsWithPrefix("", flag.NewFlagSet("", flag.PanicOnError)) // Acquire defaults - indexBuilder, err := indexobj.NewBuilder(builderConfig, nil) + indexBuilder, err := indexobj.NewBuilder(builderConfig.BuilderBaseConfig, nil) if err != nil { return fmt.Errorf("creating logs builder: %w", err) } diff --git a/pkg/logql/bench/store_dataobj.go b/pkg/logql/bench/store_dataobj.go index 00cca8f7a84bd..ad5d168ff96ec 100644 --- a/pkg/logql/bench/store_dataobj.go +++ b/pkg/logql/bench/store_dataobj.go @@ -74,13 +74,14 @@ func NewDataObjStore(dir, tenant string) (*DataObjStore, error) { } builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 2 * 1024 * 1024, // 2MB - MaxPageRows: 1000, - TargetObjectSize: 128 * 1024 * 1024, // 128MB - TargetSectionSize: 16 * 1024 * 1024, // 16MB - BufferSize: 16 * 1024 * 1024, // 16MB - - SectionStripeMergeLimit: 2, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 2 * 1024 * 1024, // 2MB + MaxPageRows: 1000, + TargetObjectSize: 128 * 1024 * 1024, // 128MB + TargetSectionSize: 16 * 1024 * 1024, // 16MB + BufferSize: 16 * 1024 * 1024, // 16MB + SectionStripeMergeLimit: 2, + }, }, nil) if err != nil { return nil, fmt.Errorf("failed to create builder: %w", err) @@ -204,7 +205,7 @@ func (s *DataObjStore) buildIndex() error { return nil } - builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{ TargetPageSize: 128 * 1024, // 128KB TargetObjectSize: 128 * 1024 * 1024, // 128MB TargetSectionSize: 16 * 1024 * 1024, // 16MB diff --git a/tools/dataobj-sort/main.go b/tools/dataobj-sort/main.go index b16da0078fef1..8bfa41d01bff4 100644 --- a/tools/dataobj-sort/main.go +++ b/tools/dataobj-sort/main.go @@ -40,12 +40,14 @@ func main() { } cfg := logsobj.BuilderConfig{ - TargetPageSize: 64 << 10, - MaxPageRows: 1000, - TargetObjectSize: 512 << 20, - TargetSectionSize: 512 << 20, - BufferSize: 16 << 20, - SectionStripeMergeLimit: 8, + BuilderBaseConfig: logsobj.BuilderBaseConfig{ + TargetPageSize: 64 << 10, + MaxPageRows: 1000, + TargetObjectSize: 512 << 20, + TargetSectionSize: 512 << 20, + BufferSize: 16 << 20, + SectionStripeMergeLimit: 8, + }, } scr, err := scratch.NewFilesystem(gokitlog.NewNopLogger(), os.TempDir()) if err != nil {