Store prometheus components in Instance (#144)
* store prometheus components in Instance

This doesn't do anything on its own yet, but it sets up the groundwork for
calling ApplyConfig on components from outside of the Run function (see the
sketch after this commit message). Related to #140.

* address review feedback
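
The stored components are what a future ApplyConfig would act on. Below is a minimal sketch of what that could look like; the Instance.ApplyConfig method is hypothetical and not part of this commit, and it only reuses the mutex-guarded fields plus the same remote-storage and scrape-manager ApplyConfig calls that already appear in initialize in the diff:

// Hypothetical sketch, not part of this commit: reconfigure a running
// Instance using the components stored on the struct.
func (i *Instance) ApplyConfig(cfg Config) error {
	i.componentMtx.Lock()
	defer i.componentMtx.Unlock()

	// Components may be nil when Run hasn't been called yet.
	if i.remoteStore == nil || i.readyScrapeManager == nil {
		return fmt.Errorf("instance not initialized")
	}

	// Push the new remote_write configuration to the remote storage.
	if err := i.remoteStore.ApplyConfig(&config.Config{
		GlobalConfig:       i.globalCfg,
		RemoteWriteConfigs: cfg.RemoteWrite,
	}); err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	// Push the new scrape configuration to the scrape manager, if it is ready.
	sm, err := i.readyScrapeManager.Get()
	if err != nil {
		return fmt.Errorf("failed to get scrape manager: %w", err)
	}
	if err := sm.ApplyConfig(&config.Config{
		GlobalConfig:  i.globalCfg,
		ScrapeConfigs: cfg.ScrapeConfigs,
	}); err != nil {
		return fmt.Errorf("failed applying config to scrape manager: %w", err)
	}

	i.cfg = cfg
	return nil
}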
rfratto committed Aug 4, 2020
1 parent bb3465f commit 210b646
Showing 1 changed file with 99 additions and 66 deletions.
pkg/prom/instance/instance.go (99 additions, 66 deletions)
@@ -196,7 +196,18 @@ type Instance struct {

	vc *MetricValueCollector

	// Components are stored as struct members to allow other methods to
	// access components from different goroutines. They're protected by a
	// mutex to prevent concurrent reading/writing of the pointers.
	//
	// All components may be nil; methods using these should take care to
	// do nil checks.
	componentMtx       sync.Mutex
	wal                walStorage
	discovery          *discoveryService
	readyScrapeManager *scrape.ReadyScrapeManager
	remoteStore        *remote.Storage
	storage            storage.Storage
}

// New creates a new Instance with a directory for storing the WAL. The instance
@@ -235,10 +246,12 @@ func newInstance(globalCfg config.GlobalConfig, cfg Config, reg prometheus.Regis
	return i, nil
}

// Run starts the instance and will run until an error happens during running
// or until context was canceled.
// Run starts the instance, initializing Prometheus components, and will
// continue to run until an error happens during execution or the provided
// context is cancelled.
//
// Run can be re-called after exiting.
// Run may be re-called after exiting, as components will be reinitialized each
// time Run is called.
func (i *Instance) Run(ctx context.Context) error {
	level.Debug(i.logger).Log("msg", "running instance", "name", i.cfg.Name)

@@ -248,35 +261,11 @@ func (i *Instance) Run(ctx context.Context) error {
	trackingReg := unregisterAllRegisterer{wrap: i.reg}
	defer trackingReg.UnregisterAll()

	wstore, err := i.newWal(&trackingReg)
	if err != nil {
		return err
	}

	discovery, err := i.newDiscoveryManager(ctx)
	if err != nil {
		return err
	}

	storage, err := i.newStorage(&trackingReg, wstore, i.readyScrapeManager)
	if err != nil {
		return err
	if err := i.initialize(ctx, &trackingReg); err != nil {
		level.Error(i.logger).Log("msg", "failed to initialize instance", "err", err)
		return fmt.Errorf("failed to initialize instance: %w", err)
	}

	scrapeManager := newScrapeManager(log.With(i.logger, "component", "scrape manager"), storage)

	err = scrapeManager.ApplyConfig(&config.Config{
		GlobalConfig:  i.globalCfg,
		ScrapeConfigs: i.cfg.ScrapeConfigs,
	})
	if err != nil {
		level.Error(i.logger).Log("msg", "failed applying config to scrape manager", "err", err)
		return fmt.Errorf("failed applying config to scrape manager: %w", err)
	}
	i.readyScrapeManager.Set(scrapeManager)

	rg := runGroupWithContext(ctx)

	// The actors defined here are defined in the order we want them to shut down.
	// Primarily, we want to ensure that the following shutdown order is
	// maintained:
@@ -285,17 +274,19 @@ func (i *Instance) Run(ctx context.Context) error {
	// 3. Remote write storage is closed
	// This is done to allow the instance to write stale markers for all active
	// series.
	rg := runGroupWithContext(ctx)

	{
		// Target Discovery
		rg.Add(discovery.Run, discovery.Stop)
		rg.Add(i.discovery.Run, i.discovery.Stop)
	}
	{
		// Truncation loop
		ctx, contextCancel := context.WithCancel(context.Background())
		defer contextCancel()
		rg.Add(
			func() error {
				i.truncateLoop(ctx, wstore)
				i.truncateLoop(ctx, i.wal)
				level.Info(i.logger).Log("msg", "truncation loop stopped")
				return nil
			},
@@ -306,47 +297,109 @@ func (i *Instance) Run(ctx context.Context) error {
		)
	}
	{
		sm, err := i.readyScrapeManager.Get()
		if err != nil {
			level.Error(i.logger).Log("msg", "failed to get scrape manager")
			return err
		}

		// Scrape manager
		rg.Add(
			func() error {
				err := scrapeManager.Run(discovery.SyncCh())
				err := sm.Run(i.discovery.SyncCh())
				level.Info(i.logger).Log("msg", "scrape manager stopped")
				return err
			},
			func(err error) {
				// The scrape manager is closed first to allow us to write staleness
				// markers without receiving new samples from scraping in the meantime.
				level.Info(i.logger).Log("msg", "stopping scrape manager...")
				scrapeManager.Stop()
				sm.Stop()

				// On a graceful shutdown, write staleness markers. If something went
				// wrong, then the instance will be relaunched.
				if err == nil && i.cfg.WriteStaleOnShutdown {
					level.Info(i.logger).Log("msg", "writing staleness markers...")
					err := wstore.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
					err := i.wal.WriteStalenessMarkers(i.getRemoteWriteTimestamp)
					if err != nil {
						level.Error(i.logger).Log("msg", "error writing staleness markers", "err", err)
					}
				}

				level.Info(i.logger).Log("msg", "closing storage...")
				if err := storage.Close(); err != nil {
				if err := i.storage.Close(); err != nil {
					level.Error(i.logger).Log("msg", "error stopping storage", "err", err)
				}
			},
		)
	}

	err = rg.Run()
	err := rg.Run()
	if err != nil {
		level.Error(i.logger).Log("msg", "agent instance stopped with error", "err", err)
	}
	return err
}

// initialize sets up the various Prometheus components with their initial
// settings. initialize will be called each time the Instance is run. Prometheus
// components cannot be reused after they are stopped, so we need to recreate them
// each run.
func (i *Instance) initialize(ctx context.Context, reg prometheus.Registerer) error {
	i.componentMtx.Lock()
	defer i.componentMtx.Unlock()

	var err error

	i.wal, err = i.newWal(reg)
	if err != nil {
		return fmt.Errorf("error creating WAL: %w", err)
	}

	i.discovery, err = i.newDiscoveryManager(ctx)
	if err != nil {
		return fmt.Errorf("error creating discovery manager: %w", err)
	}

	i.readyScrapeManager = &scrape.ReadyScrapeManager{}

	// Setup the remote storage
	remoteLogger := log.With(i.logger, "component", "remote")
	i.remoteStore = remote.NewStorage(remoteLogger, reg, i.wal.StartTime, i.wal.Directory(), i.cfg.RemoteFlushDeadline, i.readyScrapeManager)
	err = i.remoteStore.ApplyConfig(&config.Config{
		GlobalConfig:       i.globalCfg,
		RemoteWriteConfigs: i.cfg.RemoteWrite,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	i.storage = storage.NewFanout(i.logger, i.wal, i.remoteStore)

	scrapeManager := newScrapeManager(log.With(i.logger, "component", "scrape manager"), i.storage)
	err = scrapeManager.ApplyConfig(&config.Config{
		GlobalConfig:  i.globalCfg,
		ScrapeConfigs: i.cfg.ScrapeConfigs,
	})
	if err != nil {
		return fmt.Errorf("failed applying config to scrape manager: %w", err)
	}

	i.readyScrapeManager.Set(scrapeManager)

	return nil
}

// TargetsActive returns the set of active targets from the scrape manager. Returns nil
// if the scrape manager is not ready yet.
func (i *Instance) TargetsActive() map[string][]*scrape.Target {
	i.componentMtx.Lock()
	defer i.componentMtx.Unlock()

	if i.readyScrapeManager == nil {
		return nil
	}

	mgr, err := i.readyScrapeManager.Get()
	if err == scrape.ErrNotReady {
		return nil
@@ -358,6 +411,8 @@ func (i *Instance) TargetsActive() map[string][]*scrape.Target {
}

type discoveryService struct {
	Manager *discovery.Manager

	RunFunc    func() error
	StopFunc   func(err error)
	SyncChFunc func() GroupChannel
@@ -407,11 +462,14 @@ func (i *Instance) newDiscoveryManager(ctx context.Context) (*discoveryService,
	// If host filtering is enabled, run it and use its channel for discovered
	// targets.
	if i.cfg.HostFilter {
		filterer, err := i.newHostFilter()
		hostname, err := Hostname()
		if err != nil {
			cancel()
			return nil, err
			return nil, fmt.Errorf("failed to create host filterer: %w", err)
		}
		level.Debug(i.logger).Log("msg", "creating host filterer", "for_host", hostname)

		filterer := NewHostFilter(hostname)

		rg.Add(func() error {
			filterer.Run(manager.SyncCh())
@@ -426,39 +484,14 @@ func (i *Instance) newDiscoveryManager(ctx context.Context) (*discoveryService,
	}

	return &discoveryService{
		Manager: manager,

		RunFunc:    rg.Run,
		StopFunc:   rg.Stop,
		SyncChFunc: syncChFunc,
	}, nil
}

func (i *Instance) newStorage(reg prometheus.Registerer, wal walStorage, sm scrape.ReadyManager) (storage.Storage, error) {
	logger := log.With(i.logger, "component", "remote")

	store := remote.NewStorage(logger, reg, wal.StartTime, wal.Directory(), i.cfg.RemoteFlushDeadline, sm)
	err := store.ApplyConfig(&config.Config{
		GlobalConfig:       i.globalCfg,
		RemoteWriteConfigs: i.cfg.RemoteWrite,
	})
	if err != nil {
		level.Error(i.logger).Log("msg", "failed applying config to remote storage", "err", err)
		return nil, fmt.Errorf("failed applying config to remote storage: %w", err)
	}

	fanoutStorage := storage.NewFanout(i.logger, wal, store)
	return fanoutStorage, nil
}

func (i *Instance) newHostFilter() (*HostFilter, error) {
	hostname, err := Hostname()
	if err != nil {
		return nil, fmt.Errorf("failed to create host filterer: %w", err)
	}

	level.Debug(i.logger).Log("msg", "creating host filterer", "for_host", hostname, "enabled", i.cfg.HostFilter)
	return NewHostFilter(hostname), nil
}

func (i *Instance) truncateLoop(ctx context.Context, wal walStorage) {
	for {
		select {
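
Because the components now live on the Instance behind componentMtx, accessors like TargetsActive in the diff above can be called from goroutines other than the one running Run. A rough usage sketch under that assumption; inst and logger are hypothetical names and the polling loop is illustrative only, not part of this commit:

// Hypothetical caller, not part of this commit: periodically report the
// active scrape targets of a running instance from a separate goroutine.
go func() {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// TargetsActive returns nil while the scrape manager isn't ready,
		// so ranging over the result is always safe.
		for job, targets := range inst.TargetsActive() {
			level.Info(logger).Log("msg", "active targets", "job", job, "count", len(targets))
		}
	}
}()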