From d6dfd783c6ac0cd33df6e8b61611df72d22007a9 Mon Sep 17 00:00:00 2001 From: siddh34 Date: Sun, 12 Oct 2025 10:48:01 +0530 Subject: [PATCH] [CortexFlow#140]: Improve agent API response --- core/api/protos/agent.proto | 9 +++++++- core/api/src/agent.rs | 20 +++++++++++----- core/api/src/api.rs | 46 +++++++++++++++++++++---------------- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/core/api/protos/agent.proto b/core/api/protos/agent.proto index dff6026..b8f01a9 100644 --- a/core/api/protos/agent.proto +++ b/core/api/protos/agent.proto @@ -4,12 +4,19 @@ package agent; message RequestActiveConnections{ optional string pod_ip = 2 ; } + +message ConnectionEvent { + string event_id = 1; + string src_ip_port = 2; // e.g., "192.168.1.1:8080" (src_ip:src_port) + string dst_ip_port = 3; // e.g., "10.0.0.1:80" (dst_ip:dst_port) +} + // TODO: the complete Response will be able to return all the context below //* "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}", //* event_id, proto, src, src_port, dst, dst_port message ActiveConnectionResponse{ string status = 1; - map events = 2 ; //for simplicity right now we only return event_id and src + repeated ConnectionEvent events = 2; // List of connection events } //declare agent api diff --git a/core/api/src/agent.rs b/core/api/src/agent.rs index bdf3e72..2f5a6bd 100644 --- a/core/api/src/agent.rs +++ b/core/api/src/agent.rs @@ -4,6 +4,17 @@ pub struct RequestActiveConnections { #[prost(string, optional, tag = "2")] pub pod_ip: ::core::option::Option<::prost::alloc::string::String>, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ConnectionEvent { + #[prost(string, tag = "1")] + pub event_id: ::prost::alloc::string::String, + /// e.g., "192.168.1.1:8080" (src_ip:src_port) + #[prost(string, tag = "2")] + pub src_ip_port: ::prost::alloc::string::String, + /// e.g., "10.0.0.1:80" (dst_ip:dst_port) + #[prost(string, tag = "3")] + pub dst_ip_port: ::prost::alloc::string::String, +} /// TODO: the complete Response will be able to return all the context below /// /// * "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}", @@ -12,12 +23,9 @@ pub struct RequestActiveConnections { pub struct ActiveConnectionResponse { #[prost(string, tag = "1")] pub status: ::prost::alloc::string::String, - /// for simplicity right now we only return event_id and src - #[prost(map = "string, string", tag = "2")] - pub events: ::std::collections::HashMap< - ::prost::alloc::string::String, - ::prost::alloc::string::String, - >, + /// List of connection events + #[prost(message, repeated, tag = "2")] + pub events: ::prost::alloc::vec::Vec, } /// Generated client implementations. pub mod agent_client { diff --git a/core/api/src/api.rs b/core/api/src/api.rs index cee3962..167b4d5 100644 --- a/core/api/src/api.rs +++ b/core/api/src/api.rs @@ -16,7 +16,7 @@ use tokio::sync::mpsc; use tokio::task; // * contains agent api configuration -use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections}; +use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections, ConnectionEvent}; use aya::maps::Map; use bytemuck_derive::Zeroable; use cortexflow_identity::enums::IpProtocols; @@ -38,19 +38,19 @@ unsafe impl aya::Pod for PacketLog {} pub struct AgentApi { //* event_rx is an istance of a mpsc receiver. //* is used to receive the data from the transmitter (tx) - event_rx: Mutex, Status>>>, - event_tx: mpsc::Sender, Status>>, + event_rx: Mutex, Status>>>, + event_tx: mpsc::Sender, Status>>, } //* Event sender trait. Takes an event from a map and send that to the mpsc channel //* using the send_map function #[async_trait] pub trait EventSender: Send + Sync + 'static { - async fn send_event(&self, event: HashMap); + async fn send_event(&self, event: Vec); async fn send_map( &self, - map: HashMap, - tx: mpsc::Sender, Status>>, + map: Vec, + tx: mpsc::Sender, Status>>, ) { let status = Status::new(tonic::Code::Ok, "success"); let event = Ok(map); @@ -58,10 +58,11 @@ pub trait EventSender: Send + Sync + 'static { let _ = tx.send(event).await; } } + // send event function. takes an HashMap and send that using mpsc event_tx #[async_trait] impl EventSender for AgentApi { - async fn send_event(&self, event: HashMap) { + async fn send_event(&self, event: Vec) { self.send_map(event, self.event_tx.clone()).await; } } @@ -130,17 +131,18 @@ impl Default for AgentApi { "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}", event_id, proto, src, src_port, dst, dst_port ); - info!("creating hashmap for the aggregated data"); - let mut evt = HashMap::new(); - // insert event in the hashmap - info!("Inserting events into the hashmap"); + info!("creating vector for the aggregated data"); + let mut evt = Vec::new(); + // insert event in the vector + info!("Inserting events into the vector"); //TODO: use a Arc or Box type instead of String type. //The data doesn't need to implement any .copy() or .clone() trait // using an Arc type will also waste less resources - evt.insert( - format!("{:?}", event_id.to_string()), - format!("{:?}", src.to_string()), - ); + evt.push(ConnectionEvent { + event_id: event_id.to_string(), + src_ip_port: format!("{}:{}", src, src_port), + dst_ip_port: format!("{}:{}", dst, dst_port), + }); info!("sending events to the MPSC channel"); let _ = tx.send(Ok(evt)).await; } @@ -160,8 +162,12 @@ impl Default for AgentApi { } } else if events.read == 0 { info!("[Agent/API] 0 Events found"); - let mut evt = HashMap::new(); - evt.insert("0".to_string(), "0".to_string()); + let mut evt = Vec::new(); + evt.push(ConnectionEvent { + event_id: "0".to_string(), + src_ip_port: "0:0".to_string(), + dst_ip_port: "0:0".to_string(), + }); let _ = tx.send(Ok(evt)).await; } } @@ -192,12 +198,12 @@ impl Agent for AgentApi { let req = request.into_inner(); //create the hashmap to process events from the mpsc channel queue - let mut aggregated_events: HashMap = HashMap::new(); + let mut aggregated_events: Vec = Vec::new(); //aggregate events while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() { - if let Ok(map) = evt { - aggregated_events.extend(map); + if let Ok(vec) = evt { + aggregated_events.extend(vec); } }