diff --git a/ddtrace/appsec/_iast/taint_sinks/command_injection.py b/ddtrace/appsec/_iast/taint_sinks/command_injection.py index c95d99567c..ad98fd6286 100644 --- a/ddtrace/appsec/_iast/taint_sinks/command_injection.py +++ b/ddtrace/appsec/_iast/taint_sinks/command_injection.py @@ -10,6 +10,7 @@ from ..._constants import IAST_SPAN_TAGS from .. import oce +from .._metrics import _set_metric_iast_instrumented_sink from .._metrics import increment_iast_span_metric from ..constants import VULN_CMDI from ..processor import AppSecIastSpanProcessor @@ -39,6 +40,8 @@ def patch(): os._datadog_cmdi_patch = True subprocess._datadog_cmdi_patch = True + _set_metric_iast_instrumented_sink(VULN_CMDI) + if asm_config._ep_enabled: core.dispatch("exploit.prevention.ssrf.patch.urllib") diff --git a/tests/appsec/iast/test_telemetry.py b/tests/appsec/iast/test_telemetry.py index c36ac3ad44..460ba0467a 100644 --- a/tests/appsec/iast/test_telemetry.py +++ b/tests/appsec/iast/test_telemetry.py @@ -12,6 +12,7 @@ from ddtrace.appsec._iast._taint_tracking import OriginType from ddtrace.appsec._iast._taint_tracking import origin_to_str from ddtrace.appsec._iast._taint_tracking import taint_pyobject +from ddtrace.appsec._iast.taint_sinks.command_injection import patch as cmdi_patch from ddtrace.ext import SpanTypes from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE_TAG_IAST from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS @@ -75,6 +76,19 @@ def test_metric_executed_sink(no_request_sampling, telemetry_writer): assert span.get_metric(IAST_SPAN_TAGS.TELEMETRY_REQUEST_TAINTED) is None +def test_metric_instrumented_cmdi(no_request_sampling, telemetry_writer): + with override_env(dict(DD_IAST_TELEMETRY_VERBOSITY="INFORMATION")), override_global_config( + dict(_iast_enabled=True) + ): + cmdi_patch() + + metrics_result = telemetry_writer._namespace._metrics_data + generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE_TAG_IAST] + assert [metric.name for metric in generate_metrics.values()] == ["instrumented.sink"] + assert [metric._tags for metric in generate_metrics.values()] == [(("vulnerability_type", "COMMAND_INJECTION"),)] + assert len(generate_metrics) == 1, "Expected 1 generate_metrics" + + def test_metric_instrumented_propagation(no_request_sampling, telemetry_writer): with override_env(dict(DD_IAST_TELEMETRY_VERBOSITY="INFORMATION")), override_global_config( dict(_iast_enabled=True)