WIP: Size based retention #7927
base: main
Conversation
After some weeks of radio silence (we were working hard), we return to this PR. We are at a point where this is more or less working as expected, using a config file like this one:

```yaml
auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  instance_addr: 127.0.0.1
  path_prefix: /tmp/loki
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
      size_based_retention_percentage: 2
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

storage_config:
  boltdb_shipper:
    active_index_directory: /tmp/loki/boltdb-shipper-active
    shared_store: filesystem
  filesystem:
    directory: /tmp/loki/chunks

schema_config:
  configs:
    - from: 2020-10-24
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

ingester:
  wal:
    enabled: true
    dir: /tmp/loki/chunks/wal
    flush_on_shutdown: true

compactor:
  working_directory: /tmp/loki/retention
  shared_store: filesystem
  compaction_interval: 30s
  retention_enabled: true
  retention_delete_delay: 1m
  retention_delete_worker_count: 150

limits_config:
  ingestion_rate_mb: 10
```
Note the `size_based_retention_percentage` setting. Could you give it a try and let us know your thoughts?
I'm a bit buried at the moment, but I will test when I get the chance!
Hey mate :) Sorry for being a pain in the ass. We're really keen on getting this functionality into shape. Would you mind giving it another review? Thanks,
Sorry again for the delay in looking at this. I can't run it because things have drifted, but I have done a review.
This is looking good and makes sense. I've added comments about a couple of concerns. I'm also concerned that there's a lot of logic here and very few tests. I notice that you merged my original suggestions. That's fair to avoid rework, but those were illustrative and should probably be tested.
```go
	return nil
}

func (c *Compactor) sizeBasedCompactionInterval(ctx context.Context) error {
	if exceeded, err := c.sizeBasedRetention.ThresholdExceeded(); !exceeded {
```
This doesn't handle the error when exceeded == true
We are not handling this situation because the function `ThresholdExceeded()` does not return the combination `true, err`. The possible returns are:
- `false, err`
- `false, nil`
- `true, nil`

Do you think we should modify this anyway?
For uniform error handling, I think it would be best to modify this to handle `true, err`. This guards against future changes to `ThresholdExceeded`.
Reviewing the code again, it seems there is no point in checking all the combinations. We need just two guards:
- Log a message and `return nil` if there is an error.
- `return nil` if the threshold isn't exceeded.

In any other situation we should continue with the execution. I'm pushing these changes with less nesting; let me know your thoughts.
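A minimal, self-contained sketch of those two guards, assuming a `ThresholdExceeded`-style checker as in the snippet above (the surrounding type and function names are illustrative, not code from this PR):

```go
package main

import (
	"errors"
	"log"
)

// thresholdChecker stands in for the real size-based retention checker.
type thresholdChecker struct{}

func (t *thresholdChecker) ThresholdExceeded() (bool, error) {
	return false, errors.New("could not stat the chunks directory")
}

// sizeBasedCompaction shows the two guards described above.
func sizeBasedCompaction(checker *thresholdChecker) error {
	exceeded, err := checker.ThresholdExceeded()
	if err != nil {
		// Guard 1: log and skip this cycle rather than aborting compaction.
		log.Printf("size-based retention check failed: %v", err)
		return nil
	}
	if !exceeded {
		// Guard 2: nothing to do until the threshold is exceeded.
		return nil
	}
	// In any other situation, continue with size-based compaction here.
	return nil
}

func main() {
	_ = sizeBasedCompaction(&thresholdChecker{})
}
```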
This makes sense. Do we return `nil` on error so as not to exit compactions? That's reasonable, but we should probably add a metric here so an alert can be raised on successive failures.
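As a sketch of that suggestion, a hypothetical counter (the metric name is made up for the example; nothing like it exists in the PR yet) could be incremented in the error guard so an alert can fire on repeated failures:

```go
package main

import (
	"errors"
	"log"

	"github.com/prometheus/client_golang/prometheus"
)

// checkFailures counts failed size-based retention threshold checks.
// The metric name is a hypothetical suggestion, not an existing Loki metric.
var checkFailures = prometheus.NewCounter(prometheus.CounterOpts{
	Namespace: "loki",
	Name:      "compactor_size_based_retention_check_failures_total",
	Help:      "Total number of failed size-based retention threshold checks.",
})

// handleThresholdError mirrors the error guard above: count the failure so an
// alert can be raised on successive failures, then return nil so the
// compaction loop keeps running.
func handleThresholdError(err error) error {
	checkFailures.Inc()
	log.Printf("size-based retention check failed: %v", err)
	return nil
}

func main() {
	prometheus.MustRegister(checkFailures)
	_ = handleThresholdError(errors.New("could not stat the chunks directory"))
}
```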
```
@@ -90,7 +227,17 @@ type Marker struct {
	markTimeout time.Duration
}

var mu sync.Mutex
```
I don't think we need the mutex here. The compactor should probably own the marker metrics and pass them to the markers.
I did not implement this. This section of the code was written by @rbarry82, who sadly passed away a month ago.
Do you mean something like this?
```go
func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, metrics *markerMetrics) (*Marker, error) {
	return &Marker{
		workingDirectory: workingDirectory,
		expiration:       expiration,
		markerMetrics:    metrics,
		chunkClient:      chunkClient,
		markTimeout:      markTimeout,
	}, nil
}
```
I'm sorry to hear about the loss of your colleague 🙁
Metrics can only be registered once; if we try to register them a second time, a runtime panic occurs. The code, as it is, prevents registering a second time, but it's a little awkward.
I think we could change

```go
func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error)
```

to take the metrics like this:

```go
func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, metrics *markerMetrics) (*Marker, error)
```

But then we need a place to instantiate the metrics. We could probably do it in the compactor's `init` function and make it a field on the compactor. If we do this, the mutex can be deleted altogether.
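To illustrate the double-registration point, here is a minimal, standalone sketch (the metric name is made up for the example): Prometheus's `Register` returns an `AlreadyRegisteredError` for a duplicate collector, while `MustRegister` would panic instead.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// First registration of a collector with this name succeeds.
	first := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "marker_demo_total",
		Help: "Demo counter to illustrate double registration.",
	})
	if err := reg.Register(first); err != nil {
		fmt.Println("unexpected error:", err)
	}

	// A second collector with the same name is rejected; MustRegister
	// would panic here instead of returning an error.
	second := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "marker_demo_total",
		Help: "Demo counter to illustrate double registration.",
	})
	if err := reg.Register(second); err != nil {
		fmt.Println("second registration failed:", err)
	}
}
```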
About the mutex, I now remember that Ryan added the following to the `Compactor` struct:

```go
// Size based compaction means that two compactions might try to happen at the same time.
// Use this to ensure size-based and normal compaction can't step on eachother.
compactionMtx sync.Mutex
```

Does this make sense, or was he wrong?
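As a minimal, self-contained sketch of that locking pattern (illustrative names only, not the actual Loki code): both the regular and the size-based compaction paths take the same mutex, so they can never run concurrently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type compactor struct {
	// Shared by both compaction paths so they can't step on each other.
	compactionMtx sync.Mutex
}

func (c *compactor) runCompaction(kind string) {
	c.compactionMtx.Lock()
	defer c.compactionMtx.Unlock()

	fmt.Println("running", kind, "compaction")
	time.Sleep(10 * time.Millisecond) // stand-in for real work
}

func main() {
	c := &compactor{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.runCompaction("time-based") }()
	go func() { defer wg.Done(); c.runCompaction("size-based") }()
	wg.Wait()
}
```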
About your suggestion, I have this draft right now (didn't push it yet)... Is it close to what you suggest?
```diff
modified   pkg/storage/stores/indexshipper/compactor/compactor.go
@@ -182,6 +182,7 @@ type Compactor struct {
 	// Size based compaction means that two compactions might try to happen at the same time.
 	// Use this to ensure size-based and normal compaction can't step on eachother.
 	compactionMtx sync.Mutex
+	markerMetrics *retention.markerMetrics
 
 	// one for each object store
 	storeContainers map[string]storeContainer
@@ -310,6 +311,8 @@ func (c *Compactor) init(objectStoreClients map[string]client.ObjectClient, sche
 		}
 	}
 
+	c.markerMetrics = retention.newMarkerMetrics(r)
+
 	c.storeContainers = make(map[string]storeContainer, len(objectStoreClients))
 	for objectStoreType, objectClient := range objectStoreClients {
 		var sc storeContainer
@@ -338,7 +341,7 @@ func (c *Compactor) init(objectStoreClients map[string]client.ObjectClient, sche
 		return fmt.Errorf("failed to init sweeper: %w", err)
 	}
 
-	sc.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, c.cfg.RetentionTableTimeout, chunkClient, r)
+	sc.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, c.cfg.RetentionTableTimeout, chunkClient, c.markerMetrics)
 	if err != nil {
 		return fmt.Errorf("failed to init table marker: %w", err)
 	}

modified   pkg/storage/stores/indexshipper/compactor/retention/retention.go
@@ -5,9 +5,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
@@ -227,21 +227,11 @@ type Marker struct {
 	markTimeout time.Duration
 }
 
-var mu sync.Mutex
-var metrics *markerMetrics
-
-func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) {
-	mu.Lock()
-	defer mu.Unlock()
-
-	if metrics == nil {
-		metrics = newMarkerMetrics(r)
-	}
-
+func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, metrics *markerMetrics) (*Marker, error) {
 	return &Marker{
 		workingDirectory: workingDirectory,
 		expiration:       expiration,
-		markerMetrics:    newMarkerMetrics(r),
+		markerMetrics:    metrics,
 		chunkClient:      chunkClient,
 		markTimeout:      markTimeout,
 	}, nil
```
I'm working on the tests right now.
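For reference, a rough sketch of the kind of table-driven test that could cover the threshold guards discussed above; the checker type and function names here are assumptions, not code from this PR:

```go
package retention

import (
	"errors"
	"testing"
)

// thresholdChecker is a stand-in for the real size-based retention checker.
type thresholdChecker func() (bool, error)

// shouldCompact mirrors the two guards discussed in the review: skip on error
// or when the threshold isn't exceeded, otherwise proceed.
func shouldCompact(check thresholdChecker) bool {
	exceeded, err := check()
	if err != nil || !exceeded {
		return false
	}
	return true
}

func TestShouldCompact(t *testing.T) {
	cases := []struct {
		name     string
		exceeded bool
		err      error
		want     bool
	}{
		{"error from checker", false, errors.New("stat failed"), false},
		{"below threshold", false, nil, false},
		{"above threshold", true, nil, true},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got := shouldCompact(func() (bool, error) { return tc.exceeded, tc.err })
			if got != tc.want {
				t.Fatalf("got %v, want %v", got, tc.want)
			}
		})
	}
}
```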
This is taking a lot longer than we expected, as we've bumped into quite a lot of issues with the delta between when we originally forked and the current state. In combination with @rbarry82's unfortunate passing, things are going slower than usual. Work is, however, still ongoing, and we're still looking to get this across the finish line.
Any update on this?
Some, but nothing substantial. Loki moves quite rapidly, and as this feature touches multiple parts of the code base, it's been somewhat of a moving target. I'll get back to you all at the beginning of next year with more details.
We would absolutely love this feature. Any progress so far this year? |
What this PR does / why we need it:
This WorkInProgress PR is a first approach to solving #6876. I'm opening the PR at this stage to share the progress we have made and, fundamentally, to gather suggestions about it!

Which issue(s) this PR fixes:
Fixes #6876

Special notes for your reviewer:
To activate the size-based retention policy you have to add `size_based_retention_percentage` in the `storage.filesystem` section. Besides, in the `compactor` section, the value for `retention_enabled` must be `true`, for instance as in the excerpt below.
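For reference, the two relevant pieces as they appear in the full config shown earlier in this thread:

```yaml
common:
  storage:
    filesystem:
      chunks_directory: /tmp/loki/chunks
      rules_directory: /tmp/loki/rules
      size_based_retention_percentage: 2

compactor:
  working_directory: /tmp/loki/retention
  shared_store: filesystem
  retention_enabled: true
```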
How to test:
1. `make loki`
2. `./cmd/loki/loki -config.file=cmd/loki/loki-local-config.yaml`
3. Once disk usage goes above the configured threshold (2%), Loki will start deleting chunk files and you will see it in the logs.

Doubts and considerations
Settings
Despite the fact that it is working, it seems the solution can be improved a lot. For instance, we currently need two settings in the config file:
- `size_based_retention_percentage` in the `storage.filesystem` section
- `retention_enabled: true` in the `compactor` section

I would love to avoid the need for `retention_enabled: true`...

About the author
Since I'm mostly a Python/PHP software engineer writing my first lines in Go, and this is my first contact with the Loki codebase, there are surely many things that can be improved. Please feel free to comment!! 😄
Checklist
- Reviewed the `CONTRIBUTING.md` guide
- `CHANGELOG.md` updated
- `docs/sources/upgrading/_index.md` updated