diff --git a/agent/crates/public/src/proto/mod.rs b/agent/crates/public/src/proto/mod.rs
index 1ca733f1e97..be531ce451a 100644
--- a/agent/crates/public/src/proto/mod.rs
+++ b/agent/crates/public/src/proto/mod.rs
@@ -17,6 +17,7 @@
 pub mod common;
 pub mod flow_log;
 pub mod integration;
+pub mod k8s_event;
 pub mod metric;
 pub mod stats;
 pub mod trident;
diff --git a/agent/crates/public/src/sender.rs b/agent/crates/public/src/sender.rs
old mode 100644
new mode 100755
index 1ec33ad0b95..19d42c2a1cd
--- a/agent/crates/public/src/sender.rs
+++ b/agent/crates/public/src/sender.rs
@@ -56,6 +56,7 @@ pub enum SendMessageType {
     Profile = 13,
     ProcEvents = 14,
     AlarmEvent = 15,
+    KubernetesEvent = 16,
 }
 
 impl fmt::Display for SendMessageType {
@@ -77,6 +78,7 @@ impl fmt::Display for SendMessageType {
             Self::Profile => write!(f, "profile"),
             Self::ProcEvents => write!(f, "proc_events"),
             Self::AlarmEvent => write!(f, "alarm_event"),
+            Self::KubernetesEvent => write!(f, "kubernetes_event"),
         }
     }
 }
diff --git a/agent/docker/deepflow-agent-ds.yaml b/agent/docker/deepflow-agent-ds.yaml
index a1cdccc7fc5..a3fac63e785 100644
--- a/agent/docker/deepflow-agent-ds.yaml
+++ b/agent/docker/deepflow-agent-ds.yaml
@@ -63,6 +63,7 @@ rules:
       - services
      - pods
       - replicationcontrollers
+      - events
     verbs: ["get", "list", "watch"]
   - apiGroups: ["apps"]
     resources:
diff --git a/agent/docker/deepflow-agent_test.yaml b/agent/docker/deepflow-agent_test.yaml
index d684d4bf456..70c2f3e25c7 100644
--- a/agent/docker/deepflow-agent_test.yaml
+++ b/agent/docker/deepflow-agent_test.yaml
@@ -67,6 +67,7 @@ rules:
       - services
       - pods
       - replicationcontrollers
+      - events
     verbs: ["get", "list", "watch"]
   - apiGroups: ["apps"]
     resources:
diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs
index 50fdd53dc19..c3a317a0caf 100644
--- a/agent/src/config/config.rs
+++ b/agent/src/config/config.rs
@@ -514,6 +514,7 @@ pub struct YamlConfig {
     pub toa_lru_cache_size: usize,
     pub flow_sender_queue_size: usize,
     pub flow_sender_queue_count: usize,
+    pub k8s_event_sender_queue_size: usize,
     #[serde(rename = "second-flow-extra-delay-second", with = "humantime_serde")]
     pub second_flow_extra_delay: Duration,
     #[serde(with = "humantime_serde")]
@@ -654,6 +655,9 @@ impl YamlConfig {
                 1 << 16
             };
         }
+        if c.k8s_event_sender_queue_size < 1 << 16 {
+            c.k8s_event_sender_queue_size = 1 << 16;
+        }
         if c.packet_delay < Duration::from_secs(1) || c.packet_delay > Duration::from_secs(10) {
             c.packet_delay = Duration::from_secs(1);
         }
@@ -904,6 +908,7 @@ impl Default for YamlConfig {
             // default size changes according to tap_mode
             flow_sender_queue_size: 1 << 16,
             flow_sender_queue_count: 1,
+            k8s_event_sender_queue_size: 1 << 16,
             second_flow_extra_delay: Duration::from_secs(0),
             packet_delay: Duration::from_secs(1),
             triple: Default::default(),
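
Note on the clamp above: k8s_event_sender_queue_size is floored at 65536 (1 << 16), matching both the Default impl and the documented range in agent_group_config_example.yaml. A standalone sketch of the same rule, for illustration only (not agent code):

// Mirrors the floor YamlConfig applies to k8s_event_sender_queue_size.
fn clamp_k8s_event_queue_size(configured: usize) -> usize {
    configured.max(1 << 16)
}

fn main() {
    assert_eq!(clamp_k8s_event_queue_size(1024), 65536); // below the floor: raised
    assert_eq!(clamp_k8s_event_queue_size(1 << 20), 1 << 20); // above: kept as-is
}
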
diff --git a/agent/src/platform/kubernetes/api_watcher.rs b/agent/src/platform/kubernetes/api_watcher.rs
old mode 100644
new mode 100755
index 43ff07bf84f..c3bde7ec363
--- a/agent/src/platform/kubernetes/api_watcher.rs
+++ b/agent/src/platform/kubernetes/api_watcher.rs
@@ -36,9 +36,12 @@ use log::{debug, error, info, log_enabled, warn, Level};
 use parking_lot::RwLock;
 use tokio::{runtime::Runtime, task::JoinHandle};
 
-use super::resource_watcher::{
-    default_resources, supported_resources, GenericResourceWatcher, GroupVersion, Resource,
-    Watcher, WatcherConfig,
-};
+use super::{
+    k8s_events::BoxedKubernetesEvent,
+    resource_watcher::{
+        default_resources, supported_resources, GenericResourceWatcher, GroupVersion, Resource,
+        Watcher, WatcherConfig,
+    },
+};
 use crate::{
     config::{handler::PlatformAccess, KubernetesResourceConfig},
@@ -52,9 +55,12 @@ use crate::{
         stats,
     },
 };
-use public::proto::{
-    common::KubernetesApiInfo,
-    trident::{Exception, KubernetesApiSyncRequest},
+use public::{
+    proto::{
+        common::KubernetesApiInfo,
+        trident::{Exception, KubernetesApiSyncRequest},
+    },
+    queue::DebugSender,
 };
 
 /*
@@ -99,6 +105,7 @@ pub struct ApiWatcher {
     exception_handler: ExceptionHandler,
     stats_collector: Arc<stats::Collector>,
     agent_id: Arc<RwLock<AgentId>>,
+    k8s_events_sender: Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
 }
 
 impl ApiWatcher {
@@ -131,9 +138,17 @@
             watchers: Arc::new(Mutex::new(HashMap::new())),
             exception_handler,
             stats_collector,
+            k8s_events_sender: Arc::new(Mutex::new(None)),
         }
     }
 
+    pub fn set_k8s_events_sender(&self, k8s_events_sender: DebugSender<BoxedKubernetesEvent>) {
+        self.k8s_events_sender
+            .lock()
+            .unwrap()
+            .replace(k8s_events_sender);
+    }
+
     // fetch the matching entries directly
     pub fn get_watcher_entries(&self, resource_name: impl AsRef<str>) -> Option<Vec<Vec<u8>>> {
         if !*self.running.lock().unwrap() {
@@ -221,6 +236,7 @@ impl ApiWatcher {
         let watchers = self.watchers.clone();
         let exception_handler = self.exception_handler.clone();
         let stats_collector = self.stats_collector.clone();
+        let k8s_events_sender = self.k8s_events_sender.clone();
 
         let handle = thread::Builder::new()
             .name("kubernetes-api-watcher".to_owned())
@@ -236,6 +252,7 @@ impl ApiWatcher {
                     exception_handler,
                     stats_collector,
                     agent_id,
+                    k8s_events_sender,
                 )
             })
             .unwrap();
@@ -579,6 +596,7 @@ impl ApiWatcher {
         resource_watchers: &Arc<Mutex<HashMap<String, GenericResourceWatcher>>>,
         exception_handler: &ExceptionHandler,
         agent_id: &Arc<RwLock<AgentId>>,
+        k8s_events_sender: &Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
     ) {
         let version = &context.version;
         // report the cached entries; skip when there are none
@@ -632,6 +650,19 @@
         let resource_watchers_guard = resource_watchers.lock().unwrap();
         for watcher in resource_watchers_guard.values() {
             let kind = watcher.pb_name();
+            if kind == "*v1.Events" {
+                let mut events = watcher.events();
+                if events.is_empty() {
+                    continue;
+                }
+                let mut k8s_events_sender_guard = k8s_events_sender.lock().unwrap();
+                if let Some(s) = k8s_events_sender_guard.as_mut() {
+                    if let Err(e) = s.send_all(&mut events) {
+                        warn!("send k8s events failed: {:?}", e);
+                    }
+                }
+                continue;
+            }
             for entry in watcher.entries() {
                 total_entries.push(KubernetesApiInfo {
                     r#type: Some(kind.to_owned()),
@@ -752,6 +783,7 @@
         exception_handler: ExceptionHandler,
         stats_collector: Arc<stats::Collector>,
         agent_id: Arc<RwLock<AgentId>>,
+        k8s_events_sender: Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
     ) {
         info!("kubernetes api watcher starting");
@@ -849,6 +881,7 @@
                     &resource_watchers,
                     &exception_handler,
                     &agent_id,
+                    &k8s_events_sender,
                 );
                 break;
             }
@@ -864,6 +897,7 @@
                 &resource_watchers,
                 &exception_handler,
                 &agent_id,
+                &k8s_events_sender,
             );
         }
         info!("kubernetes api watcher stopping");
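
The sender wiring above is deliberately late-bound: ApiWatcher is constructed with k8s_events_sender set to None, trident.rs injects the queue handle later via set_k8s_events_sender(), and the "*v1.Events" branch drains watcher.events() into the queue on every sync pass. A self-contained sketch of that shape, using std::sync::mpsc::SyncSender as a stand-in for DeepFlow's DebugSender (names are illustrative):

use std::sync::{mpsc, Arc, Mutex};

struct EventForwarder {
    // Counterpart of ApiWatcher::k8s_events_sender.
    sender: Arc<Mutex<Option<mpsc::SyncSender<String>>>>,
}

impl EventForwarder {
    fn new() -> Self {
        Self { sender: Arc::new(Mutex::new(None)) }
    }

    // Counterpart of set_k8s_events_sender: the queue is attached after construction.
    fn set_sender(&self, s: mpsc::SyncSender<String>) {
        self.sender.lock().unwrap().replace(s);
    }

    // Counterpart of the "*v1.Events" branch: drain pending events into the
    // queue if one is attached; before attachment, events are simply dropped.
    fn flush(&self, mut events: Vec<String>) {
        if events.is_empty() {
            return;
        }
        if let Some(s) = self.sender.lock().unwrap().as_ref() {
            for ev in events.drain(..) {
                if let Err(e) = s.try_send(ev) {
                    eprintln!("send k8s events failed: {:?}", e);
                }
            }
        }
    }
}

fn main() {
    let fwd = EventForwarder::new();
    let (tx, rx) = mpsc::sync_channel(4);
    fwd.set_sender(tx);
    fwd.flush(vec!["Back-off restarting failed container".to_string()]);
    assert_eq!(rx.recv().unwrap(), "Back-off restarting failed container");
}
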
diff --git a/agent/src/platform/kubernetes/k8s_events.rs b/agent/src/platform/kubernetes/k8s_events.rs
new file mode 100755
index 00000000000..1a24d376c84
--- /dev/null
+++ b/agent/src/platform/kubernetes/k8s_events.rs
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2024 Yunshan Networks
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use public::{
+    proto::k8s_event::KubernetesEvent,
+    sender::{SendMessageType, Sendable},
+};
+
+use prost::Message;
+
+#[derive(Debug, Default, Clone)]
+pub struct BoxedKubernetesEvent(pub Box<KubernetesEvent>);
+
+impl Sendable for BoxedKubernetesEvent {
+    fn encode(self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
+        self.0.encode(buf).map(|_| self.0.encoded_len())
+    }
+
+    fn message_type(&self) -> SendMessageType {
+        SendMessageType::KubernetesEvent
+    }
+}
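
Sendable::encode above writes the protobuf wire format into buf and reports the byte count via encoded_len(). A minimal sketch of that contract using prost's derive macro; the Demo type below is hypothetical (KubernetesEvent itself is generated from the k8s_event proto definition):

use prost::Message;

// Hypothetical two-field message standing in for the generated KubernetesEvent.
#[derive(Clone, PartialEq, Message)]
struct Demo {
    #[prost(string, tag = "1")]
    reason: String,
    #[prost(string, tag = "2")]
    message: String,
}

fn main() {
    let ev = Demo {
        reason: "BackOff".to_string(),
        message: "restarting failed container".to_string(),
    };
    let mut buf = Vec::new();
    // Same shape as Sendable::encode: serialize into buf, report bytes written.
    ev.encode(&mut buf).expect("encoding into a Vec cannot fail");
    assert_eq!(buf.len(), ev.encoded_len());
}
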
diff --git a/agent/src/platform/kubernetes/mod.rs b/agent/src/platform/kubernetes/mod.rs
old mode 100644
new mode 100755
index d32ef9e9e0b..36df7c74d87
--- a/agent/src/platform/kubernetes/mod.rs
+++ b/agent/src/platform/kubernetes/mod.rs
@@ -25,6 +25,7 @@ use regex::Regex;
 
 mod active_poller;
 mod api_watcher;
 mod crd;
+mod k8s_events;
 mod passive_poller;
 mod sidecar_poller;
 pub use active_poller::ActivePoller;
diff --git a/agent/src/platform/kubernetes/resource_watcher.rs b/agent/src/platform/kubernetes/resource_watcher.rs
old mode 100644
new mode 100755
index d31eaed56c1..e86dadfffe5
--- a/agent/src/platform/kubernetes/resource_watcher.rs
+++ b/agent/src/platform/kubernetes/resource_watcher.rs
@@ -22,7 +22,7 @@ use std::{
         atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
         Arc, Weak,
     },
-    time::{Duration, Instant, SystemTime},
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
 
 use enum_dispatch::enum_dispatch;
@@ -35,7 +35,7 @@ use k8s_openapi::{
             StatefulSet, StatefulSetSpec,
         },
         core::v1::{
-            Container, ContainerStatus, Namespace, Node, NodeSpec, NodeStatus, Pod, PodSpec,
+            Container, ContainerStatus, Event, Namespace, Node, NodeSpec, NodeStatus, Pod, PodSpec,
             PodStatus, ReplicationController, ReplicationControllerSpec, Service, ServiceSpec,
         },
         extensions, networking,
@@ -49,6 +49,7 @@ use kube::{
 };
 use log::{debug, info, trace, warn};
 use openshift_openapi::api::route::v1::Route;
+use public::proto::k8s_event::{EventType, InvolvedObject, KubernetesEvent, Source};
 use serde::de::DeserializeOwned;
 use serde::ser::Serialize;
 use tokio::{runtime::Handle, sync::Mutex, task::JoinHandle, time};
@@ -58,8 +59,11 @@ use super::crd::{
     kruise::{CloneSet, StatefulSet as KruiseStatefulSet},
     pingan::ServiceRule,
 };
-use crate::utils::stats::{
-    self, Countable, Counter, CounterType, CounterValue, RefCountable, StatsOption,
+use crate::{
+    platform::kubernetes::k8s_events::BoxedKubernetesEvent,
+    utils::stats::{
+        self, Countable, Counter, CounterType, CounterValue, RefCountable, StatsOption,
+    },
 };
 
 const REFRESH_INTERVAL: Duration = Duration::from_secs(3600);
@@ -73,6 +77,7 @@ pub trait Watcher {
     fn start(&self) -> Option<JoinHandle<()>>;
     fn error(&self) -> Option<String>;
     fn entries(&self) -> Vec<Vec<u8>>;
+    fn events(&self) -> Vec<BoxedKubernetesEvent>;
     fn pb_name(&self) -> &str;
     fn version(&self) -> u64;
     fn ready(&self) -> bool;
@@ -93,6 +98,7 @@ pub enum GenericResourceWatcher {
     V1Ingress(ResourceWatcher<networking::v1::Ingress>),
     V1beta1Ingress(ResourceWatcher<networking::v1beta1::Ingress>),
     ExtV1beta1Ingress(ResourceWatcher<extensions::v1beta1::Ingress>),
+    Event(ResourceWatcher<Event>),
     Route(ResourceWatcher<Route>),
 
     // CRDs
@@ -235,6 +241,15 @@ pub fn default_resources() -> Vec<Resource> {
             ],
             selected_gv: None,
         },
+        Resource {
+            name: "events",
+            pb_name: "*v1.Events",
+            group_versions: vec![GroupVersion {
+                group: "events.k8s.io",
+                version: "v1",
+            }],
+            selected_gv: None,
+        },
     ]
 }
@@ -382,6 +397,15 @@ pub fn supported_resources() -> Vec<Resource> {
             }],
             selected_gv: None,
         },
+        Resource {
+            name: "events",
+            pb_name: "*v1.Events",
+            group_versions: vec![GroupVersion {
+                group: "events.k8s.io",
+                version: "v1",
+            }],
+            selected_gv: None,
+        },
     ]
 }
@@ -456,6 +480,7 @@ pub struct WatcherConfig {
 pub struct ResourceWatcher<K> {
     api: Api<K>,
     entries: Arc<Mutex<HashMap<String, Vec<u8>>>>,
+    events: Arc<Mutex<Vec<BoxedKubernetesEvent>>>,
     err_msg: Arc<Mutex<Option<String>>>,
     kind: Resource,
     version: Arc<AtomicU64>,
 
 struct Context<K> {
     entries: Arc<Mutex<HashMap<String, Vec<u8>>>>,
+    events: Arc<Mutex<Vec<BoxedKubernetesEvent>>>,
     version: Arc<AtomicU64>,
     api: Api<K>,
     kind: Resource,
@@ -488,6 +514,7 @@ where
     fn start(&self) -> Option<JoinHandle<()>> {
         let ctx = Context {
             entries: self.entries.clone(),
+            events: self.events.clone(),
             version: self.version.clone(),
             kind: self.kind.clone(),
             err_msg: self.err_msg.clone(),
@@ -524,6 +551,10 @@ where
             .collect::<Vec<_>>()
     }
 
+    fn events(&self) -> Vec<BoxedKubernetesEvent> {
+        self.events.blocking_lock().drain(..).collect()
+    }
+
     fn ready(&self) -> bool {
         self.ready.load(Ordering::Relaxed)
     }
@@ -543,6 +574,7 @@ where
         Self {
             api,
             entries: Arc::new(Mutex::new(HashMap::new())),
+            events: Arc::new(Mutex::new(vec![])),
             version: Arc::new(AtomicU64::new(0)),
             kind,
             err_msg: Arc::new(Mutex::new(None)),
@@ -576,8 +608,8 @@ where
         };
         while let Some(ev) = stream.next().await {
             match ev {
-                Ok(event) => {
-                    match &event {
+                Ok(watch_event) => {
+                    match &watch_event {
                         WatchEvent::Added(o)
                         | WatchEvent::Modified(o)
                         | WatchEvent::Deleted(o) => {
@@ -598,7 +630,7 @@ where
                             }
                         }
                         // handles add/modify/delete
-                        Self::resolve_event(&ctx, encoder, event).await;
+                        Self::resolve_watch_event(&ctx, encoder, watch_event).await;
                     }
                     Err(e) => {
                         if std::matches!(
@@ -680,6 +712,7 @@ where
             ctx.kind, ctx.config.list_limit,
         );
         let mut all_entries = HashMap::new();
+        let mut all_events = vec![];
         let mut total_count = 0;
         let mut estimated_total = None;
         let mut total_bytes = 0;
@@ -711,40 +744,54 @@ where
                 estimated_total = Some(total_count + r as usize);
             }
 
-            for object in object_list.items {
-                if object.meta().uid.as_ref().is_none() {
-                    continue;
+            if ctx.kind.name == "events" {
+                for object in object_list.items {
+                    if object.meta().uid.as_ref().is_none() {
+                        continue;
+                    }
+                    let event = object.trim_to_event();
+                    // Only collect pod events
+                    if !is_pod_event(&event) {
+                        continue;
+                    }
+                    all_events.push(event);
                 }
-                let mut trim_object = object.trim();
-                match serde_json::to_vec(&trim_object) {
-                    Ok(serialized_object) => {
-                        let compressed_object = match Self::compress_entry(
-                            encoder,
-                            serialized_object.as_slice(),
-                        ) {
-                            Ok(c) => c,
-                            Err(e) => {
-                                warn!(
-                                    "failed to compress {} resource with UID({}) error: {} ",
-                                    ctx.kind,
-                                    trim_object.meta().uid.as_ref().unwrap(),
-                                    e
-                                );
-                                continue;
-                            }
-                        };
-                        total_bytes += compressed_object.len();
-                        all_entries.insert(
-                            trim_object.meta_mut().uid.take().unwrap(),
-                            compressed_object,
-                        );
+            } else {
+                for object in object_list.items {
+                    if object.meta().uid.as_ref().is_none() {
+                        continue;
+                    }
+                    let mut trim_object = object.trim();
+                    match serde_json::to_vec(&trim_object) {
+                        Ok(serialized_object) => {
+                            let compressed_object = match Self::compress_entry(
+                                encoder,
+                                serialized_object.as_slice(),
+                            ) {
+                                Ok(c) => c,
+                                Err(e) => {
+                                    warn!(
+                                        "failed to compress {} resource with UID({}) error: {} ",
+                                        ctx.kind,
+                                        trim_object.meta().uid.as_ref().unwrap(),
+                                        e
+                                    );
+                                    continue;
+                                }
+                            };
+                            total_bytes += compressed_object.len();
+                            all_entries.insert(
+                                trim_object.meta_mut().uid.take().unwrap(),
+                                compressed_object,
+                            );
+                        }
+                        Err(e) => warn!(
+                            "failed serialized resource {} UID({}) to json Err: {}",
+                            ctx.kind,
+                            trim_object.meta().uid.as_ref().unwrap(),
+                            e
+                        ),
                     }
-                    Err(e) => warn!(
-                        "failed serialized resource {} UID({}) to json Err: {}",
-                        ctx.kind,
-                        trim_object.meta().uid.as_ref().unwrap(),
-                        e
-                    ),
                 }
             }
@@ -759,6 +806,9 @@ where
         if !all_entries.is_empty() {
             *ctx.entries.lock().await = all_entries;
             ctx.version.fetch_add(1, Ordering::SeqCst);
+        } else if !all_events.is_empty() {
+            *ctx.events.lock().await = all_events;
+            ctx.version.fetch_add(1, Ordering::SeqCst);
         }
         ctx.resource_version = object_list.metadata.resource_version.take();
         ctx.stats_counter
@@ -805,13 +855,26 @@ where
         }
     }
 
-    async fn resolve_event(
+    async fn resolve_watch_event(
         ctx: &Context<K>,
         encoder: &mut ZlibEncoder<Vec<u8>>,
-        event: WatchEvent<K>,
+        watch_event: WatchEvent<K>,
     ) {
-        match event {
+        match watch_event {
             WatchEvent::Added(object) | WatchEvent::Modified(object) => {
+                if ctx.kind.name == "events" {
+                    if object.meta().uid.as_ref().is_none() {
+                        return;
+                    }
+                    let event = object.trim_to_event();
+                    // Only collect pod events
+                    if !is_pod_event(&event) {
+                        return;
+                    }
+                    let all_events = &mut ctx.events.lock().await;
+                    all_events.push(event);
+                    return;
+                }
                 Self::insert_object(encoder, object, &ctx.entries, &ctx.version, &ctx.kind).await;
                 ctx.stats_counter
                     .watch_applied
@@ -885,6 +948,9 @@ where
 
 pub trait Trimmable: 'static + Send {
     fn trim(self) -> Self;
+    fn trim_to_event(&self) -> BoxedKubernetesEvent {
+        BoxedKubernetesEvent::default()
+    }
 }
 
 impl Trimmable for Pod {
@@ -1168,6 +1234,58 @@ impl Trimmable for Namespace {
     }
 }
 
+impl Trimmable for Event {
+    fn trim(self) -> Self {
+        Event::default()
+    }
+
+    fn trim_to_event(&self) -> BoxedKubernetesEvent {
+        BoxedKubernetesEvent(Box::new(KubernetesEvent {
+            first_timestamp: self.first_timestamp.clone().map_or(
+                SystemTime::now()
+                    .duration_since(UNIX_EPOCH)
+                    .unwrap()
+                    .as_micros() as u64,
+                |t| t.0.timestamp_micros() as u64,
+            ),
+            involved_object: Some(InvolvedObject {
+                field_path: self.involved_object.field_path.clone().unwrap_or_default(),
+                kind: self.involved_object.kind.clone().unwrap_or_default(),
+                name: self.involved_object.name.clone().unwrap_or_default(),
+            }),
+            message: self.message.clone().unwrap_or_default(),
+            reason: self.reason.clone().unwrap_or_default(),
+            source: Some(Source {
+                component: self
+                    .source
+                    .clone()
+                    .map(|s| s.component.unwrap_or_default())
+                    .unwrap_or_default(),
+            }),
+            r#type: self
+                .type_
+                .clone()
+                .map(|t| {
+                    if t == "Normal" {
+                        EventType::Normal
+                    } else {
+                        EventType::Warning
+                    }
+                })
+                .unwrap_or(EventType::Warning)
+                .into(),
+        }))
+    }
+}
+
+fn is_pod_event(event: &BoxedKubernetesEvent) -> bool {
+    if let Some(o) = &event.0.involved_object {
+        o.kind == "Pod"
+    } else {
+        false
+    }
+}
+
 pub struct ResourceWatcherFactory {
     client: Client,
     runtime: Handle,
@@ -1357,6 +1475,12 @@ impl ResourceWatcherFactory {
                 namespace,
                 config,
             )),
+            "events" => GenericResourceWatcher::Event(self.new_watcher_inner(
+                resource,
+                stats_collector,
+                namespace,
+                config,
+            )),
             _ => {
                 warn!("unsupported resource {}", resource.name);
                 return None;
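
Two behaviors of the Event conversion above are easy to miss: a missing first_timestamp falls back to the agent's current wall clock, expressed in epoch microseconds, and any event type other than the literal "Normal" (including a missing type) is reported as Warning. A standalone sketch of both rules (the names below are illustrative, not the generated protobuf types):

use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Debug, PartialEq)]
enum EventType {
    Normal,
    Warning,
}

// Mirrors the r#type mapping in Trimmable::trim_to_event for Event:
// anything that is not exactly "Normal" degrades to Warning.
fn map_event_type(t: Option<&str>) -> EventType {
    match t {
        Some("Normal") => EventType::Normal,
        _ => EventType::Warning,
    }
}

// Mirrors the first_timestamp fallback: prefer the event's own timestamp,
// otherwise stamp it with "now", both in microseconds since the epoch.
fn first_timestamp_us(event_ts: Option<u64>) -> u64 {
    event_ts.unwrap_or_else(|| {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock before 1970")
            .as_micros() as u64
    })
}

fn main() {
    assert_eq!(map_event_type(Some("Normal")), EventType::Normal);
    assert_eq!(map_event_type(Some("Warning")), EventType::Warning);
    assert_eq!(map_event_type(None), EventType::Warning);
    assert_eq!(first_timestamp_us(Some(1_700_000_000_000_000)), 1_700_000_000_000_000);
    assert!(first_timestamp_us(None) > 0);
}
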
diff --git a/agent/src/trident.rs b/agent/src/trident.rs
old mode 100644
new mode 100755
index 0f6ba14b7bf..7aad8b79cf8
--- a/agent/src/trident.rs
+++ b/agent/src/trident.rs
@@ -1908,6 +1908,34 @@ impl AgentComponents {
         let debugger = Debugger::new(context);
         let queue_debugger = debugger.clone_queue();
 
+        #[cfg(target_os = "linux")]
+        {
+            let k8s_events_queue_name = "1-k8s-events-to-sender";
+            let (k8s_events_sender, k8s_events_receiver, counter) = queue::bounded_with_debug(
+                yaml_config.k8s_event_sender_queue_size,
+                k8s_events_queue_name,
+                &queue_debugger,
+            );
+            stats_collector.register_countable(
+                "queue",
+                Countable::Owned(Box::new(counter)),
+                vec![StatsOption::Tag(
+                    "module",
+                    k8s_events_queue_name.to_string(),
+                )],
+            );
+            let mut k8s_events_uniform_sender = UniformSenderThread::new(
+                k8s_events_queue_name,
+                Arc::new(k8s_events_receiver),
+                config_handler.sender(),
+                stats_collector.clone(),
+                exception_handler.clone(),
+                true,
+            );
+            api_watcher.set_k8s_events_sender(k8s_events_sender);
+            k8s_events_uniform_sender.start();
+        }
+
         #[cfg(any(target_os = "linux", target_os = "android"))]
         let (toa_sender, toa_recv, _) = queue::bounded_with_debug(
             yaml_config.toa_sender_queue_size,
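
The trident.rs block above is the glue for the whole pipeline: create a bounded queue sized by k8s_event_sender_queue_size, register its counter under the queue stats module, hand the producer side to the ApiWatcher, and start a UniformSenderThread that consumes the receiver side. A rough std-only sketch of that producer/consumer lifecycle (queue::bounded_with_debug and UniformSenderThread are DeepFlow internals; mpsc stands in for both):

use std::sync::mpsc;
use std::thread;

fn main() {
    // Analogue of queue::bounded_with_debug(k8s_event_sender_queue_size, ...).
    let (sender, receiver) = mpsc::sync_channel::<String>(65536);

    // Analogue of UniformSenderThread: consume queued events and forward them
    // (the real thread frames them as SendMessageType::KubernetesEvent).
    let uniform_sender = thread::spawn(move || {
        while let Ok(event) = receiver.recv() {
            println!("forwarding: {}", event);
        }
    });

    // Analogue of api_watcher.set_k8s_events_sender(k8s_events_sender):
    // the producer half is handed over before events start flowing.
    sender.send("Pod event".to_string()).unwrap();
    drop(sender); // closing the producer lets the consumer thread exit

    uniform_sender.join().unwrap();
}
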
diff --git a/server/controller/model/agent_group_config.go b/server/controller/model/agent_group_config.go
index d7b1b4bde44..64938d4d2aa 100644
--- a/server/controller/model/agent_group_config.go
+++ b/server/controller/model/agent_group_config.go
@@ -56,6 +56,7 @@ type StaticConfig struct {
 	ToaLruCacheSize            *int             `yaml:"toa-lru-cache-size,omitempty"`
 	FlowSenderQueueSize        *int             `yaml:"flow-sender-queue-size,omitempty"`
 	FlowSenderQueueCount       *int             `yaml:"flow-sender-queue-count,omitempty"`
+	K8sEventSenderQueueSize    *int             `yaml:"k8s-event-sender-queue-size,omitempty"`
 	SecondFlowExtraDelaySecond *string          `yaml:"second-flow-extra-delay-second,omitempty"`
 	PacketDelay                *string          `yaml:"packet-delay,omitempty"`
 	Triple                     *TripleMapConfig `yaml:"triple,omitempty"`
diff --git a/server/controller/model/agent_group_config_example.yaml b/server/controller/model/agent_group_config_example.yaml
index 5bbb30a4dfa..be22ad6c024 100644
--- a/server/controller/model/agent_group_config_example.yaml
+++ b/server/controller/model/agent_group_config_example.yaml
@@ -727,6 +727,12 @@ vtap_group_id: g-xxxxxx
   ## FlowAggregator/SessionAggregator.
   #flow-sender-queue-count: 1
 
+  ## Queue Size of K8sEventSender Output
+  ## Default: 65536. Range: [65536, +oo)
+  ## Note: the length of the following queues:
+  ##   - 1-k8s-events-to-sender
+  #k8s-event-sender-queue-size: 65536
+
   ## Queue Size for Analyzer Mode
   ## Default: 131072. Range: [65536, +oo)
   ## Note: the length of the following queues (only for tap_mode = 2):