Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ dataobj:
# CLI flag: -dataobj-consumer.buffer-size
[buffer_size: <int> | default = 16MiB]

# The maximum number of log section stripes to merge into a section at
# The maximum number of dataobj section stripes to merge into a section at
# once. Must be greater than 1.
# CLI flag: -dataobj-consumer.section-stripe-merge-limit
[section_stripe_merge_limit: <int> | default = 2]
Expand Down Expand Up @@ -1431,29 +1431,35 @@ dataobj:
[topic: <string> | default = ""]

index:
# The size of the target page to use for the index object builder.
# The target maximum amount of uncompressed data to hold in data pages (for
# columnar sections). Uncompressed size is used for consistent I/O and
# planning.
# CLI flag: -dataobj-index-builder.target-page-size
[target_page_size: <int> | default = 128KiB]

# The maximum row count for pages to use for the index builder. A value of 0
# means no limit.
# The maximum row count for pages to use for the data object builder. A
# value of 0 means no limit.
# CLI flag: -dataobj-index-builder.max-page-rows
[max_page_rows: <int> | default = 0]

# The size of the target object to use for the index object builder.
# CLI flag: -dataobj-index-builder.target-object-size
# The target maximum size of the encoded object and all of its encoded
# sections (after compression), to limit memory usage of a builder.
# CLI flag: -dataobj-index-builder.target-builder-memory-limit
[target_object_size: <int> | default = 64MiB]

# Configures a maximum size for sections, for sections that support it.
# The target maximum amount of uncompressed data to hold in sections, for
# sections that support being limited by size. Uncompressed size is used for
# consistent I/O and planning.
# CLI flag: -dataobj-index-builder.target-section-size
[target_section_size: <int> | default = 16MiB]

# The size of the buffer to use for sorting logs.
# The size of logs to buffer in memory before adding into columnar builders,
# used to reduce CPU load of sorting.
# CLI flag: -dataobj-index-builder.buffer-size
[buffer_size: <int> | default = 2MiB]

# The maximum number of stripes to merge into a section at once. Must be
# greater than 1.
# The maximum number of dataobj section stripes to merge into a section at
# once. Must be greater than 1.
# CLI flag: -dataobj-index-builder.section-stripe-merge-limit
[section_stripe_merge_limit: <int> | default = 2]

