Skip to content

Commit

Permalink
Merge pull request grafana#1133 from rsteneteg/rsteneteg/table-retention
Browse files Browse the repository at this point in the history
Added retention feature to table manager to automatically remove olde…
  • Loading branch information
bboreham committed Jan 10, 2019
2 parents 79f6122 + 23b90f7 commit 3e1384a
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 9 deletions.
18 changes: 18 additions & 0 deletions aws/dynamodb_table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc
return nil
}

// DeleteTable issues a DynamoDB DeleteTable request for the table called name.
// The request is wrapped in instrument.TimeRequestHistogram under the
// operation name "DynamoDB.DeleteTable" and executed through d.backoffAndRetry;
// whatever error that returns is handed back to the caller unchanged.
func (d dynamoTableClient) DeleteTable(ctx context.Context, name string) error {
	// deleteOnce performs a single DeleteTable call, building a fresh input each time.
	deleteOnce := func(ctx context.Context) error {
		_, err := d.DynamoDB.DeleteTableWithContext(ctx, &dynamodb.DeleteTableInput{
			TableName: aws.String(name),
		})
		return err
	}

	// timedDelete records the duration of one deleteOnce invocation.
	timedDelete := func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "DynamoDB.DeleteTable", dynamoRequestDuration, deleteOnce)
	}

	return d.backoffAndRetry(ctx, timedDelete)
}

func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) {
var tableARN *string
err = d.backoffAndRetry(ctx, func(ctx context.Context) error {
Expand Down
6 changes: 6 additions & 0 deletions cassandra/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) err
return errors.WithStack(err)
}

// DeleteTable executes a "DROP TABLE IF EXISTS" statement for the table called
// name on the client's session and returns the result wrapped with
// errors.WithStack.
//
// NOTE(review): name is interpolated directly into the statement text, so
// callers are presumed to pass trusted table names only — confirm at call sites.
func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
	stmt := fmt.Sprintf(`
DROP TABLE IF EXISTS %s;`, name)
	return errors.WithStack(c.session.Query(stmt).WithContext(ctx).Exec())
}

