Skip to content
This repository has been archived by the owner on Dec 26, 2022. It is now read-only.

Commit

Permalink
Fix bug where graph-merger updates have equal forward_edge_name/rever…
Browse files Browse the repository at this point in the history
…se edge_name. (#2100)

* Integration test exposing the issue

* Fixed the bug
  • Loading branch information
wimax-grapl committed Nov 11, 2022
1 parent d9d2e2d commit 64c3d36
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 12 deletions.
Expand Up @@ -65,6 +65,9 @@ message CreateEdgeRequest {
message CreateEdgeResponse {
// Indicates whether or not the update was redundant, and therefor dropped
MutationRedundancy mutation_redundancy = 1;
// The reverse edge name is looked up at create-edge process time, and it's
// useful to propagate it across the wire back to graph-merger.
graplinc.grapl.common.v1beta1.EdgeName reverse_edge_name = 2;
}

// DeleteEdgeRequest holds the information necessary to create a node
Expand Down
14 changes: 5 additions & 9 deletions src/rust/graph-merger/src/service.rs
Expand Up @@ -165,13 +165,12 @@ impl GraphMerger {
from_uid,
edge_name,
} = edge;
let forward_edge_name = EdgeName { value: edge_name };
let response = self
.graph_mutation_client
.create_edge(CreateEdgeRequest {
tenant_id,
edge_name: EdgeName {
value: edge_name.clone(),
},
edge_name: forward_edge_name.clone(),
from_uid,
to_uid,
source_node_type: NodeType {
Expand All @@ -194,12 +193,9 @@ impl GraphMerger {
Update::Edge(EdgeUpdate {
src_uid: from_uid,
dst_uid: to_uid,
forward_edge_name: EdgeName {
value: edge_name.clone(),
},
reverse_edge_name: EdgeName {
value: edge_name.clone(),
},
forward_edge_name: forward_edge_name.clone(),
reverse_edge_name: create_edge_response
.reverse_edge_name,
}),
)));
}
Expand Down
11 changes: 11 additions & 0 deletions src/rust/graph-merger/tests/integration_test.rs
Expand Up @@ -15,6 +15,7 @@ use rust_proto::graplinc::grapl::{
api::{
pipeline_ingress::v1beta1::PublishRawLogRequest,
plugin_sdk::analyzers::v1beta1::messages::{
EdgeUpdate,
StringPropertyUpdate,
UInt64PropertyUpdate,
Update,
Expand Down Expand Up @@ -159,5 +160,15 @@ async fn test_sysmon_event_produces_merged_graph(ctx: &mut E2eTestContext) -> ey
"Expected process_name update: {process_name_update:?}"
);

let edge_update = updates.iter().find(|update| {
matches!(update.clone(), Update::Edge(EdgeUpdate {forward_edge_name, reverse_edge_name, ..}) if {
forward_edge_name.value == "children" && reverse_edge_name.value == "parent"
})
});
assert!(
edge_update.is_some(),
"Expected edge update: {edge_update:?}"
);

Ok(())
}
11 changes: 9 additions & 2 deletions src/rust/graph-mutation/src/graph_mutation.rs
Expand Up @@ -625,14 +625,21 @@ impl GraphMutationApi for GraphMutationManager {
.resolve_reverse_edge(tenant_id, source_node_type.clone(), edge_name.clone())
.await?;

self.upsert_edges(tenant_id, from_uid, to_uid, edge_name, reverse_edge_name)
.await?;
self.upsert_edges(
tenant_id,
from_uid,
to_uid,
edge_name,
reverse_edge_name.clone(),
)
.await?;

Ok(CreateEdgeResponse {
// todo: At this point we don't track if the update was redundant
// but it is always safe (albeit suboptimal) to assume that
// it was not.
mutation_redundancy: MutationRedundancy::Maybe,
reverse_edge_name,
})
}
}
Expand Up @@ -194,13 +194,21 @@ impl From<CreateEdgeRequest> for CreateEdgeRequestProto {
#[derive(Debug, Clone, PartialEq)]
pub struct CreateEdgeResponse {
pub mutation_redundancy: MutationRedundancy,
pub reverse_edge_name: EdgeName,
}

impl TryFrom<CreateEdgeResponseProto> for CreateEdgeResponse {
type Error = SerDeError;
fn try_from(proto: CreateEdgeResponseProto) -> Result<Self, Self::Error> {
let mutation_redundancy = proto.mutation_redundancy().try_into()?;
let reverse_edge_name = proto
.reverse_edge_name
.ok_or(SerDeError::MissingField("reverse_edge_name"))?
.try_into()?;

Ok(Self {
mutation_redundancy: proto.mutation_redundancy().try_into()?,
mutation_redundancy,
reverse_edge_name,
})
}
}
Expand All @@ -210,6 +218,7 @@ impl From<CreateEdgeResponse> for CreateEdgeResponseProto {
let mutation_redundancy: MutationRedundancyProto = value.mutation_redundancy.into();
Self {
mutation_redundancy: mutation_redundancy as i32,
reverse_edge_name: Some(value.reverse_edge_name.into()),
}
}
}
Expand Down

0 comments on commit 64c3d36

Please sign in to comment.