Skip to content

Commit

Permalink
[ingester] support mod flow_tag ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Jun 20, 2024
1 parent 664e1bf commit d0c026c
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions server/ingester/datasource/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ const (
NETWORK = "network"
APPLICATION = "application"
TRAFFIC_POLICY = "traffic_policy"
FLOW_TAG_DB = "flow_tag"

ERR_IS_MODIFYING = "Modifying the retention time (%s), please try again later"
)

type DatasourceModifiedOnly string
type DatasourceInfo struct {
ID int
DB string
Tables []string
ID int
DB string
Tables []string
FlowTagTables []string
}

const (
Expand All @@ -59,21 +61,23 @@ const (
DEEPFLOW_ADMIN = "deepflow_admin"
)

// to modify the datasource TTL, you need to also modify the 'flow_tag' database tables.
// FIXME: only the 'prometheus' database is supported now, and the remaining databases will be completed in the future.
var DatasourceModifiedOnlyIDMap = map[DatasourceModifiedOnly]DatasourceInfo{
DEEPFLOW_SYSTEM: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 1, "deepflow_system", []string{"deepflow_system"}},
L4_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 2, "flow_log", []string{"l4_flow_log"}},
L7_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 3, "flow_log", []string{"l7_flow_log"}},
L4_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 4, "flow_log", []string{"l4_packet"}},
L7_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 5, "flow_log", []string{"l7_packet"}},
EXT_METRICS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 6, "ext_metrics", []string{"metrics"}},
PROMETHEUS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 7, "prometheus", []string{"samples"}},
EVENT_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 8, "event", []string{"event"}},
EVENT_PERF_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 9, "event", []string{"perf_event"}},
EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}},
PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}},
APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}},
DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}},
DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}},
DEEPFLOW_SYSTEM: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 1, "deepflow_system", []string{"deepflow_system"}, []string{}},
L4_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 2, "flow_log", []string{"l4_flow_log"}, []string{}},
L7_FLOW_LOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 3, "flow_log", []string{"l7_flow_log"}, []string{}},
L4_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 4, "flow_log", []string{"l4_packet"}, []string{}},
L7_PACKET: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 5, "flow_log", []string{"l7_packet"}, []string{}},
EXT_METRICS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 6, "ext_metrics", []string{"metrics"}, []string{}},
PROMETHEUS: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 7, "prometheus", []string{"samples"}, []string{"prometheus_custom_field", "prometheus_custom_field_value"}},
EVENT_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 8, "event", []string{"event"}, []string{}},
EVENT_PERF_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 9, "event", []string{"perf_event"}, []string{}},
EVENT_ALARM_EVENT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 10, "event", []string{"alarm_event"}, []string{}},
PROFILE: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 11, "profile", []string{"in_process"}, []string{}},
APPLOG: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 12, "application_log", []string{"log"}, []string{}},
DEEPFLOW_TENANT: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 13, "deepflow_tenant", []string{"deepflow_collector"}, []string{}},
DEEPFLOW_ADMIN: {int(flow_metrics.METRICS_TABLE_ID_MAX) + 14, "deepflow_admin", []string{"deepflow_server"}, []string{}},
}

func (ds DatasourceModifiedOnly) DatasourceInfo() DatasourceInfo {
Expand Down Expand Up @@ -513,6 +517,8 @@ func (m *DatasourceManager) Handle(orgID int, action ActionEnum, dbGroup, baseTa
datasourceId := datasoureInfo.ID
db := ckdb.OrgDatabasePrefix(uint16(orgID)) + datasoureInfo.DB
tables := datasoureInfo.Tables
flowTagDb := ckdb.OrgDatabasePrefix(uint16(orgID)) + FLOW_TAG_DB
flowTagTables := datasoureInfo.FlowTagTables

cks, err := basecommon.NewCKConnections(m.ckAddrs, m.user, m.password)
if err != nil {
Expand All @@ -522,16 +528,21 @@ func (m *DatasourceManager) Handle(orgID int, action ActionEnum, dbGroup, baseTa
if m.isModifyingFlags[datasourceId] {
return fmt.Errorf(ERR_IS_MODIFYING, dbGroup)
}
go func(tableNames []string, id int) {
go func(tableNames, flowTagTableNames []string, id int) {
m.isModifyingFlags[id] = true
for _, tableName := range tableNames {
if err := m.modTableTTL(cks, db, tableName, duration); err != nil {
log.Info(err)
}
}
for _, tableName := range flowTagTableNames {
if err := m.modTableTTL(cks, flowTagDb, tableName, duration); err != nil {
log.Info(err)
}
}
m.isModifyingFlags[id] = false
cks.Close()
}(tables, datasourceId)
}(tables, flowTagTables, datasourceId)

return nil
}
Expand Down

0 comments on commit d0c026c

Please sign in to comment.