Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: multi-store support #7447

Merged
merged 33 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
28e490a
compactor: add multi-store support
ashwanthgoli Oct 14, 2022
8a120f1
compactor: add test cases for multi-store
ashwanthgoli Oct 18, 2022
161cb2a
fix lint in compactor_test
ashwanthgoli Oct 18, 2022
91b8f02
add documentation
ashwanthgoli Oct 18, 2022
555b921
nit
ashwanthgoli Oct 26, 2022
fccb9c0
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Feb 9, 2023
5a06049
add better defaults for delete_request_store
ashwanthgoli Feb 9, 2023
47a5828
fix tests
ashwanthgoli Feb 9, 2023
16bf55d
add upgrade instructions
ashwanthgoli Feb 10, 2023
195167d
prefer tsdb compactor defaults over boltdb
ashwanthgoli Feb 21, 2023
a85d01c
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Feb 21, 2023
8ab700d
docs squad suggestions
ashwanthgoli Feb 27, 2023
0a0d7c0
docs squad suggestions
ashwanthgoli Feb 28, 2023
55bd54e
run make doc
ashwanthgoli Feb 28, 2023
b563f6c
migrate markers based on shared store or it's legacy defaults
ashwanthgoli Mar 8, 2023
5a01a55
upgrade upgrade instructions
ashwanthgoli Mar 8, 2023
a27165d
nit
ashwanthgoli Mar 8, 2023
cd34d49
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Mar 8, 2023
bed6d81
hide yaml field
ashwanthgoli Mar 8, 2023
d512928
chunk client use fs encoder
ashwanthgoli Mar 8, 2023
146e906
migrate markers to all store specific dirs
ashwanthgoli Apr 13, 2023
501fcfc
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Apr 14, 2023
2b473e3
update release version in upgrade notes
ashwanthgoli Apr 14, 2023
7d0f01d
fix lint
ashwanthgoli Apr 14, 2023
bccb9ce
init deletes before marker
ashwanthgoli Apr 14, 2023
48b44be
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Apr 17, 2023
5871a03
do not set compactor shared_store defaults based on common storage co…
ashwanthgoli Apr 18, 2023
b17b302
update deletion test to use multiple periods
ashwanthgoli Apr 18, 2023
774e712
minor changes for code readablitiy
ashwanthgoli Apr 25, 2023
8cf21f4
fixup! minor changes for code readablitiy
ashwanthgoli Apr 25, 2023
5225c80
update release notes
ashwanthgoli Apr 25, 2023
c2eaa8d
remove release version from upgrade notes
ashwanthgoli Apr 25, 2023
4d300ae
Merge branch 'main' into ashwanth/compactor-multi-store
ashwanthgoli Apr 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,9 @@ The `compactor` block configures the compactor component, which compacts index s
[working_directory: <string> | default = ""]

# The shared store used for storing boltdb files. Supported types: gcs, s3,
# azure, swift, filesystem, bos, cos.
# azure, swift, filesystem, bos, cos. If not set, compactor will be initialized
# to operate on all the object stores that contain either boltdb-shipper or tsdb
# index.
# CLI flag: -boltdb.shipper.compactor.shared-store
[shared_store: <string> | default = ""]

Expand Down Expand Up @@ -2051,6 +2053,11 @@ The `compactor` block configures the compactor component, which compacts index s
# CLI flag: -boltdb.shipper.compactor.retention-table-timeout
[retention_table_timeout: <duration> | default = 0s]

# Store used for managing delete requests. Defaults to
# -boltdb.shipper.compactor.shared-store.
# CLI flag: -boltdb.shipper.compactor.delete-request-store
[delete_request_store: <string> | default = ""]

# The max number of delete requests to run per compaction cycle.
# CLI flag: -boltdb.shipper.compactor.delete-batch-size
[delete_batch_size: <int> | default = 70]
Expand Down
29 changes: 25 additions & 4 deletions docs/sources/upgrading/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,38 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

#### Index shipper multi-store support
In previous releases, if you did not explicitly configure `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`, those values default to the `object_store` configured in the latest `period_config` of the corresponding index type.
These defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

