diff --git a/src/proto/graplinc/grapl/api/graph_mutation/v1beta1/graph_mutation.proto b/src/proto/graplinc/grapl/api/graph_mutation/v1beta1/graph_mutation.proto index 8aa3695840..3ab9d30c94 100644 --- a/src/proto/graplinc/grapl/api/graph_mutation/v1beta1/graph_mutation.proto +++ b/src/proto/graplinc/grapl/api/graph_mutation/v1beta1/graph_mutation.proto @@ -65,6 +65,9 @@ message CreateEdgeRequest { message CreateEdgeResponse { // Indicates whether or not the update was redundant, and therefor dropped MutationRedundancy mutation_redundancy = 1; + // The reverse edge name is looked up at create-edge process time, and it's + // useful to propagate it across the wire back to graph-merger. + graplinc.grapl.common.v1beta1.EdgeName reverse_edge_name = 2; } // DeleteEdgeRequest holds the information necessary to create a node diff --git a/src/rust/graph-merger/src/service.rs b/src/rust/graph-merger/src/service.rs index 1b23059968..4f7e59f475 100644 --- a/src/rust/graph-merger/src/service.rs +++ b/src/rust/graph-merger/src/service.rs @@ -165,13 +165,12 @@ impl GraphMerger { from_uid, edge_name, } = edge; + let forward_edge_name = EdgeName { value: edge_name }; let response = self .graph_mutation_client .create_edge(CreateEdgeRequest { tenant_id, - edge_name: EdgeName { - value: edge_name.clone(), - }, + edge_name: forward_edge_name.clone(), from_uid, to_uid, source_node_type: NodeType { @@ -194,12 +193,9 @@ impl GraphMerger { Update::Edge(EdgeUpdate { src_uid: from_uid, dst_uid: to_uid, - forward_edge_name: EdgeName { - value: edge_name.clone(), - }, - reverse_edge_name: EdgeName { - value: edge_name.clone(), - }, + forward_edge_name: forward_edge_name.clone(), + reverse_edge_name: create_edge_response + .reverse_edge_name, }), ))); } diff --git a/src/rust/graph-merger/tests/integration_test.rs b/src/rust/graph-merger/tests/integration_test.rs index 300cc5677b..aa59909df7 100644 --- a/src/rust/graph-merger/tests/integration_test.rs +++ b/src/rust/graph-merger/tests/integration_test.rs @@ -15,6 +15,7 @@ use rust_proto::graplinc::grapl::{ api::{ pipeline_ingress::v1beta1::PublishRawLogRequest, plugin_sdk::analyzers::v1beta1::messages::{ + EdgeUpdate, StringPropertyUpdate, UInt64PropertyUpdate, Update, @@ -159,5 +160,15 @@ async fn test_sysmon_event_produces_merged_graph(ctx: &mut E2eTestContext) -> ey "Expected process_name update: {process_name_update:?}" ); + let edge_update = updates.iter().find(|update| { + matches!(update.clone(), Update::Edge(EdgeUpdate {forward_edge_name, reverse_edge_name, ..}) if { + forward_edge_name.value == "children" && reverse_edge_name.value == "parent" + }) + }); + assert!( + edge_update.is_some(), + "Expected edge update: {edge_update:?}" + ); + Ok(()) } diff --git a/src/rust/graph-mutation/src/graph_mutation.rs b/src/rust/graph-mutation/src/graph_mutation.rs index fe74736fca..009624f231 100644 --- a/src/rust/graph-mutation/src/graph_mutation.rs +++ b/src/rust/graph-mutation/src/graph_mutation.rs @@ -625,14 +625,21 @@ impl GraphMutationApi for GraphMutationManager { .resolve_reverse_edge(tenant_id, source_node_type.clone(), edge_name.clone()) .await?; - self.upsert_edges(tenant_id, from_uid, to_uid, edge_name, reverse_edge_name) - .await?; + self.upsert_edges( + tenant_id, + from_uid, + to_uid, + edge_name, + reverse_edge_name.clone(), + ) + .await?; Ok(CreateEdgeResponse { // todo: At this point we don't track if the update was redundant // but it is always safe (albeit suboptimal) to assume that // it was not. mutation_redundancy: MutationRedundancy::Maybe, + reverse_edge_name, }) } } diff --git a/src/rust/rust-proto/src/graplinc/grapl/api/graph_mutation/v1beta1/messages.rs b/src/rust/rust-proto/src/graplinc/grapl/api/graph_mutation/v1beta1/messages.rs index bb0e4fb4dd..515fb4eb49 100644 --- a/src/rust/rust-proto/src/graplinc/grapl/api/graph_mutation/v1beta1/messages.rs +++ b/src/rust/rust-proto/src/graplinc/grapl/api/graph_mutation/v1beta1/messages.rs @@ -194,13 +194,21 @@ impl From for CreateEdgeRequestProto { #[derive(Debug, Clone, PartialEq)] pub struct CreateEdgeResponse { pub mutation_redundancy: MutationRedundancy, + pub reverse_edge_name: EdgeName, } impl TryFrom for CreateEdgeResponse { type Error = SerDeError; fn try_from(proto: CreateEdgeResponseProto) -> Result { + let mutation_redundancy = proto.mutation_redundancy().try_into()?; + let reverse_edge_name = proto + .reverse_edge_name + .ok_or(SerDeError::MissingField("reverse_edge_name"))? + .try_into()?; + Ok(Self { - mutation_redundancy: proto.mutation_redundancy().try_into()?, + mutation_redundancy, + reverse_edge_name, }) } } @@ -210,6 +218,7 @@ impl From for CreateEdgeResponseProto { let mutation_redundancy: MutationRedundancyProto = value.mutation_redundancy.into(); Self { mutation_redundancy: mutation_redundancy as i32, + reverse_edge_name: Some(value.reverse_edge_name.into()), } } }