func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) {
return chunk.TableDesc{
Name: name,
Expand Down
8 changes: 8 additions & 0 deletions gcp/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ func alreadyExistsError(err error) bool {
return ok && strings.Contains(serr.Message(), "already exists")
}

// DeleteTable asks the underlying client to delete the table called name and
// returns that call's error (nil on success) unchanged.
func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
	return c.client.DeleteTable(ctx, name)
}

func (c *tableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) {
return chunk.TableDesc{
Name: name,
Expand Down
14 changes: 14 additions & 0 deletions inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error {
return nil
}

// DeleteTable implements StorageClient.
//
// It removes the entry for name from the in-memory table map while holding
// the write lock. An error is returned if no such table is present.
func (m *MockStorage) DeleteTable(_ context.Context, name string) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	_, exists := m.tables[name]
	if exists {
		delete(m.tables, name)
		return nil
	}

	return fmt.Errorf("table does not exist")
}

// DescribeTable implements StorageClient.
func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error) {
m.mtx.RLock()
Expand Down
7 changes: 6 additions & 1 deletion schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,20 +335,25 @@ func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) {
f.Float64Var(&cfg.TargetValue, argPrefix+".target-value", 80, "DynamoDB target ratio of consumed capacity to provisioned capacity.")
}

func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration) []TableDesc {
func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration, retention time.Duration) []TableDesc {
var (
periodSecs = int64(cfg.Period / time.Second)
beginGraceSecs = int64(beginGrace / time.Second)
endGraceSecs = int64(endGrace / time.Second)
firstTable = from.Unix() / periodSecs
lastTable = through.Unix() / periodSecs
tablesToKeep = int64(int64(retention/time.Second) / periodSecs)
now = mtime.Now().Unix()
result = []TableDesc{}
)
// If through ends on 00:00 of the day, don't include the upcoming day
if through.Unix()%secondsInDay == 0 {
lastTable--
}
// Don't make tables further back than the configured retention
if retention > 0 && lastTable > tablesToKeep && lastTable-firstTable >= tablesToKeep {
firstTable = lastTable - tablesToKeep
}
for i := firstTable; i <= lastTable; i++ {
table := TableDesc{
// Name construction needs to be consistent with chunk_store.bigBuckets
Expand Down
1 change: 1 addition & 0 deletions table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "context"
type TableClient interface {
// ListTables returns table names; the table manager compares them against
// the names of the tables it expects to exist.
ListTables(ctx context.Context) ([]string, error)
// CreateTable creates the table described by desc.
CreateTable(ctx context.Context, desc TableDesc) error
// DeleteTable deletes the table called name.
// NOTE(review): implementations differ on a missing table (some return an
// error, others succeed silently) — callers should not rely on either.
DeleteTable(ctx context.Context, name string) error
// DescribeTable returns the description of the table called name together
// with an isActive flag.
DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error)
// UpdateTable is given the current and the expected description of a table;
// presumably it changes the table to match expected — TODO confirm against
// implementations.
UpdateTable(ctx context.Context, current, expected TableDesc) error
}
Expand Down
58 changes: 50 additions & 8 deletions table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type TableManagerConfig struct {
// Master 'off-switch' for table capacity updates, e.g. when troubleshooting
ThroughputUpdatesDisabled bool

// Master 'on-switch' for table retention deletions
RetentionDeletesEnabled bool

// How far back tables will be kept before they are deleted
RetentionPeriod time.Duration

// Period with which the table manager will poll for tables.
DynamoDBPollInterval time.Duration

Expand All @@ -72,6 +78,8 @@ type ProvisionConfig struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ThroughputUpdatesDisabled, "table-manager.throughput-updates-disabled", false, "If true, disable all changes to DB capacity")
f.BoolVar(&cfg.RetentionDeletesEnabled, "table-manager.retention-deletes-enabled", false, "If true, enables retention deletes of DB tables")
f.DurationVar(&cfg.RetentionPeriod, "table-manager.retention-period", 0, "Tables older than this retention period are deleted. Note: This setting is destructive to data!(default: 0, which disables deletion)")
f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
f.DurationVar(&cfg.CreationGracePeriod, "dynamodb.periodic-table.grace-period", 10*time.Minute, "DynamoDB periodic tables grace period (duration which table will be created/deleted before/after it's needed).")

Expand Down Expand Up @@ -198,11 +206,15 @@ func (m *TableManager) SyncTables(ctx context.Context) error {
expected := m.calculateExpectedTables()
level.Info(util.Logger).Log("msg", "synching tables", "num_expected_tables", len(expected), "expected_tables", len(expected))

toCreate, toCheckThroughput, err := m.partitionTables(ctx, expected)
toCreate, toCheckThroughput, toDelete, err := m.partitionTables(ctx, expected)
if err != nil {
return err
}

if err := m.deleteTables(ctx, toDelete); err != nil {
return err
}

if err := m.createTables(ctx, toCreate); err != nil {
return err
}
Expand Down Expand Up @@ -259,11 +271,11 @@ func (m *TableManager) calculateExpectedTables() []TableDesc {
}
endModelTime := model.TimeFromUnix(endTime.Unix())
result = append(result, config.IndexTables.periodicTables(
config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge,
config.From, endModelTime, m.cfg.IndexTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
)...)
if config.ChunkTables.Prefix != "" {
result = append(result, config.ChunkTables.periodicTables(
config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge,
config.From, endModelTime, m.cfg.ChunkTables, m.cfg.CreationGracePeriod, m.maxChunkAge, m.cfg.RetentionPeriod,
)...)
}
}
Expand All @@ -274,22 +286,36 @@ func (m *TableManager) calculateExpectedTables() []TableDesc {
}

// partitionTables works out tables that need to be created vs tables that need to be updated
func (m *TableManager) partitionTables(ctx context.Context, descriptions []TableDesc) ([]TableDesc, []TableDesc, error) {
func (m *TableManager) partitionTables(ctx context.Context, descriptions []TableDesc) ([]TableDesc, []TableDesc, []TableDesc, error) {
existingTables, err := m.client.ListTables(ctx)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
sort.Strings(existingTables)

toCreate, toCheck := []TableDesc{}, []TableDesc{}
tablePrefixes := map[string]struct{}{}
for _, cfg := range m.schemaCfg.Configs {
tablePrefixes[cfg.IndexTables.Prefix] = struct{}{}
tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{}
}

toCreate, toCheck, toDelete := []TableDesc{}, []TableDesc{}, []TableDesc{}
i, j := 0, 0
for i < len(descriptions) && j < len(existingTables) {
if descriptions[i].Name < existingTables[j] {
// Table descriptions[i] doesn't exist
toCreate = append(toCreate, descriptions[i])
i++
} else if descriptions[i].Name > existingTables[j] {
// existingTables[j].name isn't in descriptions, can ignore
// existingTables[j].name isn't in descriptions, and can be removed
if m.cfg.RetentionPeriod > 0 {
for tblPrefix := range tablePrefixes {
if strings.HasPrefix(existingTables[j], tblPrefix) {
toDelete = append(toDelete, TableDesc{Name: existingTables[j]})
break
}
}
}
j++
} else {
// Table exists, need to check it has correct throughput
Expand All @@ -302,7 +328,7 @@ func (m *TableManager) partitionTables(ctx context.Context, descriptions []Table
toCreate = append(toCreate, descriptions[i])
}

return toCreate, toCheck, nil
return toCreate, toCheck, toDelete, nil
}

func (m *TableManager) createTables(ctx context.Context, descriptions []TableDesc) error {
Expand All @@ -316,6 +342,22 @@ func (m *TableManager) createTables(ctx context.Context, descriptions []TableDes
return nil
}

// deleteTables walks the given table descriptions, logging each one as having
// exceeded the retention period. The table is actually deleted through the
// table client only when m.cfg.RetentionDeletesEnabled is set; otherwise the
// log line is the only effect. The first deletion error aborts the loop and
// is returned; remaining tables are left untouched.
func (m *TableManager) deleteTables(ctx context.Context, descriptions []TableDesc) error {
	for i := range descriptions {
		tableName := descriptions[i].Name
		level.Info(util.Logger).Log("msg", "table has exceeded the retention period", "table", tableName)

		if m.cfg.RetentionDeletesEnabled {
			level.Info(util.Logger).Log("msg", "deleting table", "table", tableName)
			if err := m.client.DeleteTable(ctx, tableName); err != nil {
				return err
			}
		}
	}
	return nil
}

func (m *TableManager) updateTables(ctx context.Context, descriptions []TableDesc) error {
for _, expected := range descriptions {
level.Debug(util.Logger).Log("msg", "checking provisioned throughput on table", "table", expected.Name)
Expand Down
147 changes: 147 additions & 0 deletions table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
table2Prefix = "cortex2_"
chunkTablePrefix = "chunks_"
chunkTable2Prefix = "chunks2_"
tableRetention = 2 * 7 * 24 * time.Hour
tablePeriod = 7 * 24 * time.Hour
gracePeriod = 15 * time.Minute
maxChunkAge = 12 * time.Hour
Expand Down Expand Up @@ -69,6 +70,14 @@ func (m *mockTableClient) CreateTable(_ context.Context, desc TableDesc) error {
return nil
}

// DeleteTable removes name from the mock's table map while holding the mock's
// lock. It always returns nil: deleting a name that is not in the map is a
// no-op rather than an error.
func (m *mockTableClient) DeleteTable(_ context.Context, name string) error {
m.Lock()
defer m.Unlock()

delete(m.tables, name)
return nil
}

func (m *mockTableClient) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error) {
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -440,3 +449,141 @@ func TestTableManagerTags(t *testing.T) {
)
}
}

// TestTableManagerRetentionOnly exercises the table manager's retention
// handling with weekly index and chunk tables and a two-week retention period.
// It checks that:
//   - tables are created as time advances,
//   - tables older than the retention period are deleted when
//     RetentionDeletesEnabled is true,
//   - no tables are deleted when RetentionDeletesEnabled is false, and
//   - with a retention period of zero, tables that fall before the schema
//     config's From time are left in place.
func TestTableManagerRetentionOnly(t *testing.T) {
	ctx := context.Background()
	client := newMockTableClient()

	// mustCreate seeds the mock client with a table and fails the test
	// immediately if the client reports an error.
	mustCreate := func(desc TableDesc) {
		t.Helper()
		if err := client.CreateTable(ctx, desc); err != nil {
			t.Fatal(err)
		}
	}

	cfg := SchemaConfig{
		Configs: []PeriodConfig{
			{
				From: model.TimeFromUnix(baseTableStart.Unix()),
				IndexTables: PeriodicTableConfig{
					Prefix: tablePrefix,
					Period: tablePeriod,
				},

				ChunkTables: PeriodicTableConfig{
					Prefix: chunkTablePrefix,
					Period: tablePeriod,
				},
			},
		},
	}
	tbmConfig := TableManagerConfig{
		RetentionPeriod:         tableRetention,
		RetentionDeletesEnabled: true,
		CreationGracePeriod:     gracePeriod,
		IndexTables: ProvisionConfig{
			ProvisionedWriteThroughput: write,
			ProvisionedReadThroughput:  read,
			InactiveWriteThroughput:    inactiveWrite,
			InactiveReadThroughput:     inactiveRead,
			InactiveWriteScale:         inactiveScalingConfig,
			InactiveWriteScaleLastN:    autoScaleLastN,
		},
		ChunkTables: ProvisionConfig{
			ProvisionedWriteThroughput: write,
			ProvisionedReadThroughput:  read,
			InactiveWriteThroughput:    inactiveWrite,
			InactiveReadThroughput:     inactiveRead,
		},
	}
	tableManager, err := NewTableManager(tbmConfig, cfg, maxChunkAge, client)
	if err != nil {
		t.Fatal(err)
	}

	// Check at time zero, we have one weekly table
	tmTest(t, client, tableManager,
		"Initial test",
		baseTableStart,
		[]TableDesc{
			{Name: tablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)

	// Check after one week, we have two weekly tables
	tmTest(t, client, tableManager,
		"Move forward by one table period",
		baseTableStart.Add(tablePeriod),
		[]TableDesc{
			{Name: tablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: tablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "0", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)

	// Check after two weeks, we have three tables (two previous periods and the new one)
	tmTest(t, client, tableManager,
		"Move forward by two table periods",
		baseTableStart.Add(tablePeriod*2),
		[]TableDesc{
			{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "1", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)

	// Check after three weeks, we have three tables (two previous periods and the new one), table 0 was deleted
	tmTest(t, client, tableManager,
		"Move forward by three table periods",
		baseTableStart.Add(tablePeriod*3),
		[]TableDesc{
			{Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)

	// Verify that without RetentionDeletesEnabled no tables are removed
	tableManager.cfg.RetentionDeletesEnabled = false
	// Retention > 0 will prevent older tables from being created so we need to create the old tables manually for the test
	mustCreate(TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig})
	mustCreate(TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite})
	tmTest(t, client, tableManager,
		"Move forward by three table periods (no deletes)",
		baseTableStart.Add(tablePeriod*3),
		[]TableDesc{
			{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)

	// Re-enable table deletions
	tableManager.cfg.RetentionDeletesEnabled = true

	// Verify that with a retention period of zero no tables outside the configs 'From' range are removed
	tableManager.cfg.RetentionPeriod = 0
	tableManager.schemaCfg.Configs[0].From = model.TimeFromUnix(baseTableStart.Add(tablePeriod).Unix())
	// With From moved one period forward, table 0 is no longer an expected table,
	// so make sure the old tables are present in the client before checking that
	// they are left alone.
	mustCreate(TableDesc{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig})
	mustCreate(TableDesc{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite})
	tmTest(t, client, tableManager,
		"Move forward by three table periods (no deletes) and move From one table forward",
		baseTableStart.Add(tablePeriod*3),
		[]TableDesc{
			{Name: tablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite, WriteScale: inactiveScalingConfig},
			{Name: tablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: tablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "0", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "1", ProvisionedRead: inactiveRead, ProvisionedWrite: inactiveWrite},
			{Name: chunkTablePrefix + "2", ProvisionedRead: read, ProvisionedWrite: write},
			{Name: chunkTablePrefix + "3", ProvisionedRead: read, ProvisionedWrite: write},
		},
	)
}

0 comments on commit 3e1384a

Please sign in to comment.