#### Shutdown marker file

A shutdown marker file can be written by the `/ingester/prepare_shutdown` endpoint.
If the new `ingester.shutdown_marker_path` config setting has a value that value is used.
If not the`common.path_prefix` config setting is used if it has a value. Otherwise a warning is shown
in the logs on startup and the `/ingester/prepare_shutdown` endpoint will return a 500 status code.

#### Compactor multi-store support

In previous releases, setting `-boltdb.shipper.compactor.shared-store` configured the following:
- store used for managing delete requests.
- store on which index compaction should be performed.

If `-boltdb.shipper.compactor.shared-store` was not set, it used to default to the `object_store` configured in the latest `period_config` that uses either the tsdb or boltdb-shipper index.

Compactor now supports index compaction on multiple buckets/object stores.
And going forward loki will not set any defaults on `-boltdb.shipper.compactor.shared-store`, this has a couple of side effects detailed as follows:

##### store on which index compaction should be performed:
If `-boltdb.shipper.compactor.shared-store` is configured by the user, loki would run index compaction only on the store specified by the config.
If not set, compaction would be performed on all the object stores that contain either a boltdb-shipper or tsdb index.

##### store used for managing delete requests:
A new config option `-boltdb.shipper.compactor.delete-request-store` decides where delete requests should be stored. This new option takes precedence over `-boltdb.shipper.compactor.shared-store`.

In the case where neither of these options are set, the `object_store` configured in the latest `period_config` that uses either a tsdb or boltdb-shipper index is used for storing delete requests to ensure pending requests are processed.


## 2.8.0

### Loki
Expand Down Expand Up @@ -150,10 +175,6 @@ level=info ts=2022-12-20T15:27:54.858554127Z caller=metrics.go:147 component=fro

These statistics are also displayed when using `--stats` with LogCLI.

#### Index shipper multi-store support
In releases prior to 2.8.1, if you did not explicitly configure `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`, those values default to the `object_store` configured in the latest `period_config` of the corresponding index type.
In releases 2.8.1 and later, these defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

## 2.7.0

### Loki
Expand Down
1 change: 0 additions & 1 deletion integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ storage_config:

compactor:
working_directory: {{.dataPath}}/retention
shared_store: filesystem
retention_enabled: true

