[bloom-compactor] remove BloomCompactorMinTableAge check (#11546)
**What this PR does / why we need it**:
`BloomCompactorMinTableAge` was added with the idea of skipping the most
recent index files from ingesters that have not yet been compacted into
TSDB indexes.

The default value is one hour. This blocks the bloom-compactor from
processing the current day's TSDB, because that table's end timestamp always
falls within the last 15 minutes. One option is to reduce the value to
something lower than the frequency at which TSDB indexes are built (< 15 mins).
Here I choose to remove the check altogether, on the assumption that
uncompacted indexes from ingesters will not be processed as a table, since
there will be no schema for such a table per the [check
here](https://github.com/grafana/loki/blob/main/pkg/bloomcompactor/bloomcompactor.go#L268-L272).
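
For context, the linked check resolves the schema period covering a table before compacting it, and a table with no matching period is skipped. Below is a minimal, self-contained sketch of that idea; `periodConfig` and `schemaForTable` are illustrative stand-ins, not Loki's actual types:

```go
package main

import (
	"fmt"
	"time"
)

// periodConfig is an illustrative stand-in for a schema period; the compactor
// resolves the period covering a table's time range before compacting it.
type periodConfig struct {
	from      time.Time
	indexType string
}

// schemaForTable returns the period covering t, or ok=false when no
// configured period matches.
func schemaForTable(periods []periodConfig, t time.Time) (periodConfig, bool) {
	for i := len(periods) - 1; i >= 0; i-- {
		if !t.Before(periods[i].from) {
			return periods[i], true
		}
	}
	return periodConfig{}, false
}

func main() {
	periods := []periodConfig{
		{from: time.Date(2023, 6, 1, 0, 0, 0, 0, time.UTC), indexType: "tsdb"},
	}
	tableStart := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	p, ok := schemaForTable(periods, tableStart)
	if !ok || p.indexType != "tsdb" {
		// No TSDB schema period covers this table, so it is never compacted;
		// this is what makes a separate min-table-age guard redundant.
		fmt.Println("skipping: no TSDB schema period covers this table")
		return
	}
	fmt.Println("compacting table under period", p.indexType)
}
```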
poyzannur committed Jan 4, 2024
1 parent ce57448 commit 599eed7
Showing 4 changed files with 2 additions and 21 deletions.
7 changes: 0 additions & 7 deletions docs/sources/configure/_index.md
@@ -3084,13 +3084,6 @@ shard_streams:
# CLI flag: -bloom-compactor.max-table-age
[bloom_compactor_max_table_age: <duration> | default = 168h]

-# The minimum age of a table before it is compacted. Do not compact tables newer
-# than the configured time. Defaults to 1 hour. 0s means no limit. This is
-# useful to avoid compacting tables that will be updated with out-of-order
-# writes.
-# CLI flag: -bloom-compactor.min-table-age
-[bloom_compactor_min_table_age: <duration> | default = 1h]
-
# Whether to compact chunks into bloom filters.
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]
9 changes: 2 additions & 7 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -246,13 +246,11 @@ func (c *Compactor) runCompaction(ctx context.Context) error {
 _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
 	tableName := tables[i]
 	logger := log.With(c.logger, "table", tableName)
-	level.Info(logger).Log("msg", "compacting table")
 	err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName])
 	if err != nil {
 		errs.Add(err)
 		return nil
 	}
-	level.Info(logger).Log("msg", "finished compacting table")
 	return nil
 })
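
Aside: note how the loop records each table's error and still returns nil, so one failing table does not abort the remaining jobs. A toy sketch of that accumulate-and-continue pattern using only the standard library (a stand-in for the dskit helpers Loki actually uses):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errList is a minimal stand-in for a concurrent multi-error collector.
type errList struct {
	mu   sync.Mutex
	errs []error
}

func (e *errList) Add(err error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.errs = append(e.errs, err)
}

func (e *errList) Err() error { return errors.Join(e.errs...) }

func main() {
	tables := []string{"index_19720", "index_19721", "index_19722"}
	var (
		errs errList
		wg   sync.WaitGroup
	)
	for i := range tables {
		wg.Add(1)
		go func(table string, fail bool) {
			defer wg.Done()
			if fail {
				// Record the failure and return normally, so the
				// remaining tables are still compacted.
				errs.Add(fmt.Errorf("compacting %s: simulated failure", table))
			}
		}(tables[i], i == 1)
	}
	wg.Wait()
	fmt.Println("accumulated:", errs.Err())
}
```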

@@ -307,12 +305,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor

 	// Skip this table if it is too new/old for the tenant limits.
 	now := model.Now()
-	tableMinAge := c.limits.BloomCompactorMinTableAge(tenant)
 	tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant)
-	if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) {
-		level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now)
-		continue
-	}
 	if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) {
 		level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now)
 		continue
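
With the min-age guard removed, only the max-age check above remains. A small stdlib sketch of the surviving guard (illustrative, not Loki's exact code); note how a table ending within the last 15 minutes now passes:

```go
package main

import (
	"fmt"
	"time"
)

// tooOld reports whether a table's interval starts before now-maxAge,
// mirroring the BloomCompactorMaxTableAge guard that remains after this
// change; a zero maxAge disables the check.
func tooOld(tableStart, now time.Time, maxAge time.Duration) bool {
	return maxAge > 0 && tableStart.Before(now.Add(-maxAge))
}

func main() {
	now := time.Now()
	fmt.Println(tooOld(now.Add(-8*24*time.Hour), now, 7*24*time.Hour))  // true: older than 7d, skipped
	fmt.Println(tooOld(now.Add(-10*time.Minute), now, 7*24*time.Hour))  // false: today's table now compacts
}
```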
@@ -507,6 +500,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	}
 }()
 
+level.Info(logger).Log("msg", "started compacting table", "table", job.tableName, "tenant", job.tenantID)
 if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 {
 	// There is no change to any blocks, no compaction needed
 	level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed")
@@ -597,5 +591,6 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
 	return err
 }
+level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID)
 return nil
}
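
The two added Info lines bracket each compaction job, so start and finish can be correlated per table and tenant. A hedged stdlib sketch of that bracketing pattern (hypothetical `compactTable`, using log/slog rather than go-kit's logger):

```go
package main

import (
	"log/slog"
	"os"
	"time"
)

// compactTable is a placeholder job; the real runCompact does the work
// between the two log lines added in this commit.
func compactTable(table, tenant string) error {
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	table, tenant := "index_19721", "tenant-a"
	logger.Info("started compacting table", "table", table, "tenant", tenant)
	start := time.Now()
	if err := compactTable(table, tenant); err != nil {
		logger.Error("failed compacting table", "table", table, "err", err)
		return
	}
	logger.Info("finished compacting table", "table", table, "tenant", tenant, "duration", time.Since(start))
}
```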
1 change: 0 additions & 1 deletion pkg/bloomcompactor/config.go
@@ -42,7 +42,6 @@ type Limits interface {
 	downloads.Limits
 	BloomCompactorShardSize(tenantID string) int
 	BloomCompactorMaxTableAge(tenantID string) time.Duration
-	BloomCompactorMinTableAge(tenantID string) time.Duration
 	BloomCompactorEnabled(tenantID string) bool
 	BloomNGramLength(tenantID string) int
 	BloomNGramSkip(tenantID string) int
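Removing the accessor from the Limits interface leaves the per-tenant override pattern intact for the remaining limits. An illustrative sketch of that interface-plus-overrides shape, with toy types rather than Loki's:

```go
package main

import (
	"fmt"
	"time"
)

// limits mirrors the shape of the bloom-compactor Limits interface after this
// change: the min-table-age accessor is gone, max-table-age remains.
type limits interface {
	BloomCompactorMaxTableAge(tenantID string) time.Duration
	BloomCompactorEnabled(tenantID string) bool
}

// overrides is a toy per-tenant override store; Loki's real Overrides type
// resolves runtime overrides before falling back to defaults.
type overrides struct {
	perTenant map[string]time.Duration
	def       time.Duration
}

func (o *overrides) BloomCompactorMaxTableAge(tenantID string) time.Duration {
	if d, ok := o.perTenant[tenantID]; ok {
		return d
	}
	return o.def
}

func (o *overrides) BloomCompactorEnabled(tenantID string) bool { return true }

func main() {
	var l limits = &overrides{
		perTenant: map[string]time.Duration{"tenant-a": 48 * time.Hour},
		def:       7 * 24 * time.Hour,
	}
	fmt.Println(l.BloomCompactorMaxTableAge("tenant-a")) // 48h0m0s (override)
	fmt.Println(l.BloomCompactorMaxTableAge("tenant-b")) // 168h0m0s (default)
}
```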
6 changes: 0 additions & 6 deletions pkg/validation/limits.go
@@ -186,7 +186,6 @@ type Limits struct {

 	BloomCompactorShardSize   int           `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
 	BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
-	BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
 	BloomCompactorEnabled     bool          `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
 	BloomNGramLength          int           `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
 	BloomNGramSkip            int           `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
@@ -312,7 +311,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

 	f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
 	f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the configured time. Defaults to 7 days. 0s means no limit.")
-	f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the configured time. Defaults to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
 	f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
 	f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
 	f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
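
The deleted DurationVar call was the flag's only registration. For reference, a self-contained sketch of how such a duration limit is wired up with the standard flag package (hypothetical flag set; the remaining max-table-age flag is used as the example):

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Flag defaults become the global limits, which per-tenant overrides
	// can later replace at runtime.
	var maxTableAge time.Duration
	fs := flag.NewFlagSet("bloom-compactor", flag.ExitOnError)
	fs.DurationVar(&maxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour,
		"The maximum age of a table before it is compacted. 0s means no limit.")
	_ = fs.Parse([]string{"-bloom-compactor.max-table-age=72h"})
	fmt.Println(maxTableAge) // 72h0m0s
}
```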
@@ -838,10 +836,6 @@ func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration {
 	return o.getOverridesForUser(userID).BloomCompactorMaxTableAge
 }
 
-func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration {
-	return o.getOverridesForUser(userID).BloomCompactorMinTableAge
-}
-
 func (o *Overrides) BloomCompactorEnabled(userID string) bool {
 	return o.getOverridesForUser(userID).BloomCompactorEnabled
 }
