lib/promscrape: preserve the previously discovered targets on discovery errors per each `job_name`

Updates #582
valyala committed Jun 23, 2020
1 parent a13cd60 commit 8f0bcec
Showing 4 changed files with 148 additions and 43 deletions.
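The gist of the config.go change below: previously discovered targets are indexed by `job_name`, and whenever service discovery for a job returns errors, the partially appended results for that job are rolled back via `dst[:dstLen]` and replaced with the targets from the previous discovery round. The following is a minimal, self-contained sketch of that pattern; the trimmed-down ScrapeWork struct and the discoverJob/groupByJob/getScrapeWork helpers are illustrative stand-ins, not the actual lib/promscrape API.

package main

import "log"

// ScrapeWork is a trimmed-down stand-in for the real lib/promscrape type;
// only the fields needed for this sketch are shown.
type ScrapeWork struct {
	JobName string
	Target  string
}

// groupByJob mirrors the idea of getSWSByJob from the diff below:
// index the previously discovered targets by their job name.
func groupByJob(sws []ScrapeWork) map[string][]ScrapeWork {
	m := make(map[string][]ScrapeWork)
	for _, sw := range sws {
		m[sw.JobName] = append(m[sw.JobName], sw)
	}
	return m
}

// discoverJob is a hypothetical discovery call; it returns ok=false when
// service discovery fails. In this sketch it always fails so the fallback
// path below is exercised.
func discoverJob(job string) (targets []ScrapeWork, ok bool) {
	return nil, false
}

// getScrapeWork demonstrates the per-job_name rollback pattern from the commit:
// on discovery errors, drop the partially appended results for that job and
// keep the targets from the previous discovery round instead.
func getScrapeWork(jobs []string, prev []ScrapeWork) []ScrapeWork {
	prevByJob := groupByJob(prev)
	var dst []ScrapeWork
	for _, job := range jobs {
		dstLen := len(dst)
		targets, ok := discoverJob(job)
		dst = append(dst, targets...)
		if ok {
			continue
		}
		if swsPrev := prevByJob[job]; len(swsPrev) > 0 {
			log.Printf("there were errors when discovering targets for job %q, so preserving the previous targets", job)
			dst = append(dst[:dstLen], swsPrev...)
		}
	}
	return dst
}

func main() {
	prev := []ScrapeWork{{JobName: "node", Target: "10.0.0.1:9100"}}
	log.Printf("scrape works: %+v", getScrapeWork([]string{"node"}, prev))
}

Rolling back to dstLen rather than discarding the whole result keeps the freshly discovered targets of the other jobs intact; only the failing job falls back to its previous target set.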
141 changes: 113 additions & 28 deletions lib/promscrape/config.go
@@ -158,77 +158,160 @@ func unmarshalMaybeStrict(data []byte, dst interface{}) error {
return err
}

// getSWSByJob groups the given ScrapeWork entries by the original `job_name` they were discovered for.
func getSWSByJob(sws []ScrapeWork) map[string][]ScrapeWork {
m := make(map[string][]ScrapeWork)
for _, sw := range sws {
m[sw.jobNameOriginal] = append(m[sw.jobNameOriginal], sw)
}
return m
}

// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getKubernetesSDScrapeWork() []ScrapeWork {
func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.KubernetesSDConfigs {
sdc := &sc.KubernetesSDConfigs[j]
dst = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
var okLocal bool
dst, okLocal = appendKubernetesScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering kubernetes targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}

// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getConsulSDScrapeWork() []ScrapeWork {
func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.ConsulSDConfigs {
sdc := &sc.ConsulSDConfigs[j]
dst = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
var okLocal bool
dst, okLocal = appendConsulScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering consul targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}

// getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getDNSSDScrapeWork() []ScrapeWork {
func (cfg *Config) getDNSSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.DNSSDConfigs {
sdc := &sc.DNSSDConfigs[j]
dst = appendDNSScrapeWork(dst, sdc, sc.swc)
var okLocal bool
dst, okLocal = appendDNSScrapeWork(dst, sdc, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering dns targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}

// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getEC2SDScrapeWork() []ScrapeWork {
func (cfg *Config) getEC2SDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.EC2SDConfigs {
sdc := &sc.EC2SDConfigs[j]
dst = appendEC2ScrapeWork(dst, sdc, sc.swc)
var okLocal bool
dst, okLocal = appendEC2ScrapeWork(dst, sdc, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering ec2 targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}

// getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getGCESDScrapeWork() []ScrapeWork {
func (cfg *Config) getGCESDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.GCESDConfigs {
sdc := &sc.GCESDConfigs[j]
dst = appendGCEScrapeWork(dst, sdc, sc.swc)
var okLocal bool
dst, okLocal = appendGCEScrapeWork(dst, sdc, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering gce targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}

// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getFileSDScrapeWork(swsPrev []ScrapeWork) []ScrapeWork {
func (cfg *Config) getFileSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
// Create a map for the previous scrape work.
swsMapPrev := make(map[string][]ScrapeWork)
for i := range swsPrev {
sw := &swsPrev[i]
for i := range prev {
sw := &prev[i]
filepath := promrelabel.GetLabelValueByName(sw.Labels, "__vm_filepath")
if len(filepath) == 0 {
logger.Panicf("BUG: missing `__vm_filepath` label")
@@ -341,49 +424,49 @@ type scrapeWorkConfig struct {
sampleLimit int
}

func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := kubernetes.GetLabels(sdc, baseDir)
if err != nil {
logger.Errorf("error when discovering kubernetes targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config")
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true
}

func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) []ScrapeWork {
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := consul.GetLabels(sdc, baseDir)
if err != nil {
logger.Errorf("error when discovering consul targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config")
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "consul_sd_config"), true
}

func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
func appendDNSScrapeWork(dst []ScrapeWork, sdc *dns.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := dns.GetLabels(sdc)
if err != nil {
logger.Errorf("error when discovering dns targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config")
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "dns_sd_config"), true
}

func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
func appendEC2ScrapeWork(dst []ScrapeWork, sdc *ec2.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := ec2.GetLabels(sdc)
if err != nil {
logger.Errorf("error when discovering ec2 targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config")
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "ec2_sd_config"), true
}

func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) []ScrapeWork {
func appendGCEScrapeWork(dst []ScrapeWork, sdc *gce.SDConfig, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := gce.GetLabels(sdc)
if err != nil {
logger.Errorf("error when discovering gce targets for `job_name` %q: %s; skippint it", swc.jobName, err)
return dst
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config")
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "gce_sd_config"), true
}

func appendScrapeWorkForTargetLabels(dst []ScrapeWork, swc *scrapeWorkConfig, targetLabels []map[string]string, sectionName string) []ScrapeWork {
@@ -519,6 +602,8 @@ func appendScrapeWork(dst []ScrapeWork, swc *scrapeWorkConfig, target string, ex
AuthConfig: swc.authConfig,
MetricRelabelConfigs: swc.metricRelabelConfigs,
SampleLimit: swc.sampleLimit,

// jobNameOriginal is used by getSWSByJob to group the discovered targets per `job_name`
// when preserving them on discovery errors.
jobNameOriginal: swc.jobName,
})
return dst, nil
}
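Only config.go is shown on this page; the other changed files (the callers of these functions) are not included above. As a rough illustration of how the new `prev []ScrapeWork` parameter is meant to be used, the hypothetical refresh loop below keeps the result of the previous discovery round and feeds it back into the next one; getSDScrapeWork and the simplified ScrapeWork type are stand-ins, not the code from this commit.

package main

import (
	"fmt"
	"time"
)

// ScrapeWork is a simplified stand-in for the lib/promscrape type.
type ScrapeWork struct{ Target string }

// getSDScrapeWork is a hypothetical stand-in for the updated discovery entry
// points (getKubernetesSDScrapeWork, getConsulSDScrapeWork, ...): it accepts the
// previously discovered targets so they can be preserved on discovery errors.
func getSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
	// Pretend discovery failed on this round: fall back to the previous targets.
	return prev
}

func main() {
	// The refresh loop keeps the last result and passes it into the next round,
	// so a transient discovery failure does not wipe out the scrape targets.
	sws := []ScrapeWork{{Target: "10.0.0.1:9100"}}
	for i := 0; i < 3; i++ {
		sws = getSDScrapeWork(sws)
		fmt.Printf("refresh %d: %d targets\n", i, len(sws))
		time.Sleep(time.Millisecond)
	}
}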