analytics:
Expand Down
36 changes: 23 additions & 13 deletions integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func TestMicroServicesDeleteRequest(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil)
clu := cluster.New(nil, cluster.WithAdditionalBoltDBPeriod)
defer func() {
assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientsWithShipper()
Expand Down Expand Up @@ -97,11 +97,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
},
Values: [][]string{
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
strconv.FormatInt(now.Add(-46*time.Hour).UnixNano(), 10),
"lineA",
},
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
strconv.FormatInt(now.Add(-46*time.Hour).UnixNano(), 10),
"lineB",
},
{
Expand All @@ -118,32 +118,32 @@ func TestMicroServicesDeleteRequest(t *testing.T) {

expectedDeleteRequests := []client.DeleteRequest{
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter"} |= "lineB"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter_no_match"} |= "foo"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Add(-10 * time.Minute).Unix(),
Query: `{deletion_type="partially_by_time"}`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="whole"}`,
Status: "received",
},
}

validateQueryResponse := func(resp *client.Response) {
validateQueryResponse := func(expectedStreams []client.StreamValues, resp *client.Response) {
t.Helper()
assert.Equal(t, "streams", resp.Data.ResultType)

Expand Down Expand Up @@ -173,7 +173,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)

// given default value of query_ingesters_within is 3h, older samples won't be present in the response
var es []client.StreamValues
for _, stream := range expectedStreams {
stream.Values = stream.Values[2:]
es = append(es, stream)
}
validateQueryResponse(es, resp)
})

t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
Expand All @@ -195,7 +202,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})

t.Run("add-delete-requests", func(t *testing.T) {
Expand Down Expand Up @@ -230,7 +237,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)

validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})

// Wait until delete request is finished
Expand Down Expand Up @@ -264,7 +271,10 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests)))
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)

// ideally this metric should be equal to 1 given that a single line matches the line filter
// but the same chunk is indexed in 3 tables
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 3)
})

// Query lines
Expand All @@ -284,7 +294,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)

validateQueryResponse(resp)
validateQueryResponse(expectedStreams, resp)
})
}

Expand Down
14 changes: 4 additions & 10 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure
r.StorageConfig.AzureStorageConfig = r.Common.Storage.Azure
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.CompactorConfig.SharedStoreType = config.StorageTypeAzure
}
}

Expand All @@ -460,7 +459,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "local"
r.Ruler.StoreConfig.Local = local.Config{Directory: r.Common.Storage.FSConfig.RulesDirectory}
r.StorageConfig.FSConfig.Directory = r.Common.Storage.FSConfig.ChunksDirectory
r.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
}
}

Expand All @@ -471,7 +469,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "gcs"
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS
r.StorageConfig.GCSConfig = r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = config.StorageTypeGCS
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
Expand All @@ -483,7 +480,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "s3"
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3
r.StorageConfig.AWSStorageConfig.S3Config = r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = config.StorageTypeS3
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
Expand All @@ -494,7 +490,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "bos"
r.Ruler.StoreConfig.BOS = r.Common.Storage.BOS
r.StorageConfig.BOSStorageConfig = r.Common.Storage.BOS
r.CompactorConfig.SharedStoreType = config.StorageTypeBOS
}
}

Expand All @@ -505,7 +500,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "swift"
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift
r.StorageConfig.Swift = r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = config.StorageTypeSwift
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
Expand All @@ -522,8 +516,8 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}

func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
if cfg.CompactorConfig.DefaultDeleteRequestStore == defaults.CompactorConfig.DefaultDeleteRequestStore {
cfg.CompactorConfig.DefaultDeleteRequestStore = period.ObjectType
}

if cfg.Common.PathPrefix != "" {
Expand All @@ -540,8 +534,8 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.Per
}

func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
if cfg.CompactorConfig.DefaultDeleteRequestStore == defaults.CompactorConfig.DefaultDeleteRequestStore {
cfg.CompactorConfig.DefaultDeleteRequestStore = period.ObjectType
}

if cfg.Common.PathPrefix != "" {
Expand Down
68 changes: 0 additions & 68 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,58 +720,6 @@ storage_config:
assert.EqualValues(t, 5*time.Minute, config.StorageConfig.GCSConfig.RequestTimeout)
})

t.Run("when common object store config is provided, compactor shared store is defaulted to use it", func(t *testing.T) {
for _, tt := range []struct {
configString string
expected string
}{
{
configString: `common:
storage:
s3:
s3: s3://foo-bucket/example
access_key_id: abc123
secret_access_key: def789`,
expected: config.StorageTypeS3,
},
{
configString: `common:
storage:
gcs:
bucket_name: foobar`,
expected: config.StorageTypeGCS,
},
{
configString: `common:
storage:
azure:
account_name: 3rd_planet
account_key: water`,
expected: config.StorageTypeAzure,
},
{
configString: `common:
storage:
swift:
username: steve
password: supersecret`,
expected: config.StorageTypeSwift,
},
{
configString: `common:
storage:
filesystem:
chunks_directory: /tmp/chunks
rules_directory: /tmp/rules`,
expected: config.StorageTypeFileSystem,
},
} {
config, _ := testContext(tt.configString, nil)

assert.Equal(t, tt.expected, config.CompactorConfig.SharedStoreType)
}
})

t.Run("explicit compactor shared_store config is preserved", func(t *testing.T) {
configString := `common:
storage:
Expand All @@ -788,22 +736,6 @@ compactor:
})

t.Run("when using boltdb storage type", func(t *testing.T) {
t.Run("default compactor.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h`
cfg, _ := testContext(boltdbSchemaConfig, nil)

assert.Equal(t, config.StorageTypeGCS, cfg.CompactorConfig.SharedStoreType)
})

t.Run("shared store types provided via config file take precedence", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
Expand Down