Expand Down
3 changes: 3 additions & 0 deletions pkg/dataobj/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (cfg *Config) Validate() error {
if err := cfg.Consumer.Validate(); err != nil {
return err
}
if err := cfg.Index.Validate(); err != nil {
return err
}
if err := cfg.Metastore.Validate(); err != nil {
return err
}
Expand Down
53 changes: 38 additions & 15 deletions pkg/dataobj/consumer/logsobj/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ const (
sortTimestampDESC = "timestamp-desc"
)

// BuilderConfig configures a [Builder].
type BuilderConfig struct {
// BuilderBaseConfig configures a data object builder.
type BuilderBaseConfig struct {
// TargetPageSize configures a target size for encoded pages within the data
// object. TargetPageSize accounts for encoding, but not for compression.
TargetPageSize flagext.Bytes `yaml:"target_page_size"`
Expand Down Expand Up @@ -74,30 +74,20 @@ type BuilderConfig struct {
// values of MergeSize trade off lower memory overhead for higher time spent
// merging.
SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"`

// DataobjSortOrder defines the order in which the rows of the logs sections are sorted.
// They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC].
DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"`
}

// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
_ = cfg.TargetPageSize.Set("2MB")
_ = cfg.TargetObjectSize.Set("1GB")
_ = cfg.BufferSize.Set("16MB")
_ = cfg.TargetSectionSize.Set("128MB")

func (cfg *BuilderBaseConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The target maximum amount of uncompressed data to hold in data pages (for columnar sections). Uncompressed size is used for consistent I/O and planning.")
f.IntVar(&cfg.MaxPageRows, prefix+"max-page-rows", 0, "The maximum row count for pages to use for the data object builder. A value of 0 means no limit.")
f.Var(&cfg.TargetObjectSize, prefix+"target-builder-memory-limit", "The target maximum size of the encoded object and all of its encoded sections (after compression), to limit memory usage of a builder.")
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "The target maximum amount of uncompressed data to hold in sections, for sections that support being limited by size. Uncompressed size is used for consistent I/O and planning.")
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of logs to buffer in memory before adding into columnar builders, used to reduce CPU load of sorting.")
f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of log section stripes to merge into a section at once. Must be greater than 1.")
f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).")
f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of dataobj section stripes to merge into a section at once. Must be greater than 1.")
}

// Validate validates the BuilderConfig.
func (cfg *BuilderConfig) Validate() error {
func (cfg *BuilderBaseConfig) Validate() error {
var errs []error

if cfg.TargetPageSize <= 0 {
Expand All @@ -122,9 +112,42 @@ func (cfg *BuilderConfig) Validate() error {
errs = append(errs, errors.New("LogsMergeStripesMax must be greater than 1"))
}

return errors.Join(errs...)
}

// BuilderConfig configures a [Builder].
type BuilderConfig struct {
BuilderBaseConfig `yaml:",inline"`

// DataobjSortOrder defines the order in which the rows of the logs sections are sorted.
// They can either be sorted by [streamID ASC, timestamp DESC] or [timestamp DESC, streamID ASC].
DataobjSortOrder string `yaml:"dataobj_sort_order" doc:"hidden"`
}

// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// Set defaults for base builder configuration
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("2MB")
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("1GB")
_ = cfg.BuilderBaseConfig.BufferSize.Set("16MB")
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("128MB")
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)

f.StringVar(&cfg.DataobjSortOrder, prefix+"dataobj-sort-order", sortStreamASC, "The desired sort order of the logs section. Can either be `stream-asc` (order by streamID ascending and timestamp descending) or `timestamp-desc` (order by timestamp descending and streamID ascending).")
}

// Validate validates the BuilderConfig.
func (cfg *BuilderConfig) Validate() error {
var errs []error

if err := cfg.BuilderBaseConfig.Validate(); err != nil {
errs = append(errs, err)
}

if cfg.DataobjSortOrder == "" {
cfg.DataobjSortOrder = sortStreamASC // default to [streamID ASC, timestamp DESC] sorting
}

if cfg.DataobjSortOrder != sortStreamASC && cfg.DataobjSortOrder != sortTimestampDESC {
errs = append(errs, fmt.Errorf("invalid dataobj sort order. must be one of `stream-asc` or `timestamp-desc`, got: %s", cfg.DataobjSortOrder))
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/dataobj/consumer/logsobj/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
)

var testBuilderConfig = BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 20, // 1 MiB
TargetSectionSize: 8 << 10, // 8 KiB

BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
DataobjSortOrder: sortTimestampDESC,
BuilderBaseConfig: BuilderBaseConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 20, // 1 MiB
TargetSectionSize: 8 << 10, // 8 KiB
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
},
DataobjSortOrder: sortTimestampDESC,
}

func TestBuilder(t *testing.T) {
Expand Down
14 changes: 8 additions & 6 deletions pkg/dataobj/consumer/partition_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
)

var testBuilderConfig = logsobj.BuilderConfig{
TargetPageSize: 2048,
MaxPageRows: 10,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 22, // 4 MiB
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
BuilderBaseConfig: logsobj.BuilderBaseConfig{
TargetPageSize: 2048,
MaxPageRows: 10,
TargetObjectSize: 1 << 22, // 4 MiB
TargetSectionSize: 1 << 22, // 4 MiB
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
},
}

func TestPartitionProcessor_Flush(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataobj/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func NewIndexBuilder(
}

// Create index building dependencies
builder, err := indexobj.NewBuilder(cfg.BuilderConfig, scratchStore)
builder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore)
if err != nil {
return nil, fmt.Errorf("failed to create index builder: %w", err)
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/dataobj/index/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
"github.com/grafana/loki/v3/pkg/logproto"
)

