From 210b64648c16172a63e1ccce92db7259ccc478a6 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Tue, 4 Aug 2020 11:49:23 -0400
Subject: [PATCH] Store prometheus components in Instance (#144)

* store prometheus components in Instance

This doesn't do anything right now, but it sets up the groundwork for
calling ApplyConfig to components from outside of the Run function.

Related to #140.

* address review feedback
---
 pkg/prom/instance/instance.go | 165 ++++++++++++++++++++--------------
 1 file changed, 99 insertions(+), 66 deletions(-)

diff --git a/pkg/prom/instance/instance.go b/pkg/prom/instance/instance.go
index b5ea2c6ea9d9..3985f07013a2 100644
--- a/pkg/prom/instance/instance.go
+++ b/pkg/prom/instance/instance.go
@@ -196,7 +196,18 @@ type Instance struct {
 	vc *MetricValueCollector
 
+	// Components are stored as struct members to allow other methods to
+	// access components from different goroutines. They're protected by a
+	// mutex to prevent concurrent reading/writing of the pointers.
+	//
+	// All components may be nil; methods using these should take care to
+	// do nil checks.
+	componentMtx sync.Mutex
+	wal          walStorage
+	discovery    *discoveryService
 	readyScrapeManager *scrape.ReadyScrapeManager
+	remoteStore  *remote.Storage
+	storage      storage.Storage
 }
 
 // New creates a new Instance with a directory for storing the WAL. The instance
@@ -235,10 +246,12 @@ func newInstance(globalCfg config.GlobalConfig, cfg Config, reg prometheus.Regis
 	return i, nil
 }
 
-// Run starts the instance and will run until an error happens during running
-// or until context was canceled.
+// Run starts the instance, initializing Prometheus components, and will
+// continue to run until an error happens during execution or the provided
+// context is cancelled.
 //
-// Run can be re-called after exiting.
+// Run may be re-called after exiting, as components will be reinitialized each
+// time Run is called.
 func (i *Instance) Run(ctx context.Context) error {
 	level.Debug(i.logger).Log("msg", "running instance", "name", i.cfg.Name)
 
@@ -248,35 +261,11 @@ func (i *Instance) Run(ctx context.Context) error {
 	trackingReg := unregisterAllRegisterer{wrap: i.reg}
 	defer trackingReg.UnregisterAll()
 
-	wstore, err := i.newWal(&trackingReg)
-	if err != nil {
-		return err
-	}
-
-	discovery, err := i.newDiscoveryManager(ctx)
-	if err != nil {
-		return err
-	}
-
-	storage, err := i.newStorage(&trackingReg, wstore, i.readyScrapeManager)
-	if err != nil {
-		return err
+	if err := i.initialize(ctx, &trackingReg); err != nil {
+		level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
+		return fmt.Errorf("failed to initialize instance: %w", err)
 	}
 
-	scrapeManager := newScrapeManager(log.With(i.logger, "component", "scrape manager"), storage)
-
-	err = scrapeManager.ApplyConfig(&config.Config{
-		GlobalConfig:  i.globalCfg,
-		ScrapeConfigs: i.cfg.ScrapeConfigs,
-	})
-	if err != nil {
-		level.Error(i.logger).Log("msg", "failed applying config to scrape manager", "err", err)
-		return fmt.Errorf("failed applying config to scrape manager: %w", err)
-	}
-	i.readyScrapeManager.Set(scrapeManager)
-
-	rg := runGroupWithContext(ctx)
-
 	// The actors defined here are defined in the order we want them to shut down.
 	// Primarily, we want to ensure that the following shutdown order is
 	// maintained:
 	//    2.
 	//    3. Remote write storage is closed
 	// This is done to allow the instance to write stale markers for all active
 	// series.
+	rg := runGroupWithContext(ctx)
+
 	{
 		// Target Discovery
-		rg.Add(discovery.Run, discovery.Stop)
+		rg.Add(i.discovery.Run, i.discovery.Stop)
 	}
 	{
 		// Truncation loop
@@ -295,7 +286,7 @@ func (i *Instance) Run(ctx context.Context) error {
 		defer contextCancel()
 		rg.Add(
 			func() error {
-				i.truncateLoop(ctx, wstore)
+				i.truncateLoop(ctx, i.wal)
 				level.Info(i.logger).Log("msg", "truncation loop stopped")
 				return nil
 			},
@@ -306,10 +297,16 @@ func (i *Instance) Run(ctx context.Context) error {
 	}
 	{
+		sm, err := i.readyScrapeManager.Get()
+		if err != nil {
+			level.Error(i.logger).Log("msg", "failed to get scrape manager")
+			return err
+		}
+
 		// Scrape manager
 		rg.Add(
 			func() error {
-				err := scrapeManager.Run(discovery.SyncCh())
+				err := sm.Run(i.discovery.SyncCh())
 				level.Info(i.logger).Log("msg", "scrape manager stopped")
 				return err
 			},
@@ -317,36 +314,92 @@ func (i *Instance) Run(ctx context.Context) error {
 				// The scrape manager is closed first to allow us to write staleness
 				// markers without receiving new samples from scraping in the meantime.
 				level.Info(i.logger).Log("msg", "stopping scrape manager...")
-				scrapeManager.Stop()
+				sm.Stop()
 
 				// On a graceful shutdown, write staleness markers. If something went
 				// wrong, then the instance will be relaunched.
 				if err == nil && i.cfg.WriteStaleOnShutdown {
 					level.Info(i.logger).Log("msg", "writing staleness markers...")
-					err := wstore.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
+					err := i.wal.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
 					if err != nil {
 						level.Error(i.logger).Log("msg", "error writing staleness markers", "err", err)
 					}
 				}
 
 				level.Info(i.logger).Log("msg", "closing storage...")
-				if err := storage.Close(); err != nil {
+				if err := i.storage.Close(); err != nil {
 					level.Error(i.logger).Log("msg", "error stopping storage", "err", err)
 				}
 			},
 		)
 	}
 
-	err = rg.Run()
+	err := rg.Run()
 	if err != nil {
 		level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
 	}
 	return err
 }
 
+// initialize sets up the various Prometheus components with their initial
+// settings. initialize will be called each time the Instance is run. Prometheus
+// components cannot be reused after they are stopped so we need to recreate them
+// each run.
+func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer) error {
+	i.componentMtx.Lock()
+	defer i.componentMtx.Unlock()
+
+	var err error
+
+	i.wal, err = i.newWal(reg)
+	if err != nil {
+		return fmt.Errorf("error creating WAL: %w", err)
+	}
+
+	i.discovery, err = i.newDiscoveryManager(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating discovery manager: %w", err)
+	}
+
+	i.readyScrapeManager = &scrape.ReadyScrapeManager{}
+
+	// Setup the remote storage
+	remoteLogger := log.With(i.logger, "component", "remote")
+	i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), i.cfg.RemoteFlushDeadline, i.readyScrapeManager)
+	err = i.remoteStore.ApplyConfig(&config.Config{
+		GlobalConfig:       i.globalCfg,
+		RemoteWriteConfigs: i.cfg.RemoteWrite,
+	})
+	if err != nil {
+		return fmt.Errorf("failed applying config to remote storage: %w", err)
+	}
+
+	i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)
+
+	scrapeManager := newScrapeManager(log.With(i.logger, "component", "scrape manager"), i.storage)
+	err = scrapeManager.ApplyConfig(&config.Config{
+		GlobalConfig:  i.globalCfg,
+		ScrapeConfigs: i.cfg.ScrapeConfigs,
+	})
+	if err != nil {
+		return fmt.Errorf("failed applying config to scrape manager: %w", err)
+	}
+
+	i.readyScrapeManager.Set(scrapeManager)
+
+	return nil
+}
+
 // TargetsActive returns the set of active targets from the scrape manager. Returns nil
 // if the scrape manager is not ready yet.
 func (i *Instance) TargetsActive() map[string][]*scrape.Target {
+	i.componentMtx.Lock()
+	defer i.componentMtx.Unlock()
+
+	if i.readyScrapeManager == nil {
+		return nil
+	}
+
 	mgr, err := i.readyScrapeManager.Get()
 	if err == scrape.ErrNotReady {
 		return nil
@@ -358,6 +411,8 @@ func (i *Instance) TargetsActive() map[string][]*scrape.Target {
 }
 
 type discoveryService struct {
+	Manager *discovery.Manager
+
 	RunFunc    func() error
 	StopFunc   func(err error)
 	SyncChFunc func() GroupChannel
@@ -407,11 +462,14 @@ func (i *Instance) newDiscoveryManager(ctx context.Context) (*discoveryService,
 	// If host filtering is enabled, run it and use its channel for discovered
 	// targets.
 	if i.cfg.HostFilter {
-		filterer, err := i.newHostFilter()
+		hostname, err := Hostname()
 		if err != nil {
 			cancel()
-			return nil, err
+			return nil, fmt.Errorf("failed to create host filterer: %w", err)
 		}
+		level.Debug(i.logger).Log("msg", "creating host filterer", "for_host", hostname)
+
+		filterer := NewHostFilter(hostname)
 
 		rg.Add(func() error {
 			filterer.Run(manager.SyncCh())
@@ -426,39 +484,14 @@ func (i *Instance) newDiscoveryManager(ctx context.Context) (*discoveryService,
 	}
 
 	return &discoveryService{
+		Manager: manager,
+
 		RunFunc:    rg.Run,
 		StopFunc:   rg.Stop,
 		SyncChFunc: syncChFunc,
 	}, nil
 }
 
-func (i *Instance) newStorage(reg prometheus.Registerer, wal walStorage, sm scrape.ReadyManager) (storage.Storage, error) {
-	logger := log.With(i.logger, "component", "remote")
-
-	store := remote.NewStorage(logger, reg, wal.StartTime, wal.Directory(), i.cfg.RemoteFlushDeadline, sm)
-	err := store.ApplyConfig(&config.Config{
-		GlobalConfig:       i.globalCfg,
-		RemoteWriteConfigs: i.cfg.RemoteWrite,
-	})
-	if err != nil {
-		level.Error(i.logger).Log("msg", "failed applying config to remote storage", "err", err)
-		return nil, fmt.Errorf("failed applying config to remote storage: %w", err)
-	}
-
-	fanoutStorage := storage.NewFanout(i.logger, wal, store)
-	return fanoutStorage, nil
-}
-
-func (i *Instance) newHostFilter() (*HostFilter, error) {
-	hostname, err := Hostname()
-	if err != nil {
-		return nil, fmt.Errorf("failed to create host filterer: %w", err)
-	}
-
-	level.Debug(i.logger).Log("msg", "creating host filterer", "for_host", hostname, "enabled", i.cfg.HostFilter)
-	return NewHostFilter(hostname), nil
-}
-
 func (i *Instance) truncateLoop(ctx context.Context, wal walStorage) {
 	for {
 		select {
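
The commit message describes this change as groundwork for calling ApplyConfig on components from outside of the Run function (#140). As a rough illustration of that follow-up, a hypothetical ApplyConfig method on Instance might look like the sketch below; the method name, signature, and exact behavior are assumptions and are not part of this patch (reconfiguring the discovery manager is omitted for brevity).

// Hypothetical sketch, not part of this patch: reconfigure the components
// stored by initialize without restarting Run. Assumes the struct fields
// introduced above (componentMtx, remoteStore, readyScrapeManager).
func (i *Instance) ApplyConfig(cfg Config) error {
	i.componentMtx.Lock()
	defer i.componentMtx.Unlock()

	// All components may be nil until Run has called initialize, so nil-check
	// before touching them (mirrors the comment on the struct fields).
	if i.remoteStore == nil || i.readyScrapeManager == nil {
		return fmt.Errorf("instance not initialized")
	}

	// Apply the new remote_write configuration to the remote storage.
	if err := i.remoteStore.ApplyConfig(&config.Config{
		GlobalConfig:       i.globalCfg,
		RemoteWriteConfigs: cfg.RemoteWrite,
	}); err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	// Apply the new scrape configuration to the scrape manager once it is ready.
	sm, err := i.readyScrapeManager.Get()
	if err != nil {
		return fmt.Errorf("failed to get scrape manager: %w", err)
	}
	if err := sm.ApplyConfig(&config.Config{
		GlobalConfig:  i.globalCfg,
		ScrapeConfigs: cfg.ScrapeConfigs,
	}); err != nil {
		return fmt.Errorf("failed applying config to scrape manager: %w", err)
	}

	i.cfg = cfg
	return nil
}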