This repository has been archived by the owner on Dec 26, 2022. It is now read-only.

Commit

Add a simpler analyzer, which is also now successfully emitting ExecutionHits! (#2093)

* Add a simpler analyzer

* fix the graph query proxy in hax_docker_analyzer

* timestamp in logger

* Okay, I have exceptions printing out, yay

* Oh snap, I have execution hits!

* cleanup

* formatting

* one simplification

* clean up further

* fix lint
wimax-grapl committed Nov 8, 2022
1 parent 8ebbe8a commit e87fbd0
Showing 14 changed files with 169 additions and 17 deletions.
6 changes: 6 additions & 0 deletions src/python/grapl-common/grapl_common/logger.py
@@ -3,6 +3,8 @@

import structlog

Structlogger = structlog.stdlib.BoundLogger


def get_structlogger() -> structlog.stdlib.BoundLogger:
log_level_name = os.environ["GRAPL_LOG_LEVEL"] # e.g. "DEBUG"
@@ -11,6 +13,10 @@ def get_structlogger() -> structlog.stdlib.BoundLogger:
processors=[
# include {"level": "INFO"} in the dict
structlog.processors.add_log_level,
# include timestamp in the dict
structlog.processors.TimeStamper(fmt="iso"),
# specify `logger.error(stack_info = True)` to get the stacktrace
structlog.processors.StackInfoRenderer(),
# Output as JSON
structlog.processors.JSONRenderer(),
],
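With TimeStamper and StackInfoRenderer in the processor chain, every log call now emits a single JSON object carrying a level, an ISO-8601 timestamp, and (when requested) a stack trace. A minimal sketch of the resulting behavior — the event name, field, and output values below are illustrative, not from this commit:

from grapl_common.logger import get_structlogger

logger = get_structlogger()
# `stack_info=True` is picked up by StackInfoRenderer and rendered as a "stack" key.
logger.error("query failed", plugin_id="abc123", stack_info=True)
# One JSON object per event, e.g.:
# {"event": "query failed", "plugin_id": "abc123", "level": "error",
#  "timestamp": "2022-11-08T21:14:07.123456Z", "stack": "Stack (most recent call last): ..."}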
4 changes: 3 additions & 1 deletion src/python/grapl-common/grapl_common/retry.py
@@ -3,12 +3,14 @@
from functools import wraps
from typing import Any, Callable, TypeVar, cast

from grapl_common.logger import Structlogger

F = TypeVar("F", bound=Callable)


def retry(
exception_cls: type[Exception],
logger: logging.Logger,
logger: logging.Logger | Structlogger,
on_falsey: bool = False,
tries: int = 3,
delay: float = 0.5,
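Widening `logger` to `logging.Logger | Structlogger` works because the decorator only needs a `.warning(...)`-style call, which both types provide with the same shape. For context, a minimal sketch of a retry decorator with this signature — the body is an assumption for illustration, not this repo's implementation:

import logging
import time
from functools import wraps
from typing import Any, Callable, TypeVar, cast

from grapl_common.logger import Structlogger

F = TypeVar("F", bound=Callable)


def retry(
    exception_cls: type[Exception],
    logger: logging.Logger | Structlogger,
    on_falsey: bool = False,
    tries: int = 3,
    delay: float = 0.5,
) -> Callable[[F], F]:
    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, tries + 1):
                try:
                    result = func(*args, **kwargs)
                    if on_falsey and not result:
                        raise exception_cls(f"falsey result from {func.__name__}")
                    return result
                except exception_cls as e:
                    if attempt == tries:
                        raise
                    # Both stdlib and structlog loggers accept this call shape.
                    logger.warning(f"retrying {func.__name__} after: {e}")
                    time.sleep(delay)
        return cast(F, wrapper)
    return decorator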
7 changes: 7 additions & 0 deletions src/python/grapl-plugin-sdk/example_analyzers/BUILD
@@ -8,3 +8,10 @@ pex_binary(
entry_point="suspicious_svchost.py:main",
tags=["test-fixture-pex"],
)

pex_binary(
name="process_named_svchost",
output_path="process_named_svchost_analyzer.pex", # where it ends up in dist/
entry_point="process_named_svchost.py:main",
tags=["test-fixture-pex"],
)
81 changes: 81 additions & 0 deletions src/python/grapl-plugin-sdk/example_analyzers/process_named_svchost.py
@@ -0,0 +1,81 @@
"""
This is meant as an extremely simple Analyzer to get the pipeline to fire
during integration tests.
"""
from datetime import datetime

from grapl_plugin_sdk.analyzer.analyzer import (
Analyzer,
AnalyzerContext,
AnalyzerServiceConfig,
serve_analyzer,
)
from grapl_plugin_sdk.analyzer.query_and_views import NodeQuery, NodeView
from python_proto.api.graph_query.v1beta1.messages import (
NodePropertyQuery,
StringFilter,
StringOperation,
)
from python_proto.api.plugin_sdk.analyzers.v1beta1.messages import (
AnalyzerName,
ExecutionHit,
)
from python_proto.common import Timestamp
from python_proto.grapl.common.v1beta1.messages import NodeType, PropertyName


class ProcessNamedSvchost(Analyzer):
@staticmethod
def query() -> NodeQuery:
# Describes a Process where `process_name` = `svchost.exe`
node_query = NodeQuery(
NodePropertyQuery(node_type=NodeType(value="Process"))
).with_string_filters(
property_name=PropertyName(value="process_name"),
filters=[
StringFilter(
operation=StringOperation.EQUAL,
value="svchost.exe",
negated=False,
)
],
)

return node_query

async def analyze(
self, matched: NodeView, ctx: AnalyzerContext
) -> ExecutionHit | None:
print(f"analyze() was called: {matched}")
return ExecutionHit(
graph_view=matched.graph,
lens_refs=[],
idempotency_key=12345, # ???
time_of_match=Timestamp.from_datetime(datetime.utcnow()),
score=100,
# implies the return type here should not be the pure python-proto type
# https://github.com/grapl-security/issue-tracker/issues/1032
analyzer_name=AnalyzerName(
"TODO: This should be set by AnalyzerServiceImpl"
),
)

async def add_context(self, matched: NodeView, ctx: AnalyzerContext) -> None:
pass


def main() -> None:
"""
    main() is invoked by the pex_binary() entry_point=
"""
analyzer = ProcessNamedSvchost()
# Perhaps `serve_analyzer` should just take `(analyzer=analyzer)`?
# We shouldn't pass on the `AnalyzerServiceConfig` to the consumer, right?
# https://github.com/grapl-security/issue-tracker/issues/1032
serve_analyzer(
analyzer_name=AnalyzerName(
value="suspicious_svchost"
), # Why is this configured here?
analyzer=analyzer,
service_config=AnalyzerServiceConfig.from_env(),
)
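The query above matches any Process node whose process_name equals "svchost.exe". The filter semantics are plain string equality with an optional negation; as a pure-Python illustration of what EQUAL plus negated=False expresses (not the SDK's actual evaluation code):

def matches_equal_filter(prop_value: str, filter_value: str, negated: bool) -> bool:
    # StringOperation.EQUAL: hit when the values match; `negated` flips the result.
    hit = prop_value == filter_value
    return not hit if negated else hit


assert matches_equal_filter("svchost.exe", "svchost.exe", negated=False)
assert not matches_equal_filter("explorer.exe", "svchost.exe", negated=False)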
2 changes: 2 additions & 0 deletions src/python/grapl-plugin-sdk/example_analyzers/suspicious_svchost.py
@@ -81,6 +81,7 @@ async def analyze(
time_of_match=Timestamp.from_datetime(datetime.utcnow()),
score=100,
# implies the return type here should not be the pure python-proto type
# https://github.com/grapl-security/issue-tracker/issues/1032
analyzer_name=AnalyzerName(
"TODO: This should be set by AnalyzerServiceImpl"
),
@@ -97,6 +98,7 @@ def main() -> None:
analyzer = SuspiciousSvchostAnalyzer()
# Perhaps `serve_analyzer` should just take `(analyzer=analyzer)`?
# We shouldn't pass on the `AnalyzerServiceConfig` to the consumer, right?
# https://github.com/grapl-security/issue-tracker/issues/1032
serve_analyzer(
analyzer_name=AnalyzerName(
value="suspicious_svchost"
@@ -79,7 +79,7 @@ def graph_query_proxy_address() -> str:
plugin_id = os.environ["PLUGIN_ID"]
upstream_addr_env_var = f"NOMAD_UPSTREAM_ADDR_graph-query-proxy-{plugin_id}"
upstream_addr = os.environ[upstream_addr_env_var]
return f"http://{upstream_addr}"
return upstream_addr


def serve_analyzer(
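The graph_query_proxy_address fix above drops the http:// prefix because gRPC channel targets are bare host:port strings (or resolver-prefixed forms like dns:///host:port); http:// is not a supported resolver scheme, so the prefixed value could not be dialed. A hedged sketch of the consuming side, assuming a plain insecure channel:

import grpc

address = graph_query_proxy_address()  # e.g. "10.0.0.5:4000" (illustrative value)
# Channel targets take "host:port" directly; no URL scheme.
channel = grpc.insecure_channel(address)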
@@ -5,10 +5,10 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Final, Protocol, runtime_checkable
from uuid import UUID
from uuid import UUID, uuid4

import grpc
from grapl_common.logger import get_structlogger
from grapl_common.logger import Structlogger, get_structlogger
from grapl_plugin_sdk.analyzer.analyzer_context import AnalyzerContext
from grapl_plugin_sdk.analyzer.query_and_views import NodeView
from grpc import aio as grpc_aio # type: ignore
@@ -75,9 +75,18 @@ async def run_analyzer(
request: analyzer_messages.RunAnalyzerRequest,
context: grpc_aio.ServicerContext,
) -> analyzer_messages.RunAnalyzerResponse:

# TODO: Extract a Request ID from context.invocation_metadata()
request_id = uuid4()
logger = LOGGER.bind(
request_id=str(request_id),
)
logger.debug("run_analyzer on request", request=request)

try:
return await self._run_analyzer_inner(request)
return await self._run_analyzer_inner(request, logger)
except Exception as e:
logger.error("run_analyzer failed", error=str(e))
details = f"error_as_grpc_abort exception: {str(e)}"
code = grpc.StatusCode.UNKNOWN
await context.abort(
@@ -87,13 +96,10 @@
raise AssertionError("not reachable")

async def _run_analyzer_inner(
self, request: analyzer_messages.RunAnalyzerRequest
self, request: analyzer_messages.RunAnalyzerRequest, logger: Structlogger
) -> analyzer_messages.RunAnalyzerResponse:
# structlog context
logger = LOGGER.bind(update_type=type(request.update.inner).__name__)
match request.update.inner:
case PropertyUpdate() as prop_update:
logger.debug("PropertyUpdate")
# optimization
# i.e. if the update is for process_name, and you're not querying for
# process_name, that's obviously a miss
@@ -142,7 +148,6 @@ async def _run_analyzer_inner(
tenant_id=TENANT_ID,
)

# todo: Add a timeout here
execution_hit: analyzer_messages.ExecutionHit | None = await analyzer.analyze(
root_node, ctx
)
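On the TODO above about extracting a request ID from context.invocation_metadata(): a rough sketch of what that could look like. The x-request-id metadata key is an assumption for illustration; this commit does not define one.

import uuid

from grpc import aio as grpc_aio


def request_id_from_context(context: grpc_aio.ServicerContext) -> uuid.UUID:
    # invocation_metadata() yields the (key, value) pairs the caller sent.
    # The "x-request-id" key here is hypothetical.
    for key, value in context.invocation_metadata() or ():
        if key == "x-request-id":
            return uuid.UUID(value)
    # Fall back to a fresh ID when the caller didn't send one.
    return uuid.uuid4()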
@@ -3,6 +3,8 @@
from dataclasses import dataclass

import grpc
from grapl_common.logger import get_structlogger
from grapl_common.retry import retry
from graplinc.grapl.api.graph_query_proxy.v1beta1.graph_query_proxy_pb2_grpc import (
GraphQueryProxyServiceStub,
)
@@ -16,6 +18,8 @@
from python_proto.client import Connectable, GrpcClientConfig
from python_proto.grapl.common.v1beta1.messages import Uid

LOGGER = get_structlogger()


@dataclass(frozen=True, slots=True)
class GraphQueryProxyClient(Connectable):
@@ -31,6 +35,10 @@ def connect(cls, client_config: GrpcClientConfig) -> GraphQueryProxyClient:

return cls(proto_client=stub, client_config=client_config)

@retry(
Exception,
logger=LOGGER,
)
def query_with_uid(
self,
node_uid: Uid,
@@ -40,9 +48,15 @@ def query_with_uid(
node_uid=node_uid,
graph_query=graph_query,
)
proto_response = self.proto_client.QueryGraphWithUid(request.into_proto())
proto_response = self.proto_client.QueryGraphWithUid(
request.into_proto(), timeout=5
)
return QueryGraphWithUidResponse.from_proto(proto_response)

@retry(
Exception,
logger=LOGGER,
)
def query_from_uid(
self,
node_uid: Uid,
@@ -52,5 +66,7 @@ def query_from_uid(
node_uid=node_uid,
graph_query=graph_query,
)
proto_response = self.proto_client.QueryGraphFromUid(request.into_proto())
proto_response = self.proto_client.QueryGraphFromUid(
request.into_proto(), timeout=5
)
return QueryGraphFromUidResponse.from_proto(proto_response)
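Combined with the @retry defaults visible in retry.py (tries=3, delay=0.5), the timeout=5 deadline means each query now makes up to three attempts of at most five seconds each before the exception propagates to the caller. Illustrative usage, with placeholder values:

# Given a connected client (via GraphQueryProxyClient.connect(...)):
response = client.query_with_uid(
    node_uid=some_uid,        # placeholder python_proto Uid
    graph_query=some_query,   # placeholder graph query
)
# A timed-out attempt raises, and @retry(Exception, ...) retries it up to `tries` times.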
@@ -41,7 +41,6 @@ async def RunAnalyzer(
context: grpc.aio.ServicerContext,
) -> proto.RunAnalyzerResponse:
native_request = native.RunAnalyzerRequest.from_proto(proto_request)
LOGGER.info("Running analyzer")
LOGGER.debug("Analyzer request:", request=native_request)
native_response = await self.analyzer_service_impl.run_analyzer(
native_request, context
8 changes: 7 additions & 1 deletion src/rust/Dockerfile
@@ -178,6 +178,10 @@ sysmon-generator/integration_tests
uid-allocator/integration_tests
)

INTEGRATION_TEST_FEATURES=(
e2e-tests/integration_tests
)

# cargo --features expects a comma-separated feature1,feature2,feature3
function join_by { local IFS="$1"; shift; echo "$*"; }
join_by , "${INTEGRATION_TEST_FEATURES[@]}" > integration_test_features.csv
@@ -250,10 +254,12 @@ COPY --from=etc-ctx example_schemas /test-fixtures/example_schemas
# hadolint ignore=DL3022
COPY --from=etc-ctx sample_data/36_eventlog.xml /test-fixtures

# Copy in an example Analyzer, suspicious_svchost, for integration testing.
# Copy in example Analyzers for integration testing.
# hadolint-ignore reason: https://github.com/hadolint/hadolint/issues/830
# hadolint ignore=DL3022
COPY --from=dist-ctx suspicious_svchost_analyzer.pex /test-fixtures
# hadolint ignore=DL3022
COPY --from=dist-ctx process_named_svchost_analyzer.pex /test-fixtures

# Copy in the example generator so we can deploy it in test_deploy_plugin
COPY --from=build /outputs/example-generator /test-fixtures
2 changes: 1 addition & 1 deletion src/rust/e2e-tests/tests/sysmon_log_e2e_test.rs
@@ -63,7 +63,7 @@ async fn test_sysmon_log_e2e(ctx: &mut E2eTestContext) -> eyre::Result<()> {
.expect("failed to setup the sysmon-generator");

let analyzer_plugin_id = ctx
.setup_suspicious_svchost_analyzer(tenant_id, test_name)
.setup_process_named_svchost_analyzer(tenant_id, test_name)
.await?;

tracing::info!(">> Waiting for Generator and Analyzer to report healthy.");
13 changes: 13 additions & 0 deletions src/rust/integration-test-utils/src/context.rs
@@ -267,6 +267,19 @@ impl E2eTestContext {
Ok(analyzer_plugin_id)
}

pub async fn setup_process_named_svchost_analyzer(
&mut self,
tenant_id: uuid::Uuid,
test_name: &str,
) -> eyre::Result<uuid::Uuid> {
let analyzer_artifact = test_fixtures::get_process_named_svchost_analyzer()?;
let analyzer_plugin_id = self
.create_analyzer(tenant_id, test_name.to_owned(), analyzer_artifact)
.await?;
self.deploy_analyzer(analyzer_plugin_id).await?;
Ok(analyzer_plugin_id)
}

pub async fn setup_sysmon_generator(
&mut self,
tenant_id: uuid::Uuid,
4 changes: 4 additions & 0 deletions src/rust/integration-test-utils/src/test_fixtures.rs
@@ -19,6 +19,10 @@ pub fn get_suspicious_svchost_analyzer() -> Result<Bytes, std::io::Error> {
std::fs::read("/test-fixtures/suspicious_svchost_analyzer.pex").map(Bytes::from)
}

pub fn get_process_named_svchost_analyzer() -> Result<Bytes, std::io::Error> {
std::fs::read("/test-fixtures/process_named_svchost_analyzer.pex").map(Bytes::from)
}

pub fn single_sysmon_event() -> Bytes {
r#"
<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event">
@@ -204,8 +204,19 @@ job "grapl-plugin" {

env {
GRAPH_QUERY_PROXY_BIND_ADDRESS = "0.0.0.0:${NOMAD_PORT_graph-query-proxy}"
GRAPH_QUERY_CLIENT_ADDRESS = "http://${NOMAD_UPSTREAM_ADDR_graph-query}"
TENANT_ID = var.tenant_id

GRAPH_QUERY_CLIENT_ADDRESS = "http://${NOMAD_UPSTREAM_ADDR_graph-query}"
GRAPH_QUERY_CLIENT_REQUEST_TIMEOUT = "5s"
GRAPH_QUERY_CLIENT_EXECUTOR_TIMEOUT = "5s"
GRAPH_QUERY_CLIENT_CONCURRENCY_LIMIT = 16
GRAPH_QUERY_CLIENT_INITIAL_BACKOFF_DELAY = "10ms"
GRAPH_QUERY_CLIENT_MAXIMUM_BACKOFF_DELAY = "5s"
GRAPH_QUERY_CLIENT_CONNECT_TIMEOUT = "5s"
GRAPH_QUERY_CLIENT_CONNECT_RETRIES = 10
GRAPH_QUERY_CLIENT_CONNECT_INITIAL_BACKOFF_DELAY = "1s"
GRAPH_QUERY_CLIENT_CONNECT_MAXIMUM_BACKOFF_DELAY = "60s"

TENANT_ID = var.tenant_id

RUST_LOG = var.rust_log
RUST_BACKTRACE = 1