var testBuilderConfig = indexobj.BuilderConfig{
var testBuilderConfig = logsobj.BuilderBaseConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,
Expand Down Expand Up @@ -53,8 +52,8 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
// Create a builder with mocks for dependencies
builder, err := NewIndexBuilder(
Config{
BuilderConfig: testBuilderConfig,
EventsPerIndex: 1,
BuilderBaseConfig: testBuilderConfig,
EventsPerIndex: 1,
},
metastore.Config{},
kafka.Config{},
Expand Down Expand Up @@ -121,8 +120,8 @@ func TestIndexBuilder(t *testing.T) {

p, err := NewIndexBuilder(
Config{
BuilderConfig: testBuilderConfig,
EventsPerIndex: 3,
BuilderBaseConfig: testBuilderConfig,
EventsPerIndex: 3,
},
metastore.Config{},
kafka.Config{},
Expand Down Expand Up @@ -231,13 +230,13 @@ func (m *mockKafkaClient) Close() {}

func buildLogObject(t *testing.T, app string, path string, bucket objstore.Bucket) {
candidate, err := logsobj.NewBuilder(logsobj.BuilderConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,

BufferSize: 4 * 1024 * 1024,
SectionStripeMergeLimit: 2,

BuilderBaseConfig: logsobj.BuilderBaseConfig{
TargetPageSize: 128 * 1024,
TargetObjectSize: 4 * 1024 * 1024,
TargetSectionSize: 2 * 1024 * 1024,
BufferSize: 4 * 1024 * 1024,
SectionStripeMergeLimit: 2,
},
DataobjSortOrder: "stream-asc",
}, nil)
require.NoError(t, err)
Expand Down
14 changes: 8 additions & 6 deletions pkg/dataobj/index/calculate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/grafana/loki/pkg/push"
)

var testCalculatorConfig = indexobj.BuilderConfig{
var testCalculatorConfig = logsobj.BuilderBaseConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22, // 4 MiB
BufferSize: 2048 * 8,
Expand All @@ -39,11 +39,13 @@ func createTestLogObject(t *testing.T, tenants int) *dataobj.Object {
t.Helper()

builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22,
TargetSectionSize: 1 << 21,
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
BuilderBaseConfig: logsobj.BuilderBaseConfig{
TargetPageSize: 2048,
TargetObjectSize: 1 << 22,
TargetSectionSize: 1 << 21,
BufferSize: 2048 * 8,
SectionStripeMergeLimit: 2,
},
}, nil)
require.NoError(t, err)

Expand Down
30 changes: 24 additions & 6 deletions pkg/dataobj/index/config.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
package index

import (
"errors"
"flag"
"time"

"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
)

type Config struct {
indexobj.BuilderConfig `yaml:",inline"`
EventsPerIndex int `yaml:"events_per_index" experimental:"true"`
FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"`
MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"`
logsobj.BuilderBaseConfig `yaml:",inline"`
EventsPerIndex int `yaml:"events_per_index" experimental:"true"`
FlushInterval time.Duration `yaml:"flush_interval" experimental:"true"`
MaxIdleTime time.Duration `yaml:"max_idle_time" experimental:"true"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("dataobj-index-builder.", f)
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
// Set defaults for base builder configuration
_ = cfg.BuilderBaseConfig.TargetPageSize.Set("128KB")
_ = cfg.BuilderBaseConfig.TargetObjectSize.Set("64MB")
_ = cfg.BuilderBaseConfig.BufferSize.Set("2MB")
_ = cfg.BuilderBaseConfig.TargetSectionSize.Set("16MB")
cfg.BuilderBaseConfig.RegisterFlagsWithPrefix(prefix, f)

f.IntVar(&cfg.EventsPerIndex, prefix+"events-per-index", 32, "Experimental: The number of events to batch before building an index")
f.DurationVar(&cfg.FlushInterval, prefix+"flush-interval", 1*time.Minute, "Experimental: How often to check for stale partitions to flush")
f.DurationVar(&cfg.MaxIdleTime, prefix+"max-idle-time", 30*time.Minute, "Experimental: Maximum time to wait before flushing buffered events")
}

// Validate validates the BuilderConfig.
func (cfg *Config) Validate() error {
var errs []error

if err := cfg.BuilderBaseConfig.Validate(); err != nil {
errs = append(errs, err)
}

return errors.Join(errs...)
}
Loading
Loading