Skip to content

Commit

Permalink
read prom url from database (#1119)
Browse files Browse the repository at this point in the history
* add model alerting_engine

* heartbeat using db

* reader.Client from database

* fix sql
  • Loading branch information
UlricQin committed Aug 17, 2022
1 parent b92e4ab commit b4ddd03
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 86 deletions.
12 changes: 11 additions & 1 deletion docker/initsql/a-n9e.sql
Expand Up @@ -52,7 +52,7 @@ insert into user_group_member(group_id, user_id) values(1, 1);
CREATE TABLE `configs` (
`id` bigint unsigned not null auto_increment,
`ckey` varchar(191) not null,
`cval` varchar(1024) not null default '',
`cval` varchar(4096) not null default '',
PRIMARY KEY (`id`),
UNIQUE KEY (`ckey`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
Expand Down Expand Up @@ -500,3 +500,13 @@ CREATE TABLE `task_record`
KEY (`create_at`, `group_id`),
KEY (`create_by`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

-- Registry of alerting engine (server) instances, one row per instance.
-- Each instance refreshes its own `clock` as a heartbeat; `cluster` names the
-- reader cluster the instance is associated with.
CREATE TABLE `alerting_engines`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090',
`cluster` varchar(128) not null default '' comment 'target reader cluster',
-- heartbeat time; the server writes Unix seconds here
`clock` bigint not null,
PRIMARY KEY (`id`),
-- at most one row per instance
UNIQUE KEY (`instance`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
14 changes: 5 additions & 9 deletions etc/server.conf
Expand Up @@ -13,6 +13,9 @@ EngineDelay = 120

DisableUsageReport = false

# where the reader settings are loaded from; valid values: config | database
ReaderFrom = "config"

[Log]
# log write dir
Dir = "logs"
Expand Down Expand Up @@ -155,15 +158,8 @@ BasicAuthUser = ""
BasicAuthPass = ""
# timeout settings, unit: ms
Timeout = 30000
DialTimeout = 10000
TLSHandshakeTimeout = 30000
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000
# time duration, unit: ms
KeepAlive = 30000
MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 10
DialTimeout = 3000
MaxIdleConnsPerHost = 100

[WriterOpt]
# queue channel count
Expand Down
2 changes: 1 addition & 1 deletion src/models/alert_mute.go
Expand Up @@ -71,7 +71,7 @@ func (m *AlertMute) Verify() error {
}

if m.Etime <= m.Btime {
return fmt.Errorf("Oops... etime(%d) <= btime(%d)", m.Etime, m.Btime)
return fmt.Errorf("oops... etime(%d) <= btime(%d)", m.Etime, m.Btime)
}

if err := m.Parse(); err != nil {
Expand Down
81 changes: 81 additions & 0 deletions src/models/alerting_engine.go
@@ -0,0 +1,81 @@
package models

import "time"

// AlertingEngines is the row model for the alerting_engines table: one record
// per alerting engine (server) instance, carrying the cluster assigned to it
// and the time of its most recent heartbeat.
type AlertingEngines struct {
// Id is the primary key.
Id int64 `json:"id" gorm:"primaryKey"`
// Instance identifies the server instance (the heartbeat endpoint is used as the value).
Instance string `json:"instance"`
// Cluster is the name of the reader cluster associated with this instance.
Cluster string `json:"cluster"` // reader cluster
// Clock is the Unix time (seconds) of the last heartbeat.
Clock int64 `json:"clock"`
}

// TableName returns the name of the database table backing AlertingEngines.
func (e *AlertingEngines) TableName() string {
	const table = "alerting_engines"
	return table
}

// UpdateCluster sets the cluster of this engine to c and persists only the
// "cluster" column. It is meant for the UI flow in which a user assigns a
// target cluster to each n9e-server instance.
//
// The in-memory Cluster field is overwritten before the database call, so it
// holds c even when the update returns an error.
func (e *AlertingEngines) UpdateCluster(c string) error {
	e.Cluster = c

	result := DB().Model(e).Select("cluster").Updates(e)
	return result.Error
}

// AlertingEngineGetCluster looks up the cluster name recorded for the given
// instance. It returns an empty string and a nil error when no row matches
// the instance, and a non-nil error only when the query itself fails.
func AlertingEngineGetCluster(instance string) (string, error) {
	var matched []AlertingEngines
	if err := DB().Where("instance=?", instance).Find(&matched).Error; err != nil {
		return "", err
	}

	cluster := ""
	if len(matched) > 0 {
		cluster = matched[0].Cluster
	}
	return cluster, nil
}

// AlertingEngineGets returns alerting engine rows ordered by instance. It
// backs the page where users list all n9e-server instances and assign a
// cluster to each of them.
//
// When where is empty every row is returned; otherwise where and args are
// passed to the query as the filter condition and its bind values.
func AlertingEngineGets(where string, args ...interface{}) ([]*AlertingEngines, error) {
	query := DB().Order("instance")
	if where != "" {
		query = query.Where(where, args...)
	}

	var engines []*AlertingEngines
	err := query.Find(&engines).Error
	return engines, err
}

// AlertingEngineGetsInstances returns only the "instance" column of the
// alerting engine rows, ordered by instance.
//
// When where is empty every row is considered; otherwise where and args are
// passed to the query as the filter condition and its bind values.
func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, error) {
	query := DB().Model(new(AlertingEngines)).Order("instance")
	if where != "" {
		query = query.Where(where, args...)
	}

	var instances []string
	err := query.Pluck("instance", &instances).Error
	return instances, err
}

// AlertingEngineHeartbeat records a heartbeat for the given instance.
//
// It first counts the rows for the instance. If a row exists, its clock is
// set to the current Unix time; if none exists, a new row is created with
// that instance and the current Unix time. Any database error is returned.
func AlertingEngineHeartbeat(instance string) error {
	var existing int64
	if err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&existing).Error; err != nil {
		return err
	}

	// Already registered: refresh the heartbeat timestamp only.
	if existing != 0 {
		return DB().Model(new(AlertingEngines)).Where("instance=?", instance).Update("clock", time.Now().Unix()).Error
	}

	// First heartbeat from this instance: register it.
	return DB().Create(&AlertingEngines{
		Instance: instance,
		Clock:    time.Now().Unix(),
	}).Error
}
15 changes: 7 additions & 8 deletions src/server/config/config.go
Expand Up @@ -70,6 +70,10 @@ func MustLoad(fpaths ...string) {
C.EngineDelay = 120
}

if C.ReaderFrom == "" {
C.ReaderFrom = "config"
}

if C.Heartbeat.IP == "" {
// auto detect
// C.Heartbeat.IP = fmt.Sprint(GetOutboundIP())
Expand Down Expand Up @@ -187,6 +191,7 @@ type Config struct {
AnomalyDataApi []string
EngineDelay int64
DisableUsageReport bool
ReaderFrom string
Log logx.Config
HTTP httpx.Config
BasicAuth gin.Accounts
Expand All @@ -207,15 +212,9 @@ type ReaderOptions struct {
BasicAuthUser string
BasicAuthPass string

Timeout int64
DialTimeout int64
TLSHandshakeTimeout int64
ExpectContinueTimeout int64
IdleConnTimeout int64
KeepAlive int64
Timeout int64
DialTimeout int64

MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int

Headers []string
Expand Down
11 changes: 11 additions & 0 deletions src/server/engine/worker.go
Expand Up @@ -3,6 +3,7 @@ package engine
import (
"context"
"fmt"
"log"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -166,6 +167,11 @@ func (r *RuleEval) Work() {
return
}

if reader.Client == nil {
logger.Error("reader.Client is nil")
return
}

var value model.Value
var err error
if r.rule.Algorithm == "" {
Expand Down Expand Up @@ -636,6 +642,11 @@ func (r RecordingRuleEval) Work() {
return
}

if reader.Client == nil {
log.Println("reader.Client is nil")
return
}

value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
Expand Down
61 changes: 12 additions & 49 deletions src/server/naming/heartbeat.go
Expand Up @@ -4,57 +4,45 @@ import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/toolkits/pkg/logger"

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/storage"
)

// local servers
var localss string

func Heartbeat(ctx context.Context) error {
if err := heartbeat(ctx); err != nil {
if err := heartbeat(); err != nil {
fmt.Println("failed to heartbeat:", err)
return err
}

go loopHeartbeat(ctx)
go loopHeartbeat()
return nil
}

func loopHeartbeat(ctx context.Context) {
func loopHeartbeat() {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := heartbeat(ctx); err != nil {
if err := heartbeat(); err != nil {
logger.Warning(err)
}
}
}

// hash struct:
// /server/heartbeat/Default -> {
// 10.2.3.4:19000 => $timestamp
// 10.2.3.5:19000 => $timestamp
// }
func redisKey(cluster string) string {
return fmt.Sprintf("/server/heartbeat/%s", cluster)
}

func heartbeat(ctx context.Context) error {
now := time.Now().Unix()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
func heartbeat() error {
err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
if err != nil {
return err
}

servers, err := ActiveServers(ctx, config.C.ClusterName)
servers, err := ActiveServers()
if err != nil {
return err
}
Expand All @@ -69,37 +57,12 @@ func heartbeat(ctx context.Context) error {
return nil
}

func clearDeadServer(ctx context.Context, cluster, endpoint string) {
key := redisKey(cluster)
err := storage.Redis.HDel(ctx, key, endpoint).Err()
if err != nil {
logger.Warningf("failed to hdel %s %s, error: %v", key, endpoint, err)
}
}

func ActiveServers(ctx context.Context, cluster string) ([]string, error) {
ret, err := storage.Redis.HGetAll(ctx, redisKey(cluster)).Result()
func ActiveServers() ([]string, error) {
cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
if err != nil {
return nil, err
}

now := time.Now().Unix()
dur := int64(20)

actives := make([]string, 0, len(ret))
for endpoint, clockstr := range ret {
clock, err := strconv.ParseInt(clockstr, 10, 64)
if err != nil {
continue
}

if now-clock > dur {
clearDeadServer(ctx, cluster, endpoint)
continue
}

actives = append(actives, endpoint)
}

return actives, nil
// An instance is considered alive if it has sent a heartbeat within the last 30 seconds.
return models.AlertingEngineGetsInstances("cluster = ? and clock > ?", cluster, time.Now().Unix()-30)
}
3 changes: 1 addition & 2 deletions src/server/naming/leader.go
@@ -1,15 +1,14 @@
package naming

import (
"context"
"sort"

"github.com/didi/nightingale/v5/src/server/config"
"github.com/toolkits/pkg/logger"
)

func IamLeader() (bool, error) {
servers, err := ActiveServers(context.Background(), config.C.ClusterName)
servers, err := ActiveServers()
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false, err
Expand Down

0 comments on commit b4ddd03

Please sign in to comment.