monitor: agent: move into hive
The monitor agent consumes the eventsmap to multicast its events, as
well as other agent events, to subscribers. Pull its initialization
logic into a cell to reduce the amount of logic needed in the daemon
startup sequence. If there's no eventsmap in the hive graph, the agent
functions solely to pass along agent events.
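
For illustration, the cell wiring might look roughly like this; a minimal sketch, where the module ID, constructor name, and lifecycle hook are assumptions rather than necessarily what the commit ships:

```go
package agent

import (
	"context"

	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/hive/cell"
)

// Cell wires the monitor agent into the hive so the daemon no longer has to
// construct it by hand during startup.
var Cell = cell.Module(
	"monitor-agent",
	"Distributes monitor and agent events to subscribers",

	cell.Provide(newMonitorAgent),
)

// newMonitorAgent builds the agent and cancels its context on hive shutdown.
func newMonitorAgent(lc hive.Lifecycle) Agent {
	ctx, cancel := context.WithCancel(context.Background())
	a := newAgent(ctx)
	lc.Append(hive.Hook{
		OnStop: func(hive.HookContext) error {
			cancel()
			return nil
		},
	})
	return a
}
```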

Dropping RunMonitor flag:
Except for testing, there isn't really a use case for not attaching to
the eventsmap. (The attachment alone doesn't cause performance overhead
yet; we only start reading once there is a consumer or listener.) Putting
RunMonitor into AgentConfig would mean exposing it as a flag, which
is not desirable. Instead, make the eventsmap dependency optional so
that non-privileged tests and benchmarks can still use the monitor agent
for its userspace capabilities.
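
Hive resolves dependencies via uber/dig, so the optional eventsmap can be declared with an `optional:"true"` struct tag. A hypothetical sketch of the agent's parameter struct (field names are assumptions):

```go
package agent

import (
	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/hive/cell"
	"github.com/cilium/cilium/pkg/maps/eventsmap"
)

type agentParams struct {
	cell.In

	Lifecycle hive.Lifecycle

	// EventsMap is nil when no eventsmap cell is part of the hive graph,
	// e.g. in non-privileged tests. The agent then never attaches to the
	// perf ring buffer and only distributes userspace agent events.
	EventsMap eventsmap.Map `optional:"true"`
}
```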

Proxy Logger changes:
To improve the isolation of components, make NewAgent private. The
only consumer other than the cell is the proxy logger benchmark, which
can create a hive instead. Unfortunately, this makes the benchmarks
require privileges.
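
A rough sketch of how a benchmark could obtain the agent from a hive rather than calling the now-private constructor (the hive API shapes and event types used here are assumptions):

```go
package agent_test

import (
	"context"
	"testing"

	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/hive/cell"
	monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
	monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
)

func BenchmarkSendEvent(b *testing.B) {
	var agent monitorAgent.Agent
	h := hive.New(
		monitorAgent.Cell,
		cell.Invoke(func(a monitorAgent.Agent) { agent = a }),
	)
	if err := h.Start(context.Background()); err != nil {
		b.Fatal(err)
	}
	defer h.Stop(context.Background())

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Exercises the userspace path; no eventsmap attachment needed.
		_ = agent.SendEvent(monitorAPI.MessageTypeAgent, monitorAPI.AgentNotifyMessage{})
	}
}
```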

Dropping Context() from the API:
There isn't a good reason to export the context, and it doesn't
belong in the monitor's public API. Rework the coordination between the
monitor API server and the agent slightly to make the Context() method
unnecessary.
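
One plausible shape for that rework is to hand the API server its own context instead of reading it off the agent. A hypothetical sketch (newMonitorListener and the socket path are illustrative, not the commit's actual code):

```go
// serveMonitorAPI accepts connections on the monitor socket until ctx is
// cancelled, so it never needs a Context() accessor on the agent.
func serveMonitorAPI(ctx context.Context, a Agent) error {
	// A real server would also clean up a stale socket and set permissions.
	l, err := net.Listen("unix", "/var/run/cilium/monitor1_2.sock")
	if err != nil {
		return err
	}
	go func() {
		<-ctx.Done()
		l.Close() // unblocks the Accept loop below on shutdown
	}()
	go func() {
		for {
			conn, err := l.Accept()
			if err != nil {
				return // listener closed
			}
			// Each connection becomes a listener on the agent.
			a.RegisterNewListener(newMonitorListener(conn, a.RemoveListener))
		}
	}()
	return nil
}
```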

Signed-off-by: David Bimmler <david.bimmler@isovalent.com>
bimmlerd authored and julianwiedmann committed May 31, 2023
1 parent 80d30af commit d496777
Showing 12 changed files with 223 additions and 101 deletions.
2 changes: 2 additions & 0 deletions Documentation/cmdref/cilium-agent_hive.md


2 changes: 2 additions & 0 deletions Documentation/cmdref/cilium-agent_hive_dot-graph.md


24 changes: 2 additions & 22 deletions daemon/cmd/daemon.go
```diff
@@ -120,7 +120,7 @@ type Daemon struct {
 	statusResponse  models.StatusResponse
 	statusCollector *status.Collector
 
-	monitorAgent *monitoragent.Agent
+	monitorAgent monitoragent.Agent
 	ciliumHealth *health.CiliumHealth
 
 	deviceManager *linuxdatapath.DeviceManager
@@ -539,10 +539,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
 		ipamMetadata:     params.IPAMMetadataManager,
 		cniConfigManager: params.CNIConfigManager,
 		clustermesh:      params.ClusterMesh,
-	}
-
-	if option.Config.RunMonitorAgent {
-		d.monitorAgent = monitoragent.NewAgent(ctx)
+		monitorAgent:     params.MonitorAgent,
 	}
 
 	d.configModifyQueue = eventqueue.NewEventQueueBuffered("config-modify-queue", ConfigModifyQueueSize)
@@ -1196,23 +1193,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
 		return nil, restoredEndpoints, fmt.Errorf("error encountered while updating DNS datapath rules: %w", err)
 	}
 
-	// We can only attach the monitor agent once cilium_event has been set up.
-	if option.Config.RunMonitorAgent {
-		err = d.monitorAgent.AttachToEventsMap(defaults.MonitorBufferPages)
-		if err != nil {
-			log.WithError(err).Error("encountered error configuring run monitor agent")
-			return nil, nil, fmt.Errorf("encountered error configuring run monitor agent: %w", err)
-		}
-
-		if option.Config.EnableMonitor {
-			err = monitoragent.ServeMonitorAPI(d.monitorAgent)
-			if err != nil {
-				log.WithError(err).Error("encountered error configuring run monitor agent")
-				return nil, nil, fmt.Errorf("encountered error configuring run monitor agent: %w", err)
-			}
-		}
-	}
-
 	// Start the controller for periodic sync. The purpose of the
 	// controller is to ensure that endpoints and host IPs entries are
 	// reinserted to the bpf maps if they are ever removed from them.
```
11 changes: 2 additions & 9 deletions daemon/cmd/daemon_main.go
```diff
@@ -76,6 +76,7 @@ import (
 	"github.com/cilium/cilium/pkg/maps/neighborsmap"
 	"github.com/cilium/cilium/pkg/maps/policymap"
 	"github.com/cilium/cilium/pkg/metrics"
+	monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
 	monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
 	"github.com/cilium/cilium/pkg/node"
 	nodeManager "github.com/cilium/cilium/pkg/node/manager"
@@ -695,16 +696,10 @@ func initializeFlags() {
 	flags.StringSlice(option.Metrics, []string{}, "Metrics that should be enabled or disabled from the default metric list. The list is expected to be separated by a space. (+metric_foo to enable metric_foo , -metric_bar to disable metric_bar)")
 	option.BindEnv(Vp, option.Metrics)
 
-	flags.Bool(option.EnableMonitorName, true, "Enable the monitor unix domain socket server")
-	option.BindEnv(Vp, option.EnableMonitorName)
-
 	flags.String(option.MonitorAggregationName, "None",
 		"Level of monitor aggregation for traces from the datapath")
 	option.BindEnvWithLegacyEnvFallback(Vp, option.MonitorAggregationName, "CILIUM_MONITOR_AGGREGATION_LEVEL")
 
-	flags.Int(option.MonitorQueueSizeName, 0, "Size of the event queue when reading monitor events")
-	option.BindEnv(Vp, option.MonitorQueueSizeName)
-
 	flags.Int(option.MTUName, 0, "Overwrite auto-detected MTU of underlying network")
 	option.BindEnv(Vp, option.MTUName)
 
@@ -1159,9 +1154,6 @@ func initEnv() {
 
 	var debugDatapath bool
 
-	// Not running tests -> enable the monitor agent.
-	option.Config.RunMonitorAgent = true
-
 	option.LogRegisteredOptions(Vp, log)
 
 	sysctl.SetProcfs(option.Config.ProcFs)
@@ -1629,6 +1621,7 @@ type daemonParams struct {
 	HealthAPISpec *healthApi.Spec
 	ServiceCache  *k8s.ServiceCache
 	ClusterMesh   *clustermesh.ClusterMesh
+	MonitorAgent  monitorAgent.Agent
 }
 
 func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
```
2 changes: 2 additions & 0 deletions daemon/cmd/daemon_test.go
```diff
@@ -36,6 +36,7 @@ import (
 	"github.com/cilium/cilium/pkg/maps/signalmap"
 	fakesignalmap "github.com/cilium/cilium/pkg/maps/signalmap/fake"
 	"github.com/cilium/cilium/pkg/metrics"
+	monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
 	monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
 	"github.com/cilium/cilium/pkg/option"
 	"github.com/cilium/cilium/pkg/policy"
@@ -173,6 +174,7 @@ func (ds *DaemonSuite) SetUpTest(c *C) {
 			func() authmap.Map { return fakeauthmap.NewFakeAuthMap() },
 			func() egressmap.PolicyMap { return nil },
 		),
+		monitorAgent.Cell,
 		ControlPlane,
 		cell.Invoke(func(p promise.Promise[*Daemon]) {
 			daemonPromise = p
```
4 changes: 4 additions & 0 deletions pkg/datapath/cells.go
```diff
@@ -19,6 +19,7 @@ import (
 	ipcache "github.com/cilium/cilium/pkg/ipcache/types"
 	"github.com/cilium/cilium/pkg/maps"
 	"github.com/cilium/cilium/pkg/maps/eventsmap"
+	monitorAgent "github.com/cilium/cilium/pkg/monitor/agent"
 	"github.com/cilium/cilium/pkg/option"
 	wg "github.com/cilium/cilium/pkg/wireguard/agent"
 	wgTypes "github.com/cilium/cilium/pkg/wireguard/types"
@@ -39,6 +40,9 @@ var Cell = cell.Module(
 	// The cilium events map, used by the monitor agent.
 	eventsmap.Cell,
 
+	// The monitor agent, which multicasts cilium and agent events to its subscribers.
+	monitorAgent.Cell,
+
 	cell.Provide(
 		newWireguardAgent,
 		newDatapath,
```
59 changes: 31 additions & 28 deletions pkg/monitor/agent/agent.go
```diff
@@ -40,6 +40,16 @@ func isCtxDone(ctx context.Context) bool {
 	}
 }
 
+type Agent interface {
+	AttachToEventsMap(nPages int) error
+	SendEvent(typ int, event interface{}) error
+	RegisterNewListener(newListener listener.MonitorListener)
+	RemoveListener(ml listener.MonitorListener)
+	RegisterNewConsumer(newConsumer consumer.MonitorConsumer)
+	RemoveConsumer(mc consumer.MonitorConsumer)
+	State() *models.MonitorStatus
+}
+
 // Agent structure for centralizing the responsibilities of the main events
 // reader.
 // There is some racey-ness around perfReaderCancel since it replaces on every
@@ -50,7 +60,7 @@ func isCtxDone(ctx context.Context) bool {
 // must have at least one MonitorListener (since it started) so no cancel is called.
 // If it doesn't, the cancel is the correct behavior (the older generation
 // cancel must have been called for us to get this far anyway).
-type Agent struct {
+type agent struct {
 	lock.Mutex
 	models.MonitorStatus
 
@@ -67,7 +77,7 @@ type Agent struct {
 	monitorEvents *perf.Reader
 }
 
-// NewAgent starts a new monitor agent instance which distributes monitor events
+// newAgent starts a new monitor agent instance which distributes monitor events
 // to registered listeners. Once the datapath is set up, AttachToEventsMap needs
 // to be called to receive events from the perf ring buffer. Otherwise, only
 // user space events received via SendEvent are distributed registered listeners.
@@ -77,8 +87,8 @@ type Agent struct {
 // goroutine and close all registered listeners.
 // Note that the perf buffer reader is started only when listeners are
 // connected.
-func NewAgent(ctx context.Context) *Agent {
-	return &Agent{
+func newAgent(ctx context.Context) *agent {
+	return &agent{
 		ctx:       ctx,
 		listeners: make(map[listener.MonitorListener]struct{}),
 		consumers: make(map[consumer.MonitorConsumer]struct{}),
@@ -89,7 +99,7 @@ func NewAgent(ctx context.Context) *Agent {
 // AttachToEventsMap opens the events perf ring buffer and makes it ready for
 // consumption, such that any subscribed consumers may receive events
 // from it. This function is to be called once the events map has been set up.
-func (a *Agent) AttachToEventsMap(nPages int) error {
+func (a *agent) AttachToEventsMap(nPages int) error {
 	a.Lock()
 	defer a.Unlock()
 
@@ -120,7 +130,7 @@ func (a *Agent) AttachToEventsMap(nPages int) error {
 }
 
 // SendEvent distributes an event to all monitor listeners
-func (a *Agent) SendEvent(typ int, event interface{}) error {
+func (a *agent) SendEvent(typ int, event interface{}) error {
 	if a == nil {
 		return fmt.Errorf("monitor agent is not set up")
 	}
@@ -167,23 +177,16 @@ func (a *Agent) SendEvent(typ int, event interface{}) error {
 	return nil
 }
 
-// Context returns the underlying context of this monitor instance. It can be
-// used to derive other contexts which should be stopped when the monitor is
-// stopped.
-func (a *Agent) Context() context.Context {
-	return a.ctx
-}
-
 // hasSubscribersLocked returns true if there are listeners or consumers
 // subscribed to the agent right now.
 // Note: it is critical to hold the lock for this operation.
-func (a *Agent) hasSubscribersLocked() bool {
+func (a *agent) hasSubscribersLocked() bool {
 	return len(a.listeners)+len(a.consumers) != 0
 }
 
 // hasListeners returns true if there are listeners subscribed to the
 // agent right now.
-func (a *Agent) hasListeners() bool {
+func (a *agent) hasListeners() bool {
 	a.Lock()
 	defer a.Unlock()
 	return len(a.listeners) != 0
@@ -195,7 +198,7 @@ func (a *Agent) hasListeners() bool {
 // cancelFunc is assigned to perfReaderCancel. Note that cancelling m.Context()
 // (e.g. on program shutdown) will also cancel the derived context.
 // Note: it is critical to hold the lock for this operation.
-func (a *Agent) startPerfReaderLocked() {
+func (a *agent) startPerfReaderLocked() {
 	if a.events == nil {
 		return // not attached to events map yet
 	}
@@ -208,7 +211,7 @@
 
 // RegisterNewListener adds the new MonitorListener to the global list.
 // It also spawns a singleton goroutine to read and distribute the events.
-func (a *Agent) RegisterNewListener(newListener listener.MonitorListener) {
+func (a *agent) RegisterNewListener(newListener listener.MonitorListener) {
 	if a == nil {
 		return
 	}
@@ -245,7 +248,7 @@ func (a *Agent) RegisterNewListener(newListener listener.MonitorListener) {
 
 // RemoveListener deletes the MonitorListener from the list, closes its queue,
 // and stops perfReader if this is the last subscriber
-func (a *Agent) RemoveListener(ml listener.MonitorListener) {
+func (a *agent) RemoveListener(ml listener.MonitorListener) {
 	if a == nil {
 		return
 	}
@@ -273,7 +276,7 @@ func (a *Agent) RemoveListener(ml listener.MonitorListener) {
 
 // RegisterNewConsumer adds the new MonitorConsumer to the global list.
 // It also spawns a singleton goroutine to read and distribute the events.
-func (a *Agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) {
+func (a *agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) {
 	if a == nil {
 		return
 	}
@@ -294,7 +297,7 @@ func (a *Agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) {
 
 // RemoveConsumer deletes the MonitorConsumer from the list, closes its queue,
 // and stops perfReader if this is the last subscriber
-func (a *Agent) RemoveConsumer(mc consumer.MonitorConsumer) {
+func (a *agent) RemoveConsumer(mc consumer.MonitorConsumer) {
 	if a == nil {
 		return
 	}
@@ -312,7 +315,7 @@ func (a *Agent) RemoveConsumer(mc consumer.MonitorConsumer) {
 // will exit when stopCtx is done. Note, however, that it will block in the
 // Poll call but assumes enough events are generated that these blocks are
 // short.
-func (a *Agent) handleEvents(stopCtx context.Context) {
+func (a *agent) handleEvents(stopCtx context.Context) {
 	scopedLog := log.WithField(logfields.StartTime, time.Now())
 	scopedLog.Info("Beginning to read perf buffer")
 	defer scopedLog.Info("Stopped reading perf buffer")
@@ -358,7 +361,7 @@ func (a *Agent) handleEvents(stopCtx context.Context) {
 
 // processPerfRecord processes a record from the datapath and sends it to any
 // registered subscribers
-func (a *Agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) {
+func (a *agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) {
 	a.Lock()
 	defer a.Unlock()
 
@@ -382,7 +385,7 @@ func (a *Agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) {
 }
 
 // State returns the current status of the monitor
-func (a *Agent) State() *models.MonitorStatus {
+func (a *agent) State() *models.MonitorStatus {
 	if a == nil {
 		return nil
 	}
@@ -400,7 +403,7 @@ func (a *Agent) State() *models.MonitorStatus {
 }
 
 // notifyAgentEvent notifies all consumers about an agent event.
-func (a *Agent) notifyAgentEvent(typ int, message interface{}) {
+func (a *agent) notifyAgentEvent(typ int, message interface{}) {
 	a.Lock()
 	defer a.Unlock()
 	for mc := range a.consumers {
@@ -410,29 +413,29 @@ func (a *Agent) notifyAgentEvent(typ int, message interface{}) {
 
 // notifyPerfEventLocked notifies all consumers about a perf event.
 // The caller must hold the monitor lock.
-func (a *Agent) notifyPerfEventLocked(data []byte, cpu int) {
+func (a *agent) notifyPerfEventLocked(data []byte, cpu int) {
 	for mc := range a.consumers {
 		mc.NotifyPerfEvent(data, cpu)
 	}
 }
 
 // notifyEventToConsumersLocked notifies all consumers about lost events.
 // The caller must hold the monitor lock.
-func (a *Agent) notifyPerfEventLostLocked(numLostEvents uint64, cpu int) {
+func (a *agent) notifyPerfEventLostLocked(numLostEvents uint64, cpu int) {
 	for mc := range a.consumers {
 		mc.NotifyPerfEventLost(numLostEvents, cpu)
 	}
}
 
 // sendToListeners enqueues the payload to all listeners.
-func (a *Agent) sendToListeners(pl *payload.Payload) {
+func (a *agent) sendToListeners(pl *payload.Payload) {
 	a.Lock()
 	defer a.Unlock()
 	a.sendToListenersLocked(pl)
 }
 
 // sendToListenersLocked enqueues the payload to all listeners while holding the monitor lock.
-func (a *Agent) sendToListenersLocked(pl *payload.Payload) {
+func (a *agent) sendToListenersLocked(pl *payload.Payload) {
 	for ml := range a.listeners {
 		ml.Enqueue(pl)
 	}
```
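
For context, anything subscribing through the new Agent interface only needs to satisfy consumer.MonitorConsumer, whose three Notify methods are visible in the hunks above. A minimal, purely illustrative consumer:

```go
// countingConsumer tallies datapath events and losses; illustrative only.
type countingConsumer struct {
	events, lost uint64
}

func (c *countingConsumer) NotifyAgentEvent(typ int, message interface{}) {}

func (c *countingConsumer) NotifyPerfEvent(data []byte, cpu int) {
	c.events++
}

func (c *countingConsumer) NotifyPerfEventLost(numLost uint64, cpu int) {
	c.lost += numLost
}

// Registration is identical whether or not an eventsmap is attached:
//
//	monitorAgent.RegisterNewConsumer(&countingConsumer{})
```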
