diff --git a/aws/dynamodb_table_client.go b/aws/dynamodb_table_client.go index f9bc715723e5..0eea1d599b11 100644 --- a/aws/dynamodb_table_client.go +++ b/aws/dynamodb_table_client.go @@ -189,6 +189,24 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc return nil } +func (d dynamoTableClient) DeleteTable(ctx context.Context, name string) error { + if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { + return instrument.TimeRequestHistogram(ctx, "DynamoDB.DeleteTable", dynamoRequestDuration, func(ctx context.Context) error { + input := &dynamodb.DeleteTableInput{TableName: aws.String(name)} + _, err := d.DynamoDB.DeleteTableWithContext(ctx, input) + if err != nil { + return err + } + + return nil + }) + }); err != nil { + return err + } + + return nil +} + func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) { var tableARN *string err = d.backoffAndRetry(ctx, func(ctx context.Context) error { diff --git a/cassandra/table_client.go b/cassandra/table_client.go index 1e683fc40ca4..e4335632ae45 100644 --- a/cassandra/table_client.go +++ b/cassandra/table_client.go @@ -50,6 +50,12 @@ func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) err return errors.WithStack(err) } +func (c *tableClient) DeleteTable(ctx context.Context, name string) error { + err := c.session.Query(fmt.Sprintf(` + DROP TABLE IF EXISTS %s;`, name)).WithContext(ctx).Exec() + return errors.WithStack(err) +} + func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) { return chunk.TableDesc{ Name: name, diff --git a/gcp/table_client.go b/gcp/table_client.go index 7e48b6a13634..2a62c1b04216 100644 --- a/gcp/table_client.go +++ b/gcp/table_client.go @@ -74,6 +74,14 @@ func alreadyExistsError(err error) bool { return ok && strings.Contains(serr.Message(), "already exists") } +func (c *tableClient) DeleteTable(ctx context.Context, name string) error { + if err := c.client.DeleteTable(ctx, name); err != nil { + return err + } + + return nil +} + func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) { return chunk.TableDesc{ Name: name, diff --git a/inmemory_storage_client.go b/inmemory_storage_client.go index 000c6c072fa4..a397b74db8fe 100644 --- a/inmemory_storage_client.go +++ b/inmemory_storage_client.go @@ -74,6 +74,20 @@ func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error { return nil } +// DeleteTable implements StorageClient. +func (m *MockStorage) DeleteTable(_ context.Context, name string) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + if _, ok := m.tables[name]; !ok { + return fmt.Errorf("table does not exist") + } + + delete(m.tables, name) + + return nil +} + // DescribeTable implements StorageClient. func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error) { m.mtx.RLock() diff --git a/schema_config.go b/schema_config.go index 453ae83c7054..f787ab2b97d0 100644 --- a/schema_config.go +++ b/schema_config.go @@ -335,13 +335,14 @@ func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) { f.Float64Var(&cfg.TargetValue, argPrefix+".target-value", 80, "DynamoDB target ratio of consumed capacity to provisioned capacity.") } -func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration) []TableDesc { +func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration, retention time.Duration) []TableDesc { var ( periodSecs = int64(cfg.Period / time.Second) beginGraceSecs = int64(beginGrace / time.Second) endGraceSecs = int64(endGrace / time.Second) firstTable = from.Unix() / periodSecs lastTable = through.Unix() / periodSecs + tablesToKeep = int64(int64(retention/time.Second) / periodSecs) now = mtime.Now().Unix() result = []TableDesc{} ) @@ -349,6 +350,10 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr if through.Unix()%secondsInDay == 0 { lastTable-- } + // Don't make tables further back than the configured retention + if retention > 0 && lastTable > tablesToKeep && lastTable-firstTable >= tablesToKeep { + firstTable = lastTable - tablesToKeep + } for i := firstTable; i <= lastTable; i++ { table := TableDesc{ // Name construction needs to be consistent with chunk_store.bigBuckets diff --git a/table_client.go b/table_client.go index 24d175229b14..c7a447992bd3 100644 --- a/table_client.go +++ b/table_client.go @@ -6,6 +6,7 @@ import "context" type TableClient interface { ListTables(ctx context.Context) ([]string, error) CreateTable(ctx context.Context, desc TableDesc) error + DeleteTable(ctx context.Context, name string) error DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error) UpdateTable(ctx context.Context, current, expected TableDesc) error } diff --git a/table_manager.go b/table_manager.go index d49b32088477..07b3c0ed8208 100644 --- a/table_manager.go +++ b/table_manager.go @@ -47,6 +47,12 @@ type TableManagerConfig struct { // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool + // Master 'on-switch' for table retention deletions + RetentionDeletesEnabled bool + + // How far back tables will be kept before they are deleted + RetentionPeriod time.Duration + // Period with which the table manager will poll for tables. DynamoDBPollInterval time.Duration @@ -72,6 +78,8 @@ type ProvisionConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ThroughputUpdatesDisabled, "table-manager.throughput-updates-disabled", false, "If true, disable all changes to DB capacity") + f.BoolVar(&cfg.RetentionDeletesEnabled, "table-manager.retention-deletes-enabled", false, "If true, enables retention deletes of DB tables") + f.DurationVar(&cfg.RetentionPeriod, "table-manager.retention-period", 0, "Tables older than this retention period are deleted. Note: This setting is destructive to data!(default: 0, which disables deletion)") f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") f.DurationVar(&cfg.CreationGracePeriod, "dynamodb.periodic-table.grace-period", 10*time.Minute, "DynamoDB periodic tables grace period (duration which table will be created/deleted before/after it's needed).") @@ -198,11 +206,15 @@ func (m *TableManager) SyncTables(ctx context.Context) error { expected := m.calculateExpectedTables() level.Info(util.Logger).Log("msg", "synching tables", "num_expected_tables", len(expected), "expected_tables", len(expected)) - toCreate, toCheckThroughput, err := m.partitionTables(ctx, expected) + toCreate, toCheckThroughput, toDelete, err := m.partitionTables(ctx, expected) if err != nil { return err } + if err := m.deleteTables(ctx, toDelete); err != nil { + return err + } + if err := m.createTables(ctx, toCreate); err != nil { return err } @@ -259,11 +271,11 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { } endModelTime := model.TimeFromUnix(endTime.Unix()) result = append(result, config.IndexTables.periodicTables( - config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, + config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, )...) if config.ChunkTables.Prefix != "" { result = append(result, config.ChunkTables.periodicTables( - config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, + config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod, )...) } } @@ -274,14 +286,20 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { } // partitionTables works out tables that need to be created vs tables that need to be updated -func (m *TableManager) partitionTables(ctx context.Context, descriptions []TableDesc) ([]TableDesc, []TableDesc, error) { +func (m *TableManager) partitionTables(ctx context.Context, descriptions []TableDesc) ([]TableDesc, []TableDesc, []TableDesc, error) { existingTables, err := m.client.ListTables(ctx) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sort.Strings(existingTables) - toCreate, toCheck := []TableDesc{}, []TableDesc{} + tablePrefixes := map[string]struct{}{} + for _, cfg := range m.schemaCfg.Configs { + tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} + tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + } + + toCreate, toCheck, toDelete := []TableDesc{}, []TableDesc{}, []TableDesc{} i, j := 0, 0 for i < len(descriptions) && j < len(existingTables) { if descriptions[i].Name < existingTables[j] { @@ -289,7 +307,15 @@ func (m *TableManager) partitionTables(ctx context.Context, descriptions []Table toCreate = append(toCreate, descriptions[i]) i++ } else if descriptions[i].Name > existingTables[j] { - // existingTables[j].name isn't in descriptions, can ignore + // existingTables[j].name isn't in descriptions, and can be removed + if m.cfg.RetentionPeriod > 0 { + for tblPrefix := range tablePrefixes { + if strings.HasPrefix(existingTables[j], tblPrefix) { + toDelete = append(toDelete, TableDesc{Name: existingTables[j]}) + break + } + } + } j++ } else { // Table exists, need to check it has correct throughput @@ -302,7 +328,7 @@ func (m *TableManager) partitionTables(ctx context.Context, descriptions []Table toCreate = append(toCreate, descriptions[i]) } - return toCreate, toCheck, nil + return toCreate, toCheck, toDelete, nil } func (m *TableManager) createTables(ctx context.Context, descriptions []TableDesc) error { @@ -316,6 +342,22 @@ func (m *TableManager) createTables(ctx context.Context, descriptions []TableDes return nil } +func (m *TableManager) deleteTables(ctx context.Context, descriptions []TableDesc) error { + for _, desc := range descriptions { + level.Info(util.Logger).Log("msg", "table has exceeded the retention period", "table", desc.Name) + if !m.cfg.RetentionDeletesEnabled { + continue + } + + level.Info(util.Logger).Log("msg", "deleting table", "table", desc.Name) + err := m.client.DeleteTable(ctx, desc.Name) + if err != nil { + return err + } + } + return nil +} + func (m *TableManager) updateTables(ctx context.Context, descriptions []TableDesc) error { for _, expected := range descriptions { level.Debug(util.Logger).Log("msg", "checking provisioned throughput on table", "table", expected.Name) diff --git a/table_manager_test.go b/table_manager_test.go index 7e70b6f48274..19fd8334949b 100644 --- a/table_manager_test.go +++ b/table_manager_test.go @@ -18,6 +18,7 @@ const ( table2Prefix = "cortex2_" chunkTablePrefix = "chunks_" chunkTable2Prefix = "chunks2_" + tableRetention = 2 * 7 * 24 * time.Hour tablePeriod = 7 * 24 * time.Hour gracePeriod = 15 * time.Minute maxChunkAge = 12 * time.Hour @@ -69,6 +70,14 @@ func (m *mockTableClient) CreateTable(_ context.Context, desc TableDesc) error { return nil } +func (m *mockTableClient) DeleteTable(_ context.Context, name string) error { + m.Lock() + defer m.Unlock() + + delete(m.tables, name) + return nil +} + func (m *mockTableClient) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error) { m.Lock() defer m.Unlock() @@ -440,3 +449,141 @@ func TestTableManagerTags(t *testing.T) { ) } } + +func TestTableManagerRetentionOnly(t *testing.T) { + client := newMockTableClient() + + cfg := SchemaConfig{ + Configs: []PeriodConfig{ + { + From: model.TimeFromUnix(baseTableStart.Unix()), + IndexTables: PeriodicTableConfig{ + Prefix: tablePrefix, + Period: tablePeriod, + }, + + ChunkTables: PeriodicTableConfig{ + Prefix: chunkTablePrefix, + Period: tablePeriod, + }, + }, + }, + } + tbmConfig := TableManagerConfig{ + RetentionPeriod: tableRetention, + RetentionDeletesEnabled: true, + CreationGracePeriod: gracePeriod, + IndexTables: ProvisionConfig{ + ProvisionedWriteThroughput: write, + ProvisionedReadThroughput: read, + InactiveWriteThroughput: inactiveWrite, + InactiveReadThroughput: inactiveRead, + InactiveWriteScale: inactiveScalingConfig, + InactiveWriteScaleLastN: autoScaleLastN, + }, + ChunkTables: ProvisionConfig{ + ProvisionedWriteThroughput: write, + ProvisionedReadThroughput: read, + InactiveWriteThroughput: inactiveWrite, + InactiveReadThroughput: inactiveRead, + }, + } + tableManager, err := NewTableManager(tbmConfig, cfg, maxChunkAge, client) + if err != nil { + t.Fatal(err) + } + + // Check at time zero, we have one weekly table + tmTest(t, client, tableManager, + "Initial test", + baseTableStart, + []TableDesc{ + {Name: tablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) + + // Check after one week, we have two weekly tables + tmTest(t, client, tableManager, + "Move forward by one table period", + baseTableStart.Add(tablePeriod), + []TableDesc{ + {Name: tablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: tablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) + + // Check after two weeks, we have three tables (two previous periods and the new one) + tmTest(t, client, tableManager, + "Move forward by two table periods", + baseTableStart.Add(tablePeriod*2), + []TableDesc{ + {Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) + + // Check after three weeks, we have three tables (two previous periods and the new one), table 0 was deleted + tmTest(t, client, tableManager, + "Move forward by three table periods", + baseTableStart.Add(tablePeriod*3), + []TableDesc{ + {Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) + + // Verify that without RetentionDeletesEnabled no tables are removed + tableManager.cfg.RetentionDeletesEnabled = false + // Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test + client.CreateTable(nil, TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) + client.CreateTable(nil, TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + tmTest(t, client, tableManager, + "Move forward by three table periods (no deletes)", + baseTableStart.Add(tablePeriod*3), + []TableDesc{ + {Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) + + // Re-enable table deletions + tableManager.cfg.RetentionDeletesEnabled = true + + // Verify that with a retention period of zero no tables outside the configs 'From' range are removed + tableManager.cfg.RetentionPeriod = 0 + tableManager.schemaCfg.Configs[0].From = model.TimeFromUnix(baseTableStart.Add(tablePeriod).Unix()) + // Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test + client.CreateTable(nil, TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}) + client.CreateTable(nil, TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}) + tmTest(t, client, tableManager, + "Move forward by three table periods (no deletes) and move From one table forward", + baseTableStart.Add(tablePeriod*3), + []TableDesc{ + {Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig}, + {Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite}, + {Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write}, + {Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write}, + }, + ) +}