Skip to content

Fix rust helper double logging as error broken connections#3792

Merged
cataphract merged 2 commits intomasterfrom
glopes/rust-helper-write-disconnect
Apr 16, 2026
Merged

Fix rust helper double logging as error broken connections#3792
cataphract merged 2 commits intomasterfrom
glopes/rust-helper-write-disconnect

Conversation

@cataphract
Copy link
Copy Markdown
Contributor

@cataphract cataphract commented Apr 15, 2026

The special ForcefulDisconnect error was only being used in read, not write paths, resulting in forceful disconnects (broken pipe, connection reset) being logged as errors and sent via telemetry.

There was another problem in that these were being double-logged.

Fix both problems. Tested with:

diff --git a/appsec/helper-rust/src/client.rs b/appsec/helper-rust/src/client.rs
index bdd7d34c6..a32ca6ce4 100644
--- a/appsec/helper-rust/src/client.rs
+++ b/appsec/helper-rust/src/client.rs
@@ -867,6 +867,13 @@ async fn recv_command(
     cancel_token: &CancellationToken,
 ) -> anyhow::Result<protocol::Command> {
     debug!("Waiting for command");
+    // Allows integration tests to kill the PHP process while the helper is waiting for the
+    // next command, reproducing the connection-reset scenario without relying on precise timing.
+    if let Ok(delay_str) = std::env::var("_DD_APPSEC_HELPER_RECV_CMD_DELAY_MS") {
+        if let Ok(ms) = delay_str.parse::<u64>() {
+            tokio::time::sleep(tokio::time::Duration::from_millis(ms)).await;
+        }
+    }

     tokio::select! {
         maybe_msg = framed.next() => {
@@ -922,11 +929,11 @@ async fn send_command_resp(
     cmd: protocol::CommandResponse<'_>,
 ) -> anyhow::Result<()> {
     debug!("Sending command: {:?}", cmd);
-    match framed.send(cmd).await {
-        Ok(_) => Ok(()),
-        Err(err) => {
-            error!("Error sending command: {}", err);
-            Err(err)?
+    // Allows integration tests to kill the PHP process while the helper is mid-response,
+    // reproducing the broken-pipe scenario without relying on precise timing.
+    if let Ok(delay_str) = std::env::var("_DD_APPSEC_HELPER_SEND_RESP_DELAY_MS") {
+        if let Ok(ms) = delay_str.parse::<u64>() {
+            tokio::time::sleep(tokio::time::Duration::from_millis(ms)).await;
         }
     }
     framed.send(cmd).await.map_err(|err| {

and:

package com.datadog.appsec.php.integration

import com.datadog.appsec.php.TelemetryHelpers
import com.datadog.appsec.php.docker.AppSecContainer
import groovy.util.logging.Slf4j
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.DisabledIf
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

import java.net.http.HttpRequest
import java.net.http.HttpResponse

import static com.datadog.appsec.php.integration.TestParams.getPhpVersion
import static com.datadog.appsec.php.integration.TestParams.getVariant

@Testcontainers
@Slf4j
@Tag("musl")
@DisabledIf('isDisabled')
class BrokenPipeTests {
    // Only meaningful with the Rust helper, which is the default on PHP >= 8.5
    static boolean disabled = !TestParams.phpVersionAtLeast('8.5') || variant.contains('zts')

    @Container
    public static final AppSecContainer CONTAINER =
            new AppSecContainer(
                    workVolume: this.name,
                    baseTag: 'nginx-fpm-php',
                    phpVersion: phpVersion,
                    phpVariant: variant,
                    www: 'base',
            ) {
                @Override
                void configure() {
                    super.configure()
                    // The delay gives the test time to kill the PHP process between the helper
                    // receiving the command and writing the response back, reproducing the
                    // broken-pipe scenario from the wild crash report.
                    withEnv('_DD_APPSEC_HELPER_SEND_RESP_DELAY_MS', '3000')
                }
            }

    @Test
    void 'broken pipe logged when PHP process dies before helper writes response'() {
        // Fire a request that triggers WAF evaluation (Arachni is in the default rules)
        HttpRequest req = CONTAINER.buildReq('/hello.php')
                .header('User-Agent', 'Arachni/v1').GET().build()
        def responseFuture = CONTAINER.httpClient.sendAsync(req, HttpResponse.BodyHandlers.ofString())

        // Give PHP enough time to receive the HTTP request and send the request_init
        // command to the helper before we kill the process.
        Thread.sleep(500)

        // Kill the PHP-FPM worker. FPM master will respawn it. The helper is now sleeping
        // for _DD_APPSEC_HELPER_SEND_RESP_DELAY_MS ms before it tries to write the response,
        // at which point it will get EPIPE.
        def killResult = CONTAINER.execInContainer(
                'bash', '-c',
                'pkill -KILL -f "php-fpm: pool www" 2>/dev/null; pkill -KILL -f "php-fpm: pool" 2>/dev/null; true')
        log.info('Kill result: exit={} stderr={}', killResult.exitCode, killResult.stderr)

        // Wait for the helper's delay to expire and the error to be written to the log.
        // The delay is 3000 ms; 4000 ms leaves a comfortable margin.
        Thread.sleep(4000)

        responseFuture.cancel(true)
        CONTAINER.clearTraces()

        def logResult = CONTAINER.execInContainer('cat', '/tmp/logs/helper.log')
        def brokenPipeWarnLine = logResult.stdout.readLines().find { line ->
            line.contains('[WARN]') &&
                    (line.toLowerCase().contains('broken pipe') || line.toLowerCase().contains('connectivity issue'))
        }
        assert brokenPipeWarnLine != null :
                "Expected a [WARN] line about broken pipe / connectivity issue in helper.log but found none.\n" +
                "helper.log contents:\n${logResult.stdout}"

        // Verify the broken pipe is NOT reported as a telemetry error. Since it is classified
        // as a connectivity issue (ForcefulDisconnect) and logged at WARN, TelemetryAwareLogger
        // should not forward it. Error telemetry is sent eagerly, so 5s is sufficient.
        def telemetryLogs = TelemetryHelpers.waitForLogs(CONTAINER, 5) { List<TelemetryHelpers.Logs> messages ->
            false  // collect for full duration
        }.collectMany { it.logs }
        assert !telemetryLogs.any { log ->
            log.message.toLowerCase().contains('broken pipe') ||
                    log.message.toLowerCase().contains('epipe') ||
                    log.message.toLowerCase().contains('os error 32')
        } : 'Broken pipe should not generate any telemetry log'
    }
}
package com.datadog.appsec.php.integration

import com.datadog.appsec.php.docker.AppSecContainer
import groovy.util.logging.Slf4j
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.DisabledIf
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

import java.net.http.HttpResponse.BodyHandlers

import static com.datadog.appsec.php.integration.TestParams.getPhpVersion
import static com.datadog.appsec.php.integration.TestParams.getVariant

@Testcontainers
@Slf4j
@Tag("musl")
@DisabledIf('isDisabled')
class ConnectionResetTests {
    // Only meaningful with the Rust helper, which is the default on PHP >= 8.5
    static boolean disabled = !TestParams.phpVersionAtLeast('8.5') || variant.contains('zts')

    @Container
    public static final AppSecContainer CONTAINER =
            new AppSecContainer(
                    workVolume: this.name,
                    baseTag: 'nginx-fpm-php',
                    phpVersion: phpVersion,
                    phpVariant: variant,
                    www: 'base',
            )

    // Sends a valid ClientInit to the helper and then closes without reading
    // the response. Linux's unix_release_sock sends RST to the peer when the
    // closing socket's receive buffer is non-empty, causing ECONNRESET on the
    // helper's next recv_command call.
    // Root uid (0) is explicitly allowed by the helper's peer-uid check.
    // Uses an inline msgpack encoder so there is no dependency on external packages.
    private static final String RST_SCRIPT = '''\
import socket, struct, time, sys

def pack(v):
    if v is None:
        return b\'\\xc0\'
    if isinstance(v, bool):
        return b\'\\xc3\' if v else b\'\\xc2\'
    if isinstance(v, int):
        if 0 <= v <= 127: return bytes([v])
        if 0 <= v <= 0xFFFF: return b\'\\xcd\' + v.to_bytes(2, \'big\')
        return b\'\\xce\' + v.to_bytes(4, \'big\')
    if isinstance(v, float):
        return b\'\\xcb\' + struct.pack(\'>d\', v)
    if isinstance(v, str):
        b = v.encode(\'utf-8\'); n = len(b)
        if n <= 31: return bytes([0xa0 | n]) + b
        if n <= 255: return b\'\\xd9\' + bytes([n]) + b
        return b\'\\xda\' + n.to_bytes(2, \'big\') + b
    if isinstance(v, (list, tuple)):
        n = len(v)
        prefix = bytes([0x90 | n]) if n <= 15 else b\'\\xdc\' + n.to_bytes(2, \'big\')
        return prefix + b\'\'.join(pack(e) for e in v)
    raise TypeError(f\'unsupported: {type(v)}\')

def make_msg(data):
    body = pack(data)
    return b\'dds\\x00\' + struct.pack(\'<I\', len(body)) + body

SOCKET_PATH = b\'\\x00/ddappsec/ddappsec_1.18.0_1000.sock\'

# client_init command matching the protocol tuple structure:
# [pid, version, php_version, enabled, waf_config, remote_config, telemetry, sidecar]
# waf_config: [rules_file, timeout_us, rate_limit, key_regex, val_regex, [schema_enabled, period]]
# remote_config: [enabled, shmem_path]
# telemetry: [service, env]
# sidecar: [session_id, runtime_id]
cmd = [\'client_init\', [
    12345, \'1.18.0\', \'8.5.0\', True,
    [\'/etc/recommended.json\', 10001, 100, \'\', \'\', [True, 30.0]],
    [False, \'\'],
    [\'rst_test\', \'rst_env\'],
    [\'00000000-0000-0000-0000-000000000001\', \'00000000-0000-0000-0000-000000000001\'],
]]

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(SOCKET_PATH)
s.sendall(make_msg(cmd))
# Give the helper time to process ClientInit and send its response.
# The response lands in our kernel receive buffer (we never call recv()).
time.sleep(1.0)
# Close normally. Because our receive buffer is non-empty, Linux\'s
# unix_release_sock sends RST to the helper → ECONNRESET on next read.
s.close()
print("closed with data in receive buffer -> helper should get ECONNRESET", file=sys.stderr)
'''

    @Test
    void 'connection reset logged as WARN and not reported as telemetry error'() {
        // Fire a normal request first. This starts the PHP-FPM workers, loads the appsec
        // extension, and causes the Rust helper to start listening on its socket.
        def warmupReq = CONTAINER.buildReq('/hello.php').GET().build()
        CONTAINER.httpClient.send(warmupReq, BodyHandlers.ofString())
        Thread.sleep(500)
        CONTAINER.clearTraces()

        def rstResult = CONTAINER.execInContainer('python3', '-c', RST_SCRIPT)
        log.info('RST script: exit={} stderr={}', rstResult.exitCode, rstResult.stderr)
        assert rstResult.exitCode == 0 :
                "Python3 RST script failed (is python3 available in the container?)\n" +
                "stderr: ${rstResult.stderr}"

        // Error telemetry is submitted eagerly, so 5s is sufficient to confirm absence.
        Thread.sleep(5000)

        def logLines = CONTAINER.execInContainer('cat', '/tmp/logs/helper.log').stdout.readLines()

        def warnLine = logLines.find { line ->
            line.contains('[WARN]') &&
                    (line.toLowerCase().contains('connection reset') ||
                     line.toLowerCase().contains('os error 104') ||
                     line.toLowerCase().contains('connectivity issue'))
        }
        assert warnLine != null :
                "Expected a [WARN] line about connection reset / connectivity issue in helper.log but found none.\n" +
                "helper.log contents:\n${logLines.join('\n')}"

        // Verify connection reset is NOT reported as a telemetry error: it is classified as a
        // connectivity issue (ForcefulDisconnect) and logged at WARN, so TelemetryAwareLogger
        // must not forward it. The absence of a 'Submitting telemetry log to sidecar' line
        // for connection reset in helper.log confirms this.
        assert !logLines.any { line ->
            line.contains('Submitting telemetry log to sidecar') &&
                    (line.toLowerCase().contains('connection reset') ||
                     line.toLowerCase().contains('os error 104'))
        } : 'Connection reset should not generate a telemetry error log'
    }

Description

Reviewer checklist

  • Test coverage seems ok.
  • Appropriate labels assigned.

@cataphract cataphract requested a review from a team as a code owner April 15, 2026 13:17
@datadog-datadog-prod-us1-2
Copy link
Copy Markdown

datadog-datadog-prod-us1-2 Bot commented Apr 15, 2026

Tests

🎉 All green!

❄️ No new flaky tests detected
🧪 All tests passed

🎯 Code Coverage (details)
Patch Coverage: 100.00%
Overall Coverage: 60.64% (+0.00%)

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 3124fa7 | Docs | Datadog PR Page | Was this helpful? React with 👍/👎 or give us feedback!

@cataphract cataphract force-pushed the glopes/rust-helper-write-disconnect branch from ce915da to 6fc6fd2 Compare April 15, 2026 13:39
The special ForcefulDisconnect error was only being used in read, not
write paths, resulting in forceful disconnects (broken pipe, connection
reset) being logged as errors and sent via telemetry.

There was another problem in that these were being double-logged.

Fix both problems. Tested with:

```
--- a/appsec/helper-rust/src/client.rs
+++ b/appsec/helper-rust/src/client.rs
@@ -912,11 +912,11 @@ async fn send_command_resp(
     cmd: protocol::CommandResponse<'_>,
 ) -> anyhow::Result<()> {
Fix rust helper double logging as error broken connections
     debug!("Sending command: {:?}", cmd);
-    match framed.send(cmd).await {
-        Ok(_) => Ok(()),
-        Err(err) => {
-            error!("Error sending command: {}", err);
-            Err(err)?
+    // Allows integration tests to kill the PHP process while the helper is mid-response,
+    // reproducing the broken-pipe scenario without relying on precise timing.
+    if let Ok(delay_str) = std::env::var("_DD_APPSEC_HELPER_SEND_RESP_DELAY_MS") {
+        if let Ok(ms) = delay_str.parse::<u64>() {
+            tokio::time::sleep(tokio::time::Duration::from_millis(ms)).await;
         }
     }
     framed.send(cmd).await.map_err(|err| {
```

and:

```
package com.datadog.appsec.php.integration

import com.datadog.appsec.php.TelemetryHelpers
import com.datadog.appsec.php.docker.AppSecContainer
import groovy.util.logging.Slf4j
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.DisabledIf
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers

import java.net.http.HttpRequest
import java.net.http.HttpResponse

import static com.datadog.appsec.php.integration.TestParams.getPhpVersion
import static com.datadog.appsec.php.integration.TestParams.getVariant

@testcontainers
@slf4j
@tag("musl")
@DisabledIf('isDisabled')
class BrokenPipeTests {
    // Only meaningful with the Rust helper, which is the default on PHP >= 8.5
    static boolean disabled = !TestParams.phpVersionAtLeast('8.5') || variant.contains('zts')

    @container
    public static final AppSecContainer CONTAINER =
            new AppSecContainer(
                    workVolume: this.name,
                    baseTag: 'nginx-fpm-php',
                    phpVersion: phpVersion,
                    phpVariant: variant,
                    www: 'base',
            ) {
                @OverRide
                void configure() {
                    super.configure()
                    // The delay gives the test time to kill the PHP process between the helper
                    // receiving the command and writing the response back, reproducing the
                    // broken-pipe scenario from the wild crash report.
                    withEnv('_DD_APPSEC_HELPER_SEND_RESP_DELAY_MS', '3000')
                }
            }

    @test
    void 'broken pipe logged when PHP process dies before helper writes response'() {
        // Fire a request that triggers WAF evaluation (Arachni is in the default rules)
        HttpRequest req = CONTAINER.buildReq('/hello.php')
                .header('User-Agent', 'Arachni/v1').GET().build()
        def responseFuture = CONTAINER.httpClient.sendAsync(req, HttpResponse.BodyHandlers.ofString())

        // Give PHP enough time to receive the HTTP request and send the request_init
        // command to the helper before we kill the process.
        Thread.sleep(500)

        // Kill the PHP-FPM worker. FPM master will respawn it. The helper is now sleeping
        // for _DD_APPSEC_HELPER_SEND_RESP_DELAY_MS ms before it tries to write the response,
        // at which point it will get EPIPE.
        def killResult = CONTAINER.execInContainer(
                'bash', '-c',
                'pkill -KILL -f "php-fpm: pool www" 2>/dev/null; pkill -KILL -f "php-fpm: pool" 2>/dev/null; true')
        log.info('Kill result: exit={} stderr={}', killResult.exitCode, killResult.stderr)

        // Wait for the helper's delay to expire and the error to be written to the log.
        // The delay is 3000 ms; 4000 ms leaves a comfortable margin.
        Thread.sleep(4000)

        responseFuture.cancel(true)
        CONTAINER.clearTraces()

        def logResult = CONTAINER.execInContainer('cat', '/tmp/logs/helper.log')
        def brokenPipeWarnLine = logResult.stdout.readLines().find { line ->
            line.contains('[WARN]') &&
                    (line.toLowerCase().contains('broken pipe') || line.toLowerCase().contains('connectivity issue'))
        }
        assert brokenPipeWarnLine != null :
                "Expected a [WARN] line about broken pipe / connectivity issue in helper.log but found none.\n" +
                "helper.log contents:\n${logResult.stdout}"

        // Verify the broken pipe is NOT reported as a telemetry error. Since it is classified
        // as a connectivity issue (ForcefulDisconnect) and logged at WARN, TelemetryAwareLogger
        // should not forward it. Error telemetry is sent eagerly, so 5s is sufficient.
        def telemetryLogs = TelemetryHelpers.waitForLogs(CONTAINER, 5) { List<TelemetryHelpers.Logs> messages ->
            false  // collect for full duration
        }.collectMany { it.logs }
        assert !telemetryLogs.any { log ->
            log.message.toLowerCase().contains('broken pipe') ||
                    log.message.toLowerCase().contains('epipe') ||
                    log.message.toLowerCase().contains('os error 32')
        } : 'Broken pipe should not generate any telemetry log'
    }
}
```
…PIPE

Rename is_incomplete_stream_error to is_forceful_disconnect_error and
extend it to also treat ECONNRESET and EPIPE as forceful disconnects.
On Linux, ECONNRESET is delivered to the peer when a Unix socket is
closed while its receive buffer is non-empty (unix_release_sock),
indicating the client crashed or was killed after we sent our response —
a connectivity issue, not a protocol error.

Also remove redundant `git config --global --add safe.directory '*'`
calls from integration Docker build tasks.
@cataphract cataphract force-pushed the glopes/rust-helper-write-disconnect branch from 6fc6fd2 to 3124fa7 Compare April 15, 2026 15:19
@pr-commenter
Copy link
Copy Markdown

pr-commenter Bot commented Apr 15, 2026

Benchmarks [ tracer ]

Benchmark execution time: 2026-04-15 16:38:03

Comparing candidate commit 3124fa7 in PR branch glopes/rust-helper-write-disconnect with baseline commit 2b97a2a in branch master.

Found 0 performance improvements and 0 performance regressions! Performance is the same for 193 metrics, 1 unstable metrics.

@cataphract cataphract merged commit 47460af into master Apr 16, 2026
2087 of 2098 checks passed
@cataphract cataphract deleted the glopes/rust-helper-write-disconnect branch April 16, 2026 10:37
@github-actions github-actions Bot added this to the 1.19.0 milestone Apr 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants