From d4967772747f96e440c33b2b25ae8b8a998a531d Mon Sep 17 00:00:00 2001 From: David Bimmler Date: Thu, 27 Apr 2023 16:00:57 +0200 Subject: [PATCH] monitor: agent: move into hive The monitor agent consumes the eventsmap to multicast its events, as well as other agent events, to subscribers. Pull it's initialization logic into a cell, to reduce the amount of logic needed in the daemon startup sequence. If there's no eventsmap in the hive graph, the agent functions solely for passing along agent events. Dropping RunMonitor flag: Except for testing, there isn't really a use case for not attaching to the eventsmap (The attachment alone doesn't cause performance overhead yet, only once there is a consumer/listener we start reading.) Putting RunMonitor into AgentConfig would mean exposing it as a flag, which is not desirable. Instead, make the eventsmap dependency optional so that non-privileged tests/benchmarks can still use the monitor agent for its userspace capabilities. Proxy Logger changes: To improve on the isolation of components, make NewAgent private. The only consumer other than the cell is the proxy logger benchmark, which can create a hive instead. Unfortunately, this makes the benchmarks require privileges. Dropping Context() from the API: There isn't a good reason to export the context, and it doesn't belong in the monitor's public API. Rework the coordination between the monitor API server and the agent slightly to make the Context() method unnecessary. Signed-off-by: David Bimmler --- Documentation/cmdref/cilium-agent_hive.md | 2 + .../cmdref/cilium-agent_hive_dot-graph.md | 2 + daemon/cmd/daemon.go | 24 +---- daemon/cmd/daemon_main.go | 11 +-- daemon/cmd/daemon_test.go | 2 + pkg/datapath/cells.go | 4 + pkg/monitor/agent/agent.go | 59 +++++------ pkg/monitor/agent/cell.go | 99 +++++++++++++++++++ pkg/monitor/agent/server.go | 15 ++- pkg/option/config.go | 21 ---- pkg/proxy/logger/logger_test.go | 83 +++++++++++++--- test/controlplane/suite/agent.go | 2 + 12 files changed, 223 insertions(+), 101 deletions(-) create mode 100644 pkg/monitor/agent/cell.go diff --git a/Documentation/cmdref/cilium-agent_hive.md b/Documentation/cmdref/cilium-agent_hive.md index cc369ea53a33..627d55d557fe 100644 --- a/Documentation/cmdref/cilium-agent_hive.md +++ b/Documentation/cmdref/cilium-agent_hive.md @@ -22,6 +22,7 @@ cilium-agent hive [flags] --enable-cilium-health-api-server-access strings List of cilium health API APIs which are administratively enabled. Supports '*'. (default [*]) --enable-k8s Enable the k8s clientset (default true) --enable-k8s-api-discovery Enable discovery of Kubernetes API groups and resources with the discovery API + --enable-monitor Enable the monitor unix domain socket server (default true) --gops-port uint16 Port for gops server to listen on (default 9890) -h, --help help for hive --install-egress-gateway-routes Install egress gateway IP rules and routes in order to properly steer egress gateway traffic to the correct ENI interface @@ -35,6 +36,7 @@ cilium-agent hive [flags] --mesh-auth-rotated-identities-queue-size int The size of the queue for signaling rotated identities. (default 1024) --mesh-auth-spiffe-trust-domain string The trust domain for the SPIFFE identity. (default "spiffe.cilium") --mesh-auth-spire-admin-socket string The path for the SPIRE admin agent Unix socket. + --monitor-queue-size int Size of the event queue when reading monitor events --pprof Enable serving pprof debugging API --pprof-address string Address that pprof listens on (default "localhost") --pprof-port uint16 Port that pprof listens on (default 6060) diff --git a/Documentation/cmdref/cilium-agent_hive_dot-graph.md b/Documentation/cmdref/cilium-agent_hive_dot-graph.md index 728845bc1d56..eb7cd41d4a56 100644 --- a/Documentation/cmdref/cilium-agent_hive_dot-graph.md +++ b/Documentation/cmdref/cilium-agent_hive_dot-graph.md @@ -28,6 +28,7 @@ cilium-agent hive dot-graph [flags] --enable-cilium-health-api-server-access strings List of cilium health API APIs which are administratively enabled. Supports '*'. (default [*]) --enable-k8s Enable the k8s clientset (default true) --enable-k8s-api-discovery Enable discovery of Kubernetes API groups and resources with the discovery API + --enable-monitor Enable the monitor unix domain socket server (default true) --gops-port uint16 Port for gops server to listen on (default 9890) --install-egress-gateway-routes Install egress gateway IP rules and routes in order to properly steer egress gateway traffic to the correct ENI interface --k8s-api-server string Kubernetes API server URL @@ -40,6 +41,7 @@ cilium-agent hive dot-graph [flags] --mesh-auth-rotated-identities-queue-size int The size of the queue for signaling rotated identities. (default 1024) --mesh-auth-spiffe-trust-domain string The trust domain for the SPIFFE identity. (default "spiffe.cilium") --mesh-auth-spire-admin-socket string The path for the SPIRE admin agent Unix socket. + --monitor-queue-size int Size of the event queue when reading monitor events --pprof Enable serving pprof debugging API --pprof-address string Address that pprof listens on (default "localhost") --pprof-port uint16 Port that pprof listens on (default 6060) diff --git a/daemon/cmd/daemon.go b/daemon/cmd/daemon.go index 52d90193976c..12fc234686fc 100644 --- a/daemon/cmd/daemon.go +++ b/daemon/cmd/daemon.go @@ -120,7 +120,7 @@ type Daemon struct { statusResponse models.StatusResponse statusCollector *status.Collector - monitorAgent *monitoragent.Agent + monitorAgent monitoragent.Agent ciliumHealth *health.CiliumHealth deviceManager *linuxdatapath.DeviceManager @@ -539,10 +539,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams ipamMetadata: params.IPAMMetadataManager, cniConfigManager: params.CNIConfigManager, clustermesh: params.ClusterMesh, - } - - if option.Config.RunMonitorAgent { - d.monitorAgent = monitoragent.NewAgent(ctx) + monitorAgent: params.MonitorAgent, } d.configModifyQueue = eventqueue.NewEventQueueBuffered("config-modify-queue", ConfigModifyQueueSize) @@ -1196,23 +1193,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams return nil, restoredEndpoints, fmt.Errorf("error encountered while updating DNS datapath rules: %w", err) } - // We can only attach the monitor agent once cilium_event has been set up. - if option.Config.RunMonitorAgent { - err = d.monitorAgent.AttachToEventsMap(defaults.MonitorBufferPages) - if err != nil { - log.WithError(err).Error("encountered error configuring run monitor agent") - return nil, nil, fmt.Errorf("encountered error configuring run monitor agent: %w", err) - } - - if option.Config.EnableMonitor { - err = monitoragent.ServeMonitorAPI(d.monitorAgent) - if err != nil { - log.WithError(err).Error("encountered error configuring run monitor agent") - return nil, nil, fmt.Errorf("encountered error configuring run monitor agent: %w", err) - } - } - } - // Start the controller for periodic sync. The purpose of the // controller is to ensure that endpoints and host IPs entries are // reinserted to the bpf maps if they are ever removed from them. diff --git a/daemon/cmd/daemon_main.go b/daemon/cmd/daemon_main.go index 44423213e29a..cfbeb23b3fc9 100644 --- a/daemon/cmd/daemon_main.go +++ b/daemon/cmd/daemon_main.go @@ -76,6 +76,7 @@ import ( "github.com/cilium/cilium/pkg/maps/neighborsmap" "github.com/cilium/cilium/pkg/maps/policymap" "github.com/cilium/cilium/pkg/metrics" + monitorAgent "github.com/cilium/cilium/pkg/monitor/agent" monitorAPI "github.com/cilium/cilium/pkg/monitor/api" "github.com/cilium/cilium/pkg/node" nodeManager "github.com/cilium/cilium/pkg/node/manager" @@ -695,16 +696,10 @@ func initializeFlags() { flags.StringSlice(option.Metrics, []string{}, "Metrics that should be enabled or disabled from the default metric list. The list is expected to be separated by a space. (+metric_foo to enable metric_foo , -metric_bar to disable metric_bar)") option.BindEnv(Vp, option.Metrics) - flags.Bool(option.EnableMonitorName, true, "Enable the monitor unix domain socket server") - option.BindEnv(Vp, option.EnableMonitorName) - flags.String(option.MonitorAggregationName, "None", "Level of monitor aggregation for traces from the datapath") option.BindEnvWithLegacyEnvFallback(Vp, option.MonitorAggregationName, "CILIUM_MONITOR_AGGREGATION_LEVEL") - flags.Int(option.MonitorQueueSizeName, 0, "Size of the event queue when reading monitor events") - option.BindEnv(Vp, option.MonitorQueueSizeName) - flags.Int(option.MTUName, 0, "Overwrite auto-detected MTU of underlying network") option.BindEnv(Vp, option.MTUName) @@ -1159,9 +1154,6 @@ func initEnv() { var debugDatapath bool - // Not running tests -> enable the monitor agent. - option.Config.RunMonitorAgent = true - option.LogRegisteredOptions(Vp, log) sysctl.SetProcfs(option.Config.ProcFs) @@ -1629,6 +1621,7 @@ type daemonParams struct { HealthAPISpec *healthApi.Spec ServiceCache *k8s.ServiceCache ClusterMesh *clustermesh.ClusterMesh + MonitorAgent monitorAgent.Agent } func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] { diff --git a/daemon/cmd/daemon_test.go b/daemon/cmd/daemon_test.go index b109273a590a..8a16a05b19a6 100644 --- a/daemon/cmd/daemon_test.go +++ b/daemon/cmd/daemon_test.go @@ -36,6 +36,7 @@ import ( "github.com/cilium/cilium/pkg/maps/signalmap" fakesignalmap "github.com/cilium/cilium/pkg/maps/signalmap/fake" "github.com/cilium/cilium/pkg/metrics" + monitorAgent "github.com/cilium/cilium/pkg/monitor/agent" monitorAPI "github.com/cilium/cilium/pkg/monitor/api" "github.com/cilium/cilium/pkg/option" "github.com/cilium/cilium/pkg/policy" @@ -173,6 +174,7 @@ func (ds *DaemonSuite) SetUpTest(c *C) { func() authmap.Map { return fakeauthmap.NewFakeAuthMap() }, func() egressmap.PolicyMap { return nil }, ), + monitorAgent.Cell, ControlPlane, cell.Invoke(func(p promise.Promise[*Daemon]) { daemonPromise = p diff --git a/pkg/datapath/cells.go b/pkg/datapath/cells.go index cf2e144db677..e083f5b76820 100644 --- a/pkg/datapath/cells.go +++ b/pkg/datapath/cells.go @@ -19,6 +19,7 @@ import ( ipcache "github.com/cilium/cilium/pkg/ipcache/types" "github.com/cilium/cilium/pkg/maps" "github.com/cilium/cilium/pkg/maps/eventsmap" + monitorAgent "github.com/cilium/cilium/pkg/monitor/agent" "github.com/cilium/cilium/pkg/option" wg "github.com/cilium/cilium/pkg/wireguard/agent" wgTypes "github.com/cilium/cilium/pkg/wireguard/types" @@ -39,6 +40,9 @@ var Cell = cell.Module( // The cilium events map, used by the monitor agent. eventsmap.Cell, + // The monitor agent, which multicasts cilium and agent events to its subscribers. + monitorAgent.Cell, + cell.Provide( newWireguardAgent, newDatapath, diff --git a/pkg/monitor/agent/agent.go b/pkg/monitor/agent/agent.go index eef4aef62fc2..6469e3bdd921 100644 --- a/pkg/monitor/agent/agent.go +++ b/pkg/monitor/agent/agent.go @@ -40,6 +40,16 @@ func isCtxDone(ctx context.Context) bool { } } +type Agent interface { + AttachToEventsMap(nPages int) error + SendEvent(typ int, event interface{}) error + RegisterNewListener(newListener listener.MonitorListener) + RemoveListener(ml listener.MonitorListener) + RegisterNewConsumer(newConsumer consumer.MonitorConsumer) + RemoveConsumer(mc consumer.MonitorConsumer) + State() *models.MonitorStatus +} + // Agent structure for centralizing the responsibilities of the main events // reader. // There is some racey-ness around perfReaderCancel since it replaces on every @@ -50,7 +60,7 @@ func isCtxDone(ctx context.Context) bool { // must have at least one MonitorListener (since it started) so no cancel is called. // If it doesn't, the cancel is the correct behavior (the older generation // cancel must have been called for us to get this far anyway). -type Agent struct { +type agent struct { lock.Mutex models.MonitorStatus @@ -67,7 +77,7 @@ type Agent struct { monitorEvents *perf.Reader } -// NewAgent starts a new monitor agent instance which distributes monitor events +// newAgent starts a new monitor agent instance which distributes monitor events // to registered listeners. Once the datapath is set up, AttachToEventsMap needs // to be called to receive events from the perf ring buffer. Otherwise, only // user space events received via SendEvent are distributed registered listeners. @@ -77,8 +87,8 @@ type Agent struct { // goroutine and close all registered listeners. // Note that the perf buffer reader is started only when listeners are // connected. -func NewAgent(ctx context.Context) *Agent { - return &Agent{ +func newAgent(ctx context.Context) *agent { + return &agent{ ctx: ctx, listeners: make(map[listener.MonitorListener]struct{}), consumers: make(map[consumer.MonitorConsumer]struct{}), @@ -89,7 +99,7 @@ func NewAgent(ctx context.Context) *Agent { // AttachToEventsMap opens the events perf ring buffer and makes it ready for // consumption, such that any subscribed consumers may receive events // from it. This function is to be called once the events map has been set up. -func (a *Agent) AttachToEventsMap(nPages int) error { +func (a *agent) AttachToEventsMap(nPages int) error { a.Lock() defer a.Unlock() @@ -120,7 +130,7 @@ func (a *Agent) AttachToEventsMap(nPages int) error { } // SendEvent distributes an event to all monitor listeners -func (a *Agent) SendEvent(typ int, event interface{}) error { +func (a *agent) SendEvent(typ int, event interface{}) error { if a == nil { return fmt.Errorf("monitor agent is not set up") } @@ -167,23 +177,16 @@ func (a *Agent) SendEvent(typ int, event interface{}) error { return nil } -// Context returns the underlying context of this monitor instance. It can be -// used to derive other contexts which should be stopped when the monitor is -// stopped. -func (a *Agent) Context() context.Context { - return a.ctx -} - // hasSubscribersLocked returns true if there are listeners or consumers // subscribed to the agent right now. // Note: it is critical to hold the lock for this operation. -func (a *Agent) hasSubscribersLocked() bool { +func (a *agent) hasSubscribersLocked() bool { return len(a.listeners)+len(a.consumers) != 0 } // hasListeners returns true if there are listeners subscribed to the // agent right now. -func (a *Agent) hasListeners() bool { +func (a *agent) hasListeners() bool { a.Lock() defer a.Unlock() return len(a.listeners) != 0 @@ -195,7 +198,7 @@ func (a *Agent) hasListeners() bool { // cancelFunc is assigned to perfReaderCancel. Note that cancelling m.Context() // (e.g. on program shutdown) will also cancel the derived context. // Note: it is critical to hold the lock for this operation. -func (a *Agent) startPerfReaderLocked() { +func (a *agent) startPerfReaderLocked() { if a.events == nil { return // not attached to events map yet } @@ -208,7 +211,7 @@ func (a *Agent) startPerfReaderLocked() { // RegisterNewListener adds the new MonitorListener to the global list. // It also spawns a singleton goroutine to read and distribute the events. -func (a *Agent) RegisterNewListener(newListener listener.MonitorListener) { +func (a *agent) RegisterNewListener(newListener listener.MonitorListener) { if a == nil { return } @@ -245,7 +248,7 @@ func (a *Agent) RegisterNewListener(newListener listener.MonitorListener) { // RemoveListener deletes the MonitorListener from the list, closes its queue, // and stops perfReader if this is the last subscriber -func (a *Agent) RemoveListener(ml listener.MonitorListener) { +func (a *agent) RemoveListener(ml listener.MonitorListener) { if a == nil { return } @@ -273,7 +276,7 @@ func (a *Agent) RemoveListener(ml listener.MonitorListener) { // RegisterNewConsumer adds the new MonitorConsumer to the global list. // It also spawns a singleton goroutine to read and distribute the events. -func (a *Agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) { +func (a *agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) { if a == nil { return } @@ -294,7 +297,7 @@ func (a *Agent) RegisterNewConsumer(newConsumer consumer.MonitorConsumer) { // RemoveConsumer deletes the MonitorConsumer from the list, closes its queue, // and stops perfReader if this is the last subscriber -func (a *Agent) RemoveConsumer(mc consumer.MonitorConsumer) { +func (a *agent) RemoveConsumer(mc consumer.MonitorConsumer) { if a == nil { return } @@ -312,7 +315,7 @@ func (a *Agent) RemoveConsumer(mc consumer.MonitorConsumer) { // will exit when stopCtx is done. Note, however, that it will block in the // Poll call but assumes enough events are generated that these blocks are // short. -func (a *Agent) handleEvents(stopCtx context.Context) { +func (a *agent) handleEvents(stopCtx context.Context) { scopedLog := log.WithField(logfields.StartTime, time.Now()) scopedLog.Info("Beginning to read perf buffer") defer scopedLog.Info("Stopped reading perf buffer") @@ -358,7 +361,7 @@ func (a *Agent) handleEvents(stopCtx context.Context) { // processPerfRecord processes a record from the datapath and sends it to any // registered subscribers -func (a *Agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) { +func (a *agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) { a.Lock() defer a.Unlock() @@ -382,7 +385,7 @@ func (a *Agent) processPerfRecord(scopedLog *logrus.Entry, record perf.Record) { } // State returns the current status of the monitor -func (a *Agent) State() *models.MonitorStatus { +func (a *agent) State() *models.MonitorStatus { if a == nil { return nil } @@ -400,7 +403,7 @@ func (a *Agent) State() *models.MonitorStatus { } // notifyAgentEvent notifies all consumers about an agent event. -func (a *Agent) notifyAgentEvent(typ int, message interface{}) { +func (a *agent) notifyAgentEvent(typ int, message interface{}) { a.Lock() defer a.Unlock() for mc := range a.consumers { @@ -410,7 +413,7 @@ func (a *Agent) notifyAgentEvent(typ int, message interface{}) { // notifyPerfEventLocked notifies all consumers about a perf event. // The caller must hold the monitor lock. -func (a *Agent) notifyPerfEventLocked(data []byte, cpu int) { +func (a *agent) notifyPerfEventLocked(data []byte, cpu int) { for mc := range a.consumers { mc.NotifyPerfEvent(data, cpu) } @@ -418,21 +421,21 @@ func (a *Agent) notifyPerfEventLocked(data []byte, cpu int) { // notifyEventToConsumersLocked notifies all consumers about lost events. // The caller must hold the monitor lock. -func (a *Agent) notifyPerfEventLostLocked(numLostEvents uint64, cpu int) { +func (a *agent) notifyPerfEventLostLocked(numLostEvents uint64, cpu int) { for mc := range a.consumers { mc.NotifyPerfEventLost(numLostEvents, cpu) } } // sendToListeners enqueues the payload to all listeners. -func (a *Agent) sendToListeners(pl *payload.Payload) { +func (a *agent) sendToListeners(pl *payload.Payload) { a.Lock() defer a.Unlock() a.sendToListenersLocked(pl) } // sendToListenersLocked enqueues the payload to all listeners while holding the monitor lock. -func (a *Agent) sendToListenersLocked(pl *payload.Payload) { +func (a *agent) sendToListenersLocked(pl *payload.Payload) { for ml := range a.listeners { ml.Enqueue(pl) } diff --git a/pkg/monitor/agent/cell.go b/pkg/monitor/agent/cell.go new file mode 100644 index 000000000000..0e0d20a1aad5 --- /dev/null +++ b/pkg/monitor/agent/cell.go @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package agent + +import ( + "context" + "fmt" + + "github.com/sirupsen/logrus" + "github.com/spf13/pflag" + + "github.com/cilium/cilium/pkg/common" + "github.com/cilium/cilium/pkg/defaults" + "github.com/cilium/cilium/pkg/hive" + "github.com/cilium/cilium/pkg/hive/cell" + "github.com/cilium/cilium/pkg/maps/eventsmap" +) + +// Cell provides the monitor agent, which monitors the cilium events perf event +// buffer and forwards events to consumers/listeners. It also handles +// multicasting of other agent events. +var Cell = cell.Module( + "monitor-agent", + "Consumes the cilium events map and distributes those and other agent events", + + cell.Provide(newMonitorAgent), + cell.Config(defaultConfig), +) + +type AgentConfig struct { + // EnableMonitor enables the monitor unix domain socket server + EnableMonitor bool + + // MonitorQueueSize is the size of the monitor event queue + MonitorQueueSize int +} + +var defaultConfig = AgentConfig{ + EnableMonitor: true, +} + +func (def AgentConfig) Flags(flags *pflag.FlagSet) { + flags.Bool("enable-monitor", def.EnableMonitor, "Enable the monitor unix domain socket server") + flags.Int("monitor-queue-size", 0, "Size of the event queue when reading monitor events") +} + +type agentParams struct { + cell.In + + Lifecycle hive.Lifecycle + Log logrus.FieldLogger + Config AgentConfig + EventsMap eventsmap.Map `optional:"true"` +} + +func newMonitorAgent(params agentParams) Agent { + ctx, cancel := context.WithCancel(context.Background()) + agent := newAgent(ctx) + + params.Lifecycle.Append(hive.Hook{ + OnStart: func(hive.HookContext) error { + if params.EventsMap == nil { + // If there's no event map, function only for agent events. + log.Info("No eventsmap: monitor works only for agent events.") + return nil + } + + err := agent.AttachToEventsMap(defaults.MonitorBufferPages) + if err != nil { + log.WithError(err).Error("encountered error when attaching the monitor agent to eventsmap") + return fmt.Errorf("encountered error when attaching the monitor agent: %w", err) + } + + if params.Config.EnableMonitor { + queueSize := params.Config.MonitorQueueSize + if queueSize == 0 { + queueSize = common.GetNumPossibleCPUs(log) * defaults.MonitorQueueSizePerCPU + if queueSize > defaults.MonitorQueueSizePerCPUMaximum { + queueSize = defaults.MonitorQueueSizePerCPUMaximum + } + } + + err = ServeMonitorAPI(ctx, agent, queueSize) + if err != nil { + log.WithError(err).Error("encountered error serving monitor agent API") + return fmt.Errorf("encountered error serving monitor agent API: %w", err) + } + } + return err + }, + OnStop: func(hive.HookContext) error { + cancel() + return nil + }, + }) + + return agent +} diff --git a/pkg/monitor/agent/server.go b/pkg/monitor/agent/server.go index dcac65258864..2f0622e2cf3f 100644 --- a/pkg/monitor/agent/server.go +++ b/pkg/monitor/agent/server.go @@ -13,7 +13,6 @@ import ( "github.com/cilium/cilium/pkg/defaults" "github.com/cilium/cilium/pkg/logging" "github.com/cilium/cilium/pkg/logging/logfields" - "github.com/cilium/cilium/pkg/option" ) var ( @@ -47,14 +46,14 @@ func buildServer(path string) (*net.UnixListener, error) { // server serves the Cilium monitor API on the unix domain socket type server struct { listener net.Listener - monitor *Agent + monitor Agent } // ServeMonitorAPI serves the Cilium 1.2 monitor API on a unix domain socket. // This method starts the server in the background. The server is stopped when -// monitor.Context() is cancelled. Each incoming connection registers a new -// listener on monitor. -func ServeMonitorAPI(monitor *Agent) error { +// ctx is cancelled. Each incoming connection registers a new listener on +// monitor. +func ServeMonitorAPI(ctx context.Context, monitor Agent, queueSize int) error { listener, err := buildServer(defaults.MonitorSockPath1_2) if err != nil { return err @@ -67,14 +66,14 @@ func ServeMonitorAPI(monitor *Agent) error { log.Infof("Serving cilium node monitor v1.2 API at unix://%s", defaults.MonitorSockPath1_2) - go s.connectionHandler1_2(monitor.Context()) + go s.connectionHandler1_2(ctx, queueSize) return nil } // connectionHandler1_2 handles all the incoming connections and sets up the // listener objects. It will block until ctx is cancelled. -func (s *server) connectionHandler1_2(ctx context.Context) { +func (s *server) connectionHandler1_2(ctx context.Context, queueSize int) { go func() { <-ctx.Done() s.listener.Close() @@ -93,7 +92,7 @@ func (s *server) connectionHandler1_2(ctx context.Context) { continue } - newListener := newListenerv1_2(conn, option.Config.MonitorQueueSize, s.monitor.RemoveListener) + newListener := newListenerv1_2(conn, queueSize, s.monitor.RemoveListener) s.monitor.RegisterNewListener(newListener) } } diff --git a/pkg/option/config.go b/pkg/option/config.go index 5f2d46a7340e..4b6c6668ce48 100644 --- a/pkg/option/config.go +++ b/pkg/option/config.go @@ -710,12 +710,6 @@ const ( // IPv6MCastDevice is the name of the option to select IPv6 multicast device IPv6MCastDevice = "ipv6-mcast-device" - // EnableMonitor is the name of the option to enable the monitor socket - EnableMonitorName = "enable-monitor" - - // MonitorQueueSizeName is the name of the option MonitorQueueSize - MonitorQueueSizeName = "monitor-queue-size" - // FQDNRejectResponseCode is the name for the option for dns-proxy reject response code FQDNRejectResponseCode = "tofqdns-dns-reject-response-code" @@ -1497,9 +1491,6 @@ type DaemonConfig struct { CTMapEntriesTimeoutSYN time.Duration CTMapEntriesTimeoutFIN time.Duration - // EnableMonitor enables the monitor unix domain socket server - EnableMonitor bool - // MonitorAggregationInterval configures the interval between monitor // messages when monitor aggregation is enabled. MonitorAggregationInterval time.Duration @@ -1684,9 +1675,6 @@ type DaemonConfig struct { // the label selector in the above field. NodeEncryptionOptOutLabelsString string - // MonitorQueueSize is the size of the monitor event queue - MonitorQueueSize int - // CLI options BPFRoot string @@ -1915,9 +1903,6 @@ type DaemonConfig struct { // Specifies wheather to annotate the kubernetes nodes or not AnnotateK8sNode bool - // RunMonitorAgent indicates whether to run the monitor agent - RunMonitorAgent bool - // EnableNodePort enables k8s NodePort service implementation in BPF EnableNodePort bool @@ -3042,10 +3027,8 @@ func (c *DaemonConfig) Populate(vp *viper.Viper) { c.IPTablesRandomFully = vp.GetBool(IPTablesRandomFully) c.IPSecKeyFile = vp.GetString(IPSecKeyFileName) c.IPsecKeyRotationDuration = vp.GetDuration(IPsecKeyRotationDuration) - c.EnableMonitor = vp.GetBool(EnableMonitorName) c.MonitorAggregation = vp.GetString(MonitorAggregationName) c.MonitorAggregationInterval = vp.GetDuration(MonitorAggregationInterval) - c.MonitorQueueSize = vp.GetInt(MonitorQueueSizeName) c.MTU = vp.GetInt(MTUName) c.PreAllocateMaps = vp.GetBool(PreAllocateMapsName) c.PrependIptablesChains = vp.GetBool(PrependIptablesChainsName) @@ -3324,10 +3307,6 @@ func (c *DaemonConfig) Populate(vp *viper.Viper) { c.EndpointStatus[option] = struct{}{} } - if c.MonitorQueueSize == 0 { - c.MonitorQueueSize = getDefaultMonitorQueueSize(runtime.NumCPU()) - } - // Metrics Setup metrics.ResetMetrics() defaultMetrics := metrics.DefaultMetrics() diff --git a/pkg/proxy/logger/logger_test.go b/pkg/proxy/logger/logger_test.go index 34623fa1d155..7082cafeec22 100644 --- a/pkg/proxy/logger/logger_test.go +++ b/pkg/proxy/logger/logger_test.go @@ -12,11 +12,13 @@ import ( "sync" "testing" + "github.com/cilium/cilium/pkg/hive" + "github.com/cilium/cilium/pkg/hive/cell" + "github.com/cilium/cilium/pkg/maps/eventsmap" "github.com/cilium/cilium/pkg/monitor/agent" "github.com/cilium/cilium/pkg/monitor/agent/listener" "github.com/cilium/cilium/pkg/monitor/api" "github.com/cilium/cilium/pkg/monitor/payload" - "github.com/cilium/cilium/pkg/option" "github.com/cilium/cilium/pkg/proxy/accesslog" "github.com/cilium/cilium/pkg/u8proto" @@ -116,12 +118,12 @@ func (ml *MockMonitorListener) Close() { // Specifically, it mimics the behavior of the Daemon and its implementation of the // NewProxyLogRecord method. type MockLogNotifier struct { - monitorAgent *agent.Agent + monitorAgent agent.Agent } // NewMockLogNotifier returns a MockLogNotifier ready to be used in the benchmarks below. -func NewMockLogNotifier() *MockLogNotifier { - return &MockLogNotifier{agent.NewAgent(context.Background())} +func NewMockLogNotifier(monitor agent.Agent) *MockLogNotifier { + return &MockLogNotifier{monitor} } // NewProxyLogRecord sends the event to the monitor agent to notify the listeners. @@ -156,9 +158,7 @@ var benchCases = []struct { }, } -func BenchmarkLogNotifierWithNoListeners(b *testing.B) { - SetNotifier(NewMockLogNotifier()) - +func benchWithoutListeners(b *testing.B) { for _, bm := range benchCases { b.Run(bm.name, func(b *testing.B) { b.ReportAllocs() @@ -181,12 +181,7 @@ func BenchmarkLogNotifierWithNoListeners(b *testing.B) { } } -func BenchmarkLogNotifierWithListeners(b *testing.B) { - listener := NewMockMonitorListener(option.Config.MonitorQueueSize) - notifier := NewMockLogNotifier() - notifier.RegisterNewListener(listener) - SetNotifier(notifier) - +func benchWithListeners(listener *MockMonitorListener, b *testing.B) { for _, bm := range benchCases { b.Run(bm.name, func(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) @@ -220,3 +215,65 @@ func BenchmarkLogNotifierWithListeners(b *testing.B) { }) } } + +func BenchmarkLogNotifierWithNoListeners(b *testing.B) { + bench := cell.Invoke(func(lc hive.Lifecycle, monitor agent.Agent) error { + notifier := NewMockLogNotifier(monitor) + SetNotifier(notifier) + + lc.Append(hive.Hook{ + OnStart: func(ctx hive.HookContext) error { + benchWithoutListeners(b) + return nil + }, + OnStop: func(ctx hive.HookContext) error { return nil }, + }) + + return nil + }) + + h := hive.New( + cell.Provide(func() eventsmap.Map { return nil }), + agent.Cell, + bench, + ) + + if err := h.Start(context.TODO()); err != nil { + b.Fatalf("failed to start hive: %v", err) + } + if err := h.Stop(context.TODO()); err != nil { + b.Fatalf("failed to stop hive: %v", err) + } +} + +func BenchmarkLogNotifierWithListeners(b *testing.B) { + bench := cell.Invoke(func(lc hive.Lifecycle, monitor agent.Agent, cfg agent.AgentConfig, em eventsmap.Map) error { + listener := NewMockMonitorListener(cfg.MonitorQueueSize) + notifier := NewMockLogNotifier(monitor) + notifier.RegisterNewListener(listener) + SetNotifier(notifier) + + lc.Append(hive.Hook{ + OnStart: func(ctx hive.HookContext) error { + benchWithListeners(listener, b) + return nil + }, + OnStop: func(ctx hive.HookContext) error { return nil }, + }) + + return nil + }) + + h := hive.New( + cell.Provide(func() eventsmap.Map { return nil }), + agent.Cell, + bench, + ) + + if err := h.Start(context.TODO()); err != nil { + b.Fatalf("failed to start hive: %v", err) + } + if err := h.Stop(context.TODO()); err != nil { + b.Fatalf("failed to stop hive: %v", err) + } +} diff --git a/test/controlplane/suite/agent.go b/test/controlplane/suite/agent.go index d52f5a91a1f7..2c04e1143e7b 100644 --- a/test/controlplane/suite/agent.go +++ b/test/controlplane/suite/agent.go @@ -23,6 +23,7 @@ import ( "github.com/cilium/cilium/pkg/maps/egressmap" "github.com/cilium/cilium/pkg/maps/signalmap" fakesignalmap "github.com/cilium/cilium/pkg/maps/signalmap/fake" + monitorAgent "github.com/cilium/cilium/pkg/monitor/agent" "github.com/cilium/cilium/pkg/node" "github.com/cilium/cilium/pkg/option" agentOption "github.com/cilium/cilium/pkg/option" @@ -79,6 +80,7 @@ func startCiliumAgent(t *testing.T, clientset k8sClient.Clientset) (*fakeDatapat func() authmap.Map { return fakeauthmap.NewFakeAuthMap() }, func() egressmap.PolicyMap { return nil }, ), + monitorAgent.Cell, cmd.ControlPlane, cell.Invoke(func(p promise.Promise[*cmd.Daemon]) { daemonPromise = p