This repository has been archived by the owner on Dec 26, 2022. It is now read-only.
Analyzers now emit their ExecutionHit back to PWQ and then to a Kafka topic. Test this under E2E. (#2096)

* it's all right, except there are two separate ExecutionHit types

* ugh, getting into PartialEq garbage

just comment out stuff lmao

remove some of the old ExecutionHit

let's test this

redact noise from headers

fix the enums for StringFilter stuff in Rust

format

update e2e test; it should only have one

format

* re-add the test for requests

* spruce up unit tests

* clean up docs

* add a comment
wimax-grapl committed Nov 10, 2022
1 parent 737039a commit d9d2e2d
Showing 21 changed files with 276 additions and 261 deletions.
2 changes: 1 addition & 1 deletion nomad/grapl-core.nomad
@@ -1015,7 +1015,7 @@ job "grapl-core" {
KAFKA_SASL_PASSWORD = var.kafka_credentials["plugin-work-queue"].sasl_password

GENERATOR_KAFKA_PRODUCER_TOPIC = "generated-graphs"
- # ANALYZER_KAFKA_PRODUCER_TOPIC = "TODO"
+ ANALYZER_KAFKA_PRODUCER_TOPIC = "analyzer-executions"

# common Rust env vars
RUST_BACKTRACE = local.rust_backtrace
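For context, a minimal sketch (illustrative code, not Grapl's) of how a service can resolve the topic this variable injects; it mirrors the ANALYZER_KAFKA_PRODUCER_TOPIC wiring added to plugin-work-queue's main.rs later in this commit:

use std::env;

// Read the Kafka topic injected by grapl-core.nomad, failing fast if unset.
fn analyzer_producer_topic() -> String {
    env::var("ANALYZER_KAFKA_PRODUCER_TOPIC")
        .expect("ANALYZER_KAFKA_PRODUCER_TOPIC must be set")
}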
16 changes: 0 additions & 16 deletions src/proto/graplinc/grapl/api/graph/v1beta1/types.proto
@@ -200,19 +200,3 @@ message Lens {
// score (absent prior to engagement-creator)
optional uint64 score = 4;
}

// Result computed by an analyzer when it makes a detection.
message ExecutionHit {
// The mapping of node_key to the identified Node
map<uint64, graplinc.grapl.api.graph.v1beta1.IdentifiedNode> nodes = 1;
// The mapping of node_key to the EdgeLists
map<uint64, graplinc.grapl.api.graph.v1beta1.IdentifiedEdgeList> edges = 2;
// The name of the analyzer which made the detection
string analyzer_name = 3;
// The risk score associated with this detection
uint64 risk_score = 4;
// Fragmentary lenses (only have type and name set)
repeated Lens lenses = 5;
// The nodes which contributed to the risk score
repeated string risky_node_keys = 6;
}
@@ -73,6 +73,7 @@ message AnalyzerName {
string value = 1;
}

// TODO: This should be turned into a Pipeline message in the future.
// An ExecutionHit represents the output of an Analyzer, including
message ExecutionHit {
// The graph that the Analyzer matched against as well as any context
@@ -4,6 +4,7 @@ package graplinc.grapl.api.plugin_work_queue.v1beta1;

import "graplinc/common/v1beta1/types.proto";
import "graplinc/grapl/api/graph/v1beta1/types.proto";
import "graplinc/grapl/api/plugin_sdk/analyzers/v1beta1/analyzers.proto";

// A job for a given plugin, for a given tenant, to be executed against `data`
message ExecutionJob {
@@ -106,9 +107,8 @@ message AcknowledgeGeneratorResponse {}
message AcknowledgeAnalyzerRequest {
// The request_id of the job that has been completed
int64 request_id = 1;
- // 'true' if success, 'false' if failure
- // TODO: This will likely become an `optional GeneratedMessage` field like AcknowledgeGeneratorRequest
- bool success = 2;
+ // Some(ExecutionResult) if the analyzer ran successfully, None if it failed.
+ optional graplinc.grapl.api.plugin_sdk.analyzers.v1beta1.ExecutionResult execution_result = 2;
// The plugin id of the plugin that completed the request
graplinc.common.v1beta1.Uuid plugin_id = 3;
// The tenant the plugin belongs to
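To make the new ack semantics concrete, here is a self-contained Rust sketch with stand-in types (the real ones are proto-generated; the hit payload shape here is hypothetical); it mirrors the match that server.rs gains later in this commit:

// Stand-ins for the generated proto types; the hit payload is a placeholder.
enum ExecutionResult {
    ExecutionHit(String),
    ExecutionMiss,
}

#[derive(Debug, PartialEq)]
enum Status {
    Processed,
    Failed,
}

fn ack_status(result: Option<&ExecutionResult>) -> Status {
    match result {
        // Hit: PWQ also publishes the payload to the analyzer Kafka topic.
        Some(ExecutionResult::ExecutionHit(_)) => Status::Processed,
        // Miss: the analyzer ran successfully but made no detection.
        Some(ExecutionResult::ExecutionMiss) => Status::Processed,
        // No result at all: the analyzer itself failed.
        None => Status::Failed,
    }
}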
@@ -69,6 +69,7 @@ class AnalyzerServiceImpl:

def __post_init__(self) -> None:
self._graph_query = self._analyzer.query().into_graph_query()
LOGGER.info("Graph query", graph_query=str(self._graph_query))

async def run_analyzer(
self,
2 changes: 1 addition & 1 deletion src/python/python-proto/README.md
@@ -5,4 +5,4 @@ them, you can use `./pants export-codegen ::`, which will write all codegen to
dist/codegen.

Alternatively, you can explore it with a repl, with
- `./pants --no-pantsd repl --shell=ipython src/python/python-proto/python_proto`
+ `./pants --no-pantsd repl --shell=ipython src/python/python-proto/python_proto::`
45 changes: 0 additions & 45 deletions src/python/python-proto/python_proto/api/graph/v1beta1/messages.py
@@ -597,48 +597,3 @@ def into_proto(self) -> proto.Lens:
if self.score is not None:
proto_lens.score = self.score
return proto_lens


@dataclasses.dataclass(
frozen=True,
slots=True,
)
class ExecutionHit(SerDe[proto.ExecutionHit]):
nodes: Mapping[Uid, IdentifiedNode]
edges: Mapping[Uid, IdentifiedEdgeList]
analyzer_name: str
risk_score: int
lenses: Sequence[Lens]
risky_node_keys: Sequence[str]
_proto_cls = proto.ExecutionHit

@classmethod
def from_proto(cls, proto_execution_hit: proto.ExecutionHit) -> ExecutionHit:
return ExecutionHit(
nodes={
Uid(k): IdentifiedNode.from_proto(v)
for k, v in proto_execution_hit.nodes.items()
},
edges={
Uid(k): IdentifiedEdgeList.from_proto(v)
for k, v in proto_execution_hit.edges.items()
},
analyzer_name=proto_execution_hit.analyzer_name,
risk_score=proto_execution_hit.risk_score,
lenses=[Lens.from_proto(l) for l in proto_execution_hit.lenses],
risky_node_keys=proto_execution_hit.risky_node_keys,
)

def into_proto(self) -> proto.ExecutionHit:
proto_execution_hit = proto.ExecutionHit()
for k1, v1 in self.nodes.items():
proto_execution_hit.nodes[k1.value].CopyFrom(v1.into_proto())
for k2, v2 in self.edges.items():
proto_execution_hit.edges[k2.value].CopyFrom(v2.into_proto())
proto_execution_hit.analyzer_name = self.analyzer_name
proto_execution_hit.risk_score = self.risk_score
for lens in self.lenses:
proto_execution_hit.lenses.append(lens.into_proto())
for risky_node_key in self.risky_node_keys:
proto_execution_hit.risky_node_keys.append(risky_node_key)
return proto_execution_hit
28 changes: 0 additions & 28 deletions src/python/python-proto/python_proto/tests/strategies.py
@@ -8,7 +8,6 @@
DecrementOnlyUintProp,
Edge,
EdgeList,
ExecutionHit,
GraphDescription,
IdentifiedEdge,
IdentifiedEdgeList,
@@ -400,33 +399,6 @@ def lenses(
)


def execution_hits(
nodes: st.SearchStrategy[Mapping[Uid, IdentifiedNode]] = st.dictionaries(
keys=uids(), values=identified_nodes(), max_size=MAX_DICT_SIZE
),
edges: st.SearchStrategy[Mapping[Uid, IdentifiedEdgeList]] = st.dictionaries(
keys=uids(), values=identified_edge_lists(), max_size=MAX_DICT_SIZE
),
analyzer_names: st.SearchStrategy[str] = st.text(),
risk_scores: st.SearchStrategy[int] = uint64s,
lenses: st.SearchStrategy[Sequence[Lens]] = st.lists(
lenses(), max_size=MAX_LIST_SIZE
),
risky_node_keys: st.SearchStrategy[Sequence[str]] = st.lists(
st.text(), max_size=MAX_LIST_SIZE
),
) -> st.SearchStrategy[ExecutionHit]:
return st.builds(
ExecutionHit,
nodes=nodes,
edges=edges,
analyzer_name=analyzer_names,
risk_score=risk_scores,
lenses=lenses,
risky_node_keys=risky_node_keys,
)


#
# metrics
#
5 changes: 0 additions & 5 deletions src/python/python-proto/python_proto/tests/test_api_graph.py
@@ -8,7 +8,6 @@
decrement_only_uint_props,
edge_lists,
edges,
execution_hits,
graph_descriptions,
id_strategies,
identified_graphs,
@@ -42,10 +41,6 @@ def test_edge_encode_decode() -> None:
check_encode_decode_invariant(edges())


def test_execution_hit_encode_decode() -> None:
check_encode_decode_invariant(execution_hits())


def test_graph_description_encode_decode() -> None:
check_encode_decode_invariant(graph_descriptions())

46 changes: 46 additions & 0 deletions src/rust/e2e-tests/tests/sysmon_log_e2e_test.rs
@@ -29,6 +29,7 @@ use rust_proto::graplinc::grapl::{
pipeline_ingress::v1beta1::PublishRawLogRequest,
plugin_registry::v1beta1::PluginHealthStatus,
plugin_sdk::analyzers::v1beta1::messages::{
ExecutionHit,
StringPropertyUpdate,
UInt64PropertyUpdate,
Update,
@@ -140,6 +141,23 @@ async fn test_sysmon_log_e2e(ctx: &mut E2eTestContext) -> eyre::Result<()> {
))
.await;

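// The default-valued Envelope below (three fresh UUIDs) presumably serves
// only to fix the message type the scanner deserializes; the `|_| true`
// predicate then accepts the first ExecutionHit seen for this tenant.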
let execution_hits_scanner_handle = KafkaTopicScanner::new(
ConsumerConfig::with_topic("analyzer-executions"),
Duration::from_secs(60),
Envelope::new(
Uuid::new_v4(),
Uuid::new_v4(),
Uuid::new_v4(),
ExecutionHit::default(),
),
)
.scan_for_tenant(tenant_id, 1, |_: ExecutionHit| true)
.instrument(tracing::span!(
tracing::Level::INFO,
"execution_hits_scanner_handle"
))
.await;

// Sometimes we find 40 or 41 messages; for stability we'll just let this
// time out instead of cutting it off early at 36 or 40.
// Why does it not equal 36? Not really sure! But graph-merger is being
@@ -279,5 +297,33 @@
assert!(msg.is_some());
}

tracing::info!(">> Test: `analyzer` emits ExecutionHits to `analyzer-executions` topic");
{
let execution_hits: Vec<ExecutionHit> = execution_hits_scanner_handle
.await
.expect("failed to configure execution hits scanner")
.into_iter()
.map(|env| env.inner_message())
.collect();

assert!(
execution_hits.len() == 1,
"Expected one execution hit, got: {execution_hits:?}"
);
let matching_nodes = execution_hits[0].graph_view.get_nodes();
let has_nodes_for_svchost_exe = matching_nodes.values().into_iter().any(|npv| {
let expected_node_type = "Process".to_owned();
let expected_process_name = "svchost.exe".to_owned();
let process_name_prop = PropertyName::new_unchecked("process_name".to_owned());
(npv.node_type.value == expected_node_type)
&& (npv.string_properties.prop_map.get(&process_name_prop)
== Some(&expected_process_name))
});
assert!(
has_nodes_for_svchost_exe,
"Expected the ExecutionHit to contain svchost.exe: {execution_hits:?}"
);
}

Ok(())
}
@@ -131,7 +131,7 @@ impl PluginWorkProcessor for AnalyzerWorkProcessor {
let execution_hit = process_result.ok();
let ack_request = AcknowledgeAnalyzerRequest::new(
request_id,
- execution_hit.is_some(),
+ execution_hit,
plugin_id,
tenant_id,
trace_id,
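Worth noting about the `process_result.ok()` call above: an analyzer error collapses to None, discarding the error value at this boundary, so PWQ can only record a failed status without the error detail. A generic sketch of that contract (not Grapl's types):

// Err collapses to None; the error detail itself is dropped here.
fn to_ack_payload<T, E>(process_result: Result<T, E>) -> Option<T> {
    process_result.ok()
}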
1 change: 1 addition & 0 deletions src/rust/plugin-work-queue/src/lib.rs
Expand Up @@ -14,6 +14,7 @@ pub struct ConfigUnion {
pub service_config: PluginWorkQueueServiceConfig,
pub db_config: PluginWorkQueueDbConfig,
pub generator_producer_config: ProducerConfig,
pub analyzer_producer_config: ProducerConfig,
}

#[derive(clap::Parser, Clone, Debug)]
4 changes: 3 additions & 1 deletion src/rust/plugin-work-queue/src/main.rs
@@ -16,11 +16,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_config = PluginWorkQueueDbConfig::parse();
let generator_producer_config =
ProducerConfig::with_topic_env_var("GENERATOR_KAFKA_PRODUCER_TOPIC");
- // TODO let analyzer_producer_config = ...
+ let analyzer_producer_config =
+     ProducerConfig::with_topic_env_var("ANALYZER_KAFKA_PRODUCER_TOPIC");
exec_service(ConfigUnion {
service_config,
db_config,
generator_producer_config,
analyzer_producer_config,
})
.await?;
Ok(())
32 changes: 26 additions & 6 deletions src/rust/plugin-work-queue/src/server.rs
@@ -9,6 +9,10 @@ use rust_proto::{
graplinc::grapl::{
api::{
graph::v1beta1::GraphDescription,
plugin_sdk::analyzers::v1beta1::messages::{
ExecutionHit,
ExecutionResult,
},
plugin_work_queue::{
v1beta1,
v1beta1::{
@@ -73,15 +77,18 @@ impl From<PluginWorkQueueError> for Status {
pub struct PluginWorkQueue {
queue: PsqlQueue,
generator_producer: Producer<GraphDescription>,
analyzer_producer: Producer<ExecutionHit>,
}

impl PluginWorkQueue {
pub async fn try_from(configs: &ConfigUnion) -> Result<Self, PluginWorkQueueInitError> {
let psql_queue = PsqlQueue::init_with_config(configs.db_config.clone()).await?;
let generator_producer = Producer::new(configs.generator_producer_config.clone())?;
let analyzer_producer = Producer::new(configs.analyzer_producer_config.clone())?;
Ok(Self {
queue: psql_queue,
generator_producer,
analyzer_producer,
})
}
}
@@ -284,10 +291,25 @@ impl PluginWorkQueueApi for PluginWorkQueue {
let trace_id = request.trace_id();
let event_source_id = request.event_source_id();
let plugin_id = request.plugin_id();
+ let request_id = request.request_id().into();

- let status = match request.success() {
-     true => psql_queue::Status::Processed,
-     false => psql_queue::Status::Failed,
+ let status = match request.execution_result() {
+     Some(ExecutionResult::ExecutionHit(hit)) => {
+         tracing::debug!(
+             message = "publishing analyzer execution hit",
+             tenant_id =% tenant_id,
+             trace_id =% trace_id,
+             event_source_id =% event_source_id,
+             plugin_id =% plugin_id,
+         );
+
+         self.analyzer_producer
+             .send(Envelope::new(tenant_id, trace_id, event_source_id, hit))
+             .await?;
+         psql_queue::Status::Processed
+     }
+     Some(ExecutionResult::ExecutionMiss(_)) => psql_queue::Status::Processed,
+     None => psql_queue::Status::Failed,
};

tracing::debug!(
@@ -299,9 +321,7 @@
status =? status,
);

- self.queue
-     .ack_analyzer(request.request_id().into(), status)
-     .await?;
+ self.queue.ack_analyzer(request_id, status).await?;
Ok(v1beta1::AcknowledgeAnalyzerResponse {})
}

