Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Add K8s event data #6054

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/crates/public/src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
pub mod common;
pub mod flow_log;
pub mod integration;
pub mod k8s_event;
pub mod metric;
pub mod stats;
pub mod trident;
2 changes: 2 additions & 0 deletions agent/crates/public/src/sender.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum SendMessageType {
Profile = 13,
ProcEvents = 14,
AlarmEvent = 15,
KubernetesEvent = 16,
}

impl fmt::Display for SendMessageType {
Expand All @@ -77,6 +78,7 @@ impl fmt::Display for SendMessageType {
Self::Profile => write!(f, "profile"),
Self::ProcEvents => write!(f, "proc_events"),
Self::AlarmEvent => write!(f, "alarm_event"),
Self::KubernetesEvent => write!(f, "kubernetes_event"),
}
}
}
1 change: 1 addition & 0 deletions agent/docker/deepflow-agent-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ rules:
- services
- pods
- replicationcontrollers
- events
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
Expand Down
1 change: 1 addition & 0 deletions agent/docker/deepflow-agent_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ rules:
- services
- pods
- replicationcontrollers
- events
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
Expand Down
5 changes: 5 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ pub struct YamlConfig {
pub toa_lru_cache_size: usize,
pub flow_sender_queue_size: usize,
pub flow_sender_queue_count: usize,
pub k8s_event_sender_queue_size: usize,
#[serde(rename = "second-flow-extra-delay-second", with = "humantime_serde")]
pub second_flow_extra_delay: Duration,
#[serde(with = "humantime_serde")]
Expand Down Expand Up @@ -654,6 +655,9 @@ impl YamlConfig {
1 << 16
};
}
if c.k8s_event_sender_queue_size < 1 << 16 {
c.k8s_event_sender_queue_size = 1 << 16;
}
if c.packet_delay < Duration::from_secs(1) || c.packet_delay > Duration::from_secs(10) {
c.packet_delay = Duration::from_secs(1);
}
Expand Down Expand Up @@ -904,6 +908,7 @@ impl Default for YamlConfig {
// default size changes according to tap_mode
flow_sender_queue_size: 1 << 16,
flow_sender_queue_count: 1,
k8s_event_sender_queue_size: 1 << 16,
second_flow_extra_delay: Duration::from_secs(0),
packet_delay: Duration::from_secs(1),
triple: Default::default(),
Expand Down
46 changes: 40 additions & 6 deletions agent/src/platform/kubernetes/api_watcher.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ use log::{debug, error, info, log_enabled, warn, Level};
use parking_lot::RwLock;
use tokio::{runtime::Runtime, task::JoinHandle};

use super::resource_watcher::{
default_resources, supported_resources, GenericResourceWatcher, GroupVersion, Resource,
Watcher, WatcherConfig,
use super::{
k8s_events::BoxedKubernetesEvent,
resource_watcher::{
default_resources, supported_resources, GenericResourceWatcher, GroupVersion, Resource,
Watcher, WatcherConfig,
},
};
use crate::{
config::{handler::PlatformAccess, KubernetesResourceConfig},
Expand All @@ -52,9 +55,12 @@ use crate::{
stats,
},
};
use public::proto::{
common::KubernetesApiInfo,
trident::{Exception, KubernetesApiSyncRequest},
use public::{
proto::{
common::KubernetesApiInfo,
trident::{Exception, KubernetesApiSyncRequest},
},
queue::DebugSender,
};

/*
Expand Down Expand Up @@ -99,6 +105,7 @@ pub struct ApiWatcher {
exception_handler: ExceptionHandler,
stats_collector: Arc<stats::Collector>,
agent_id: Arc<RwLock<AgentId>>,
k8s_events_sender: Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
}

impl ApiWatcher {
Expand Down Expand Up @@ -131,9 +138,17 @@ impl ApiWatcher {
watchers: Arc::new(Mutex::new(HashMap::new())),
exception_handler,
stats_collector,
k8s_events_sender: Arc::new(Mutex::new(None)),
}
}

pub fn set_k8s_events_sender(&self, k8s_events_sender: DebugSender<BoxedKubernetesEvent>) {
self.k8s_events_sender
.lock()
.unwrap()
.replace(k8s_events_sender);
}

// 直接拿对应的entries
pub fn get_watcher_entries(&self, resource_name: impl AsRef<str>) -> Option<Vec<Vec<u8>>> {
if !*self.running.lock().unwrap() {
Expand Down Expand Up @@ -221,6 +236,7 @@ impl ApiWatcher {
let watchers = self.watchers.clone();
let exception_handler = self.exception_handler.clone();
let stats_collector = self.stats_collector.clone();
let k8s_events_sender = self.k8s_events_sender.clone();

let handle = thread::Builder::new()
.name("kubernetes-api-watcher".to_owned())
Expand All @@ -236,6 +252,7 @@ impl ApiWatcher {
exception_handler,
stats_collector,
agent_id,
k8s_events_sender,
)
})
.unwrap();
Expand Down Expand Up @@ -579,6 +596,7 @@ impl ApiWatcher {
resource_watchers: &Arc<Mutex<HashMap<WatcherKey, GenericResourceWatcher>>>,
exception_handler: &ExceptionHandler,
agent_id: &Arc<RwLock<AgentId>>,
k8s_events_sender: &Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
) {
let version = &context.version;
// 将缓存的entry 上报,如果没有则跳过
Expand Down Expand Up @@ -632,6 +650,19 @@ impl ApiWatcher {
let resource_watchers_guard = resource_watchers.lock().unwrap();
for watcher in resource_watchers_guard.values() {
let kind = watcher.pb_name();
if kind == "*v1.Events" {
let mut events = watcher.events();
if events.is_empty() {
continue;
}
let mut k8s_events_sender_guard = k8s_events_sender.lock().unwrap();
if let Some(s) = k8s_events_sender_guard.as_mut() {
if let Err(e) = s.send_all(&mut events) {
warn!("send k8s events failed: {:?}", e);
}
}
continue;
}
for entry in watcher.entries() {
total_entries.push(KubernetesApiInfo {
r#type: Some(kind.to_owned()),
Expand Down Expand Up @@ -752,6 +783,7 @@ impl ApiWatcher {
exception_handler: ExceptionHandler,
stats_collector: Arc<stats::Collector>,
agent_id: Arc<RwLock<AgentId>>,
k8s_events_sender: Arc<Mutex<Option<DebugSender<BoxedKubernetesEvent>>>>,
) {
info!("kubernetes api watcher starting");

Expand Down Expand Up @@ -849,6 +881,7 @@ impl ApiWatcher {
&resource_watchers,
&exception_handler,
&agent_id,
&k8s_events_sender,
);
break;
}
Expand All @@ -864,6 +897,7 @@ impl ApiWatcher {
&resource_watchers,
&exception_handler,
&agent_id,
&k8s_events_sender,
);
}
info!("kubernetes api watcher stopping");
Expand Down
35 changes: 35 additions & 0 deletions agent/src/platform/kubernetes/k8s_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use public::{
proto::k8s_event::KubernetesEvent,
sender::{SendMessageType, Sendable},
};

use prost::Message;

#[derive(Debug, Default, Clone)]
pub struct BoxedKubernetesEvent(pub Box<KubernetesEvent>);

impl Sendable for BoxedKubernetesEvent {
fn encode(self, buf: &mut Vec<u8>) -> Result<usize, prost::EncodeError> {
self.0.encode(buf).map(|_| self.0.encoded_len())
}

fn message_type(&self) -> SendMessageType {
SendMessageType::KubernetesEvent
}
}
1 change: 1 addition & 0 deletions agent/src/platform/kubernetes/mod.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use regex::Regex;
mod active_poller;
mod api_watcher;
mod crd;
mod k8s_events;
mod passive_poller;
mod sidecar_poller;
pub use active_poller::ActivePoller;
Expand Down