From a188b9e3efb477deb1a650d5b1160b1ee2c82162 Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 16:36:20 +0100 Subject: [PATCH 01/15] renames --- src/{bidirectionalStream.go => bidirectional_stream.go} | 0 src/{requestAndResponse.go => request_and_response.go} | 0 src/{serviceIpManager.go => service_ip_manager.go} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename src/{bidirectionalStream.go => bidirectional_stream.go} (100%) rename src/{requestAndResponse.go => request_and_response.go} (100%) rename src/{serviceIpManager.go => service_ip_manager.go} (100%) diff --git a/src/bidirectionalStream.go b/src/bidirectional_stream.go similarity index 100% rename from src/bidirectionalStream.go rename to src/bidirectional_stream.go diff --git a/src/requestAndResponse.go b/src/request_and_response.go similarity index 100% rename from src/requestAndResponse.go rename to src/request_and_response.go diff --git a/src/serviceIpManager.go b/src/service_ip_manager.go similarity index 100% rename from src/serviceIpManager.go rename to src/service_ip_manager.go From bf561058e75430b89361320cb324303697be3b67 Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 16:36:33 +0100 Subject: [PATCH 02/15] initial is_json implementation --- src/is_json.go | 23 +++++++++++++++++ src/is_json_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 src/is_json.go create mode 100644 src/is_json_test.go diff --git a/src/is_json.go b/src/is_json.go new file mode 100644 index 0000000..6c7c2e9 --- /dev/null +++ b/src/is_json.go @@ -0,0 +1,23 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "mime" + "net/http" +) + +func isJson(r *http.Request) bool { + mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) + if err == nil && mediaType == "application/json" { + return true + } + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + return false + } + r.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + var v map[string]interface{} + return json.Unmarshal(bodyBytes, &v) == nil +} diff --git a/src/is_json_test.go b/src/is_json_test.go new file mode 100644 index 0000000..2e1fc90 --- /dev/null +++ b/src/is_json_test.go @@ -0,0 +1,62 @@ +package main + +import ( + "net/http" + "strings" + "testing" +) + +func TestIsJson(t *testing.T) { + tests := []struct { + name string + contentType string + body string + expectedResult bool + }{ + { + name: "Valid JSON with correct Content-Type", + contentType: "application/json", + body: `{"key": "value"}`, + expectedResult: true, + }, + { + name: "Invalid JSON with correct Content-Type", + contentType: "application/json", + body: `{"key": "value"`, + expectedResult: true, + }, + { + name: "Valid JSON with incorrect Content-Type", + contentType: "text/plain", + body: `{"key": "value"}`, + expectedResult: true, + }, + { + name: "Empty body with correct Content-Type", + contentType: "application/json", + body: ``, + expectedResult: true, + }, + { + name: "No Content-Type header", + contentType: "", + body: `{"key": "value"}`, + expectedResult: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, err := http.NewRequest("POST", "/", strings.NewReader(tt.body)) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + if tt.contentType != "" { + req.Header.Set("Content-Type", tt.contentType) + } + result := isJson(req) + if result != tt.expectedResult { + t.Errorf("isJson() = %v, want %v", result, tt.expectedResult) + } + }) + } +} From 8af652f9dcf9d42ff172d1c468547a9da11d7f2b Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 16:36:45 +0100 Subject: [PATCH 03/15] use is_json in main --- src/main.go | 52 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/main.go b/src/main.go index 420338a..0705a5a 100644 --- a/src/main.go +++ b/src/main.go @@ -15,23 +15,26 @@ import ( ) func main() { + logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN") + if !logsApiTokenSet { + log.Fatal("FIRETAIL_API_TOKEN environment variable not set") + } + devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE")) if devEnabled { slog.Warn("🧰 Development mode enabled, setting log level to debug...") slog.SetLogLoggerLevel(slog.LevelDebug) } - logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN") - if !logsApiTokenSet { - log.Fatal("FIRETAIL_API_TOKEN environment variable not set") - } - - var ipManager *serviceIpManager - if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) { - slog.Info( - "Service IP filter enabled, monitoring service IPs...", - ) - ipManager = newServiceIpManager() + devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED")) + if err == nil && devServerEnabled { + slog.Warn("🧰 Development server enabled, starting example HTTP server...") + go func() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:]) + }) + log.Fatal(http.ListenAndServe(":80", nil)) + }() } bpfExpression, bpfExpressionSet := os.LookupEnv("BPF_EXPRESSION") @@ -43,17 +46,16 @@ func main() { bpfExpression = "tcp and (port 80 or port 443)" } - devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED")) - if err == nil && devServerEnabled { - slog.Warn("🧰 Development server enabled, starting example HTTP server...") - go func() { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:]) - }) - log.Fatal(http.ListenAndServe(":80", nil)) - }() + var ipManager *serviceIpManager + if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) { + slog.Info( + "Service IP filter enabled, monitoring service IPs...", + ) + ipManager = newServiceIpManager() } + onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON")) + requestAndResponseChannel := make(chan httpRequestAndResponse, 1) httpRequestStreamer := &httpRequestAndResponseStreamer{ bpfExpression: bpfExpression, @@ -91,6 +93,16 @@ func main() { ) continue } + if onlyLogJson && !isJson(requestAndResponse.request) { + slog.Debug( + "Ignoring non-JSON request:", + "Src", requestAndResponse.src, + "Dst", requestAndResponse.dst, + "SrcPort", requestAndResponse.srcPort, + "DstPort", requestAndResponse.dstPort, + ) + continue + } slog.Debug( "Captured request and response:", "Method", requestAndResponse.request.Method, From 7cdfb1e909aded616c1c0d6488eed5e51ff44564 Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 16:37:25 +0100 Subject: [PATCH 04/15] add ENABLE_ONLY_LOG_JSON env var to dev target in makefile --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 32a4777..cd232dd 100644 --- a/Makefile +++ b/Makefile @@ -23,4 +23,5 @@ dev: build-dev -e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \ -e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \ -e DISABLE_SERVICE_IP_FILTERING=true \ + -e ENABLE_ONLY_LOG_JSON=true \ firetail/kubernetes-sensor-dev From 9363737ab20cf83c6a0b69914df38223ab87d4ec Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 17:30:47 +0100 Subject: [PATCH 05/15] ignore packets not destined for or originating from a service IP --- src/requestAndResponse.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/requestAndResponse.go b/src/requestAndResponse.go index 186a182..28384ce 100644 --- a/src/requestAndResponse.go +++ b/src/requestAndResponse.go @@ -2,6 +2,7 @@ package main import ( "log" + "log/slog" "net/http" "time" @@ -58,6 +59,22 @@ func (s *httpRequestAndResponseStreamer) start() { if !ok { continue } + net, ok := packet.NetworkLayer().(*layers.IPv4) + if !ok { + continue + } + src := net.SrcIP.String() + dst := net.DstIP.String() + if s.ipManager != nil && !(s.ipManager.isServiceIP(dst) || s.ipManager.isServiceIP(src)) { + slog.Debug( + "Ignoring packet not destined for or originating from a service IP:", + "Src", src, + "Dst", dst, + "SrcPort", tcp.SrcPort.String(), + "DstPort", tcp.DstPort.String(), + ) + continue + } assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp) case <-ticker: assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute)) From dd0a10eac18458451bf33225a5145cddef1a41e1 Mon Sep 17 00:00:00 2001 From: theteacat Date: Fri, 9 May 2025 17:32:42 +0100 Subject: [PATCH 06/15] Doc `ENABLE_ONLY_LOG_JSON` env var --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 4b89cc4..fa4b6f2 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ POC for a FireTail Kubernetes Sensor. | `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail | | `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html | | `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. | +| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. | | `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. | From d8bc4cf3c651c0e26ef039fe8ef6c1138b51c630 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 09:40:24 +0100 Subject: [PATCH 07/15] check both request and response payloads and headers --- src/is_json.go | 24 ++++++--- src/is_json_test.go | 116 +++++++++++++++++++++++++++++++++----------- src/main.go | 2 +- 3 files changed, 107 insertions(+), 35 deletions(-) diff --git a/src/is_json.go b/src/is_json.go index 6c7c2e9..cedd417 100644 --- a/src/is_json.go +++ b/src/is_json.go @@ -8,16 +8,28 @@ import ( "net/http" ) -func isJson(r *http.Request) bool { - mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) - if err == nil && mediaType == "application/json" { - return true +func isJson(req_and_resp *httpRequestAndResponse) bool { + for _, headers := range []http.Header{req_and_resp.request.Header, req_and_resp.response.Header} { + mediaType, _, err := mime.ParseMediaType(headers.Get("Content-Type")) + if err == nil && mediaType == "application/json" { + return true + } } - bodyBytes, err := io.ReadAll(r.Body) + + bodyBytes, err := io.ReadAll(req_and_resp.request.Body) + req_and_resp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) if err != nil { return false } - r.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) var v map[string]interface{} + if json.Unmarshal(bodyBytes, &v) == nil { + return true + } + + bodyBytes, err = io.ReadAll(req_and_resp.response.Body) + req_and_resp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + if err != nil { + return false + } return json.Unmarshal(bodyBytes, &v) == nil } diff --git a/src/is_json_test.go b/src/is_json_test.go index 2e1fc90..b1cdab4 100644 --- a/src/is_json_test.go +++ b/src/is_json_test.go @@ -1,6 +1,7 @@ package main import ( + "io" "net/http" "strings" "testing" @@ -8,52 +9,111 @@ import ( func TestIsJson(t *testing.T) { tests := []struct { - name string - contentType string - body string - expectedResult bool + name string + reqContentType string + reqBody string + respContentType string + respBody string + expectedResult bool }{ { - name: "Valid JSON with correct Content-Type", - contentType: "application/json", - body: `{"key": "value"}`, - expectedResult: true, + name: "Valid JSON in both request and response with correct content types", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + expectedResult: true, }, { - name: "Invalid JSON with correct Content-Type", - contentType: "application/json", - body: `{"key": "value"`, - expectedResult: true, + name: "XML in request and response with correct Content-Type", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/xml", + respBody: `value`, + expectedResult: false, }, { - name: "Valid JSON with incorrect Content-Type", - contentType: "text/plain", - body: `{"key": "value"}`, - expectedResult: true, + name: "XML in request with JSON in response", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + expectedResult: true, }, { - name: "Empty body with correct Content-Type", - contentType: "application/json", - body: ``, - expectedResult: true, + name: "JSON in request with XML in response", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/xml", + respBody: `value`, + expectedResult: true, }, { - name: "No Content-Type header", - contentType: "", - body: `{"key": "value"}`, - expectedResult: true, + name: "Empty request and response bodies and headers", + reqContentType: "", + reqBody: "", + respContentType: "", + respBody: "", + expectedResult: false, + }, + { + name: "No content-type headers with valid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"}`, + respContentType: "", + respBody: ``, + expectedResult: true, + }, + { + name: "No content-type headers with valid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"}`, + expectedResult: true, + }, + { + name: "No content-type headers with invalid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"`, + respContentType: "", + respBody: ``, + expectedResult: false, + }, + { + name: "No content-type headers with invalid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"`, + expectedResult: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - req, err := http.NewRequest("POST", "/", strings.NewReader(tt.body)) + req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody)) if err != nil { t.Fatalf("Failed to create request: %v", err) } - if tt.contentType != "" { - req.Header.Set("Content-Type", tt.contentType) + if tt.reqContentType != "" { + req.Header.Set("Content-Type", tt.reqContentType) + } + + resp := &http.Response{ + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(tt.respBody)), + } + if tt.respContentType != "" { + resp.Header.Set("Content-Type", tt.respContentType) } - result := isJson(req) + + reqAndResp := httpRequestAndResponse{ + request: req, + response: resp, + } + + result := isJson(&reqAndResp) if result != tt.expectedResult { t.Errorf("isJson() = %v, want %v", result, tt.expectedResult) } diff --git a/src/main.go b/src/main.go index 0705a5a..4296211 100644 --- a/src/main.go +++ b/src/main.go @@ -93,7 +93,7 @@ func main() { ) continue } - if onlyLogJson && !isJson(requestAndResponse.request) { + if onlyLogJson && !isJson(&requestAndResponse) { slog.Debug( "Ignoring non-JSON request:", "Src", requestAndResponse.src, From 692bf93078e6a0166e2070e2c89af7058b2f6f62 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 09:54:57 +0100 Subject: [PATCH 08/15] add test case with +json --- src/is_json_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/is_json_test.go b/src/is_json_test.go index b1cdab4..ab02280 100644 --- a/src/is_json_test.go +++ b/src/is_json_test.go @@ -88,6 +88,14 @@ func TestIsJson(t *testing.T) { respBody: `{"key": "value"`, expectedResult: false, }, + { + name: "Content-type geo+json in request with invalid body", + reqContentType: "application/geo+json", + reqBody: ``, + respContentType: "", + respBody: ``, + expectedResult: true, + }, } for _, tt := range tests { From adc37a2b6496fac3b278244f289323cf45f0cbcc Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 09:55:15 +0100 Subject: [PATCH 09/15] update isJson to check for +json suffix in content types --- src/is_json.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/is_json.go b/src/is_json.go index cedd417..3f64e43 100644 --- a/src/is_json.go +++ b/src/is_json.go @@ -6,14 +6,19 @@ import ( "io" "mime" "net/http" + "strings" ) func isJson(req_and_resp *httpRequestAndResponse) bool { for _, headers := range []http.Header{req_and_resp.request.Header, req_and_resp.response.Header} { - mediaType, _, err := mime.ParseMediaType(headers.Get("Content-Type")) + contentTypeHeader := headers.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentTypeHeader) if err == nil && mediaType == "application/json" { return true } + if strings.HasSuffix(mediaType, "+json") { + return true + } } bodyBytes, err := io.ReadAll(req_and_resp.request.Body) From ea270ae7365b1b568a256eb4b7182546971210b3 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 10:27:35 +0100 Subject: [PATCH 10/15] Implement & doc ONLY_LOG_JSON_MAX_CONTENT_LENGTH env var --- README.md | 1 + src/is_json.go | 39 ++++++---- src/is_json_test.go | 181 ++++++++++++++++++++++++++++---------------- src/main.go | 16 +++- 4 files changed, 154 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index fa4b6f2..9aada96 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ POC for a FireTail Kubernetes Sensor. | `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html | | `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. | | `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. | +| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. | | `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. | diff --git a/src/is_json.go b/src/is_json.go index 3f64e43..8227540 100644 --- a/src/is_json.go +++ b/src/is_json.go @@ -9,8 +9,8 @@ import ( "strings" ) -func isJson(req_and_resp *httpRequestAndResponse) bool { - for _, headers := range []http.Header{req_and_resp.request.Header, req_and_resp.response.Header} { +func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool { + for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} { contentTypeHeader := headers.Get("Content-Type") mediaType, _, err := mime.ParseMediaType(contentTypeHeader) if err == nil && mediaType == "application/json" { @@ -21,20 +21,29 @@ func isJson(req_and_resp *httpRequestAndResponse) bool { } } - bodyBytes, err := io.ReadAll(req_and_resp.request.Body) - req_and_resp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) - if err != nil { - return false - } - var v map[string]interface{} - if json.Unmarshal(bodyBytes, &v) == nil { - return true + if reqAndResp.request.ContentLength <= maxContentLength { + bodyBytes, err := io.ReadAll(reqAndResp.request.Body) + reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + if err != nil { + return false + } + var v map[string]interface{} + if json.Unmarshal(bodyBytes, &v) == nil { + return true + } } - bodyBytes, err = io.ReadAll(req_and_resp.response.Body) - req_and_resp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) - if err != nil { - return false + if reqAndResp.response.ContentLength <= maxContentLength { + bodyBytes, err := io.ReadAll(reqAndResp.response.Body) + reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + if err != nil { + return false + } + var v map[string]interface{} + if json.Unmarshal(bodyBytes, &v) == nil { + return true + } } - return json.Unmarshal(bodyBytes, &v) == nil + + return false } diff --git a/src/is_json_test.go b/src/is_json_test.go index ab02280..25212f7 100644 --- a/src/is_json_test.go +++ b/src/is_json_test.go @@ -9,92 +9,139 @@ import ( func TestIsJson(t *testing.T) { tests := []struct { - name string - reqContentType string - reqBody string - respContentType string - respBody string - expectedResult bool + name string + reqContentType string + reqBody string + respContentType string + respBody string + maxContentLength int64 + expectedResult bool }{ { - name: "Valid JSON in both request and response with correct content types", - reqContentType: "application/json", - reqBody: `{"key": "value"}`, - respContentType: "application/json", - respBody: `{"key": "value"}`, - expectedResult: true, + name: "Valid JSON in both request and response with correct content types", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, }, { - name: "XML in request and response with correct Content-Type", - reqContentType: "application/xml", - reqBody: `value`, - respContentType: "application/xml", - respBody: `value`, - expectedResult: false, + name: "XML in request and response with correct Content-Type", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/xml", + respBody: `value`, + maxContentLength: 1024, + expectedResult: false, }, { - name: "XML in request with JSON in response", - reqContentType: "application/xml", - reqBody: `value`, - respContentType: "application/json", - respBody: `{"key": "value"}`, - expectedResult: true, + name: "XML in request with JSON in response", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, }, { - name: "JSON in request with XML in response", - reqContentType: "application/json", - reqBody: `{"key": "value"}`, - respContentType: "application/xml", - respBody: `value`, - expectedResult: true, + name: "JSON in request with XML in response", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/xml", + respBody: `value`, + maxContentLength: 1024, + expectedResult: true, }, { - name: "Empty request and response bodies and headers", - reqContentType: "", - reqBody: "", - respContentType: "", - respBody: "", - expectedResult: false, + name: "Empty request and response bodies and headers", + reqContentType: "", + reqBody: "", + respContentType: "", + respBody: "", + maxContentLength: 1024, + expectedResult: false, }, { - name: "No content-type headers with valid JSON in request", - reqContentType: "", - reqBody: `{"key": "value"}`, - respContentType: "", - respBody: ``, - expectedResult: true, + name: "No content-type headers with valid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"}`, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: true, }, { - name: "No content-type headers with valid JSON in response", - reqContentType: "", - reqBody: ``, - respContentType: "", - respBody: `{"key": "value"}`, - expectedResult: true, + name: "No content-type headers with valid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, }, { - name: "No content-type headers with invalid JSON in request", - reqContentType: "", - reqBody: `{"key": "value"`, - respContentType: "", - respBody: ``, - expectedResult: false, + name: "No content-type headers with invalid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"`, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: false, }, { - name: "No content-type headers with invalid JSON in response", - reqContentType: "", - reqBody: ``, - respContentType: "", - respBody: `{"key": "value"`, - expectedResult: false, + name: "No content-type headers with invalid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"`, + maxContentLength: 1024, + expectedResult: false, }, { - name: "Content-type geo+json in request with invalid body", - reqContentType: "application/geo+json", - reqBody: ``, - respContentType: "", - respBody: ``, - expectedResult: true, + name: "Content-type geo+json in request with invalid body", + reqContentType: "application/geo+json", + reqBody: ``, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with request payload longer than max length", + reqContentType: "", + reqBody: strings.Repeat("a", 1025), + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with response payload longer than max length", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: strings.Repeat("a", 1025), + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with request payload longer than max length and response payload shorter", + reqContentType: "", + reqBody: strings.Repeat("a", 1025), + respContentType: "", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with request payload shorter than max length and response payload longer", + reqContentType: "", + reqBody: `{"key": "value"}`, + respContentType: "", + respBody: strings.Repeat("a", 1025), + maxContentLength: 1024, + expectedResult: true, }, } @@ -121,7 +168,7 @@ func TestIsJson(t *testing.T) { response: resp, } - result := isJson(&reqAndResp) + result := isJson(&reqAndResp, tt.maxContentLength) if result != tt.expectedResult { t.Errorf("isJson() = %v, want %v", result, tt.expectedResult) } diff --git a/src/main.go b/src/main.go index 4296211..594bb55 100644 --- a/src/main.go +++ b/src/main.go @@ -54,7 +54,21 @@ func main() { ipManager = newServiceIpManager() } + var maxContentLength int64 onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON")) + if onlyLogJson { + maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH") + if !maxContentLengthSet { + slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB") + maxContentLength = 1048576 // 1MiB + } else { + maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64) + if err != nil { + slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error()) + maxContentLength = 1048576 // 1MiB + } + } + } requestAndResponseChannel := make(chan httpRequestAndResponse, 1) httpRequestStreamer := &httpRequestAndResponseStreamer{ @@ -93,7 +107,7 @@ func main() { ) continue } - if onlyLogJson && !isJson(&requestAndResponse) { + if onlyLogJson && !isJson(&requestAndResponse, maxContentLength) { slog.Debug( "Ignoring non-JSON request:", "Src", requestAndResponse.src, From 4f27e35aa13d06447c6ef638cc9050b5304b4904 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 10:37:35 +0100 Subject: [PATCH 11/15] fix test cases with long json payloads to have valid json payloads --- src/is_json_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/is_json_test.go b/src/is_json_test.go index 25212f7..43301d7 100644 --- a/src/is_json_test.go +++ b/src/is_json_test.go @@ -110,7 +110,7 @@ func TestIsJson(t *testing.T) { { name: "No content-type headers with request payload longer than max length", reqContentType: "", - reqBody: strings.Repeat("a", 1025), + reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`, respContentType: "", respBody: ``, maxContentLength: 1024, @@ -121,7 +121,7 @@ func TestIsJson(t *testing.T) { reqContentType: "", reqBody: ``, respContentType: "", - respBody: strings.Repeat("a", 1025), + respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`, maxContentLength: 1024, expectedResult: false, }, @@ -156,8 +156,9 @@ func TestIsJson(t *testing.T) { } resp := &http.Response{ - Header: make(http.Header), - Body: io.NopCloser(strings.NewReader(tt.respBody)), + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(tt.respBody)), + ContentLength: int64(len(tt.respBody)), } if tt.respContentType != "" { resp.Header.Set("Content-Type", tt.respContentType) From 230e8c202ba73cf39288f4d80871e94c9023bfcd Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 11:19:08 +0100 Subject: [PATCH 12/15] defer closing channels --- src/bidirectional_stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index e75cf14..18e8e23 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -53,6 +53,8 @@ func (s *bidirectionalStream) run() { requestChannel := make(chan *http.Request, 1) responseChannel := make(chan *http.Response, 1) + defer close(requestChannel) + defer close(responseChannel) go func() { reader := bufio.NewReader(&s.clientToServer) @@ -101,8 +103,6 @@ func (s *bidirectionalStream) run() { capturedRequest := <-requestChannel capturedResponse := <-responseChannel - close(requestChannel) - close(responseChannel) *s.requestAndResponseChannel <- httpRequestAndResponse{ request: capturedRequest, From fa0f586768b8e37d83cb13277d91c9ffa55ad5f7 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 12:00:01 +0100 Subject: [PATCH 13/15] Recover from failures in req and resp readers --- src/bidirectional_stream.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index 18e8e23..7849a96 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "io" + "log/slog" "net/http" "sync" @@ -57,11 +58,16 @@ func (s *bidirectionalStream) run() { defer close(responseChannel) go func() { + defer func() { + if r := recover(); r != nil { + slog.Warn("Recovered from panic in clientToServer reader:", "Err", r) + } + wg.Done() + }() reader := bufio.NewReader(&s.clientToServer) for { request, err := http.ReadRequest(reader) if err == io.EOF { - wg.Done() return } else if err != nil { continue @@ -80,11 +86,16 @@ func (s *bidirectionalStream) run() { }() go func() { + defer func() { + if r := recover(); r != nil { + slog.Warn("Recovered from panic in serverToClient reader:", "Err", r) + } + wg.Done() + }() reader := bufio.NewReader(&s.serverToClient) for { response, err := http.ReadResponse(reader, nil) if err == io.ErrUnexpectedEOF { - wg.Done() return } else if err != nil { continue From 03ea9d3804cece9950655e3e3ed9db05808dd935 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 12:01:55 +0100 Subject: [PATCH 14/15] nonblocking reads for request and response channels --- src/bidirectional_stream.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index 7849a96..ae5b417 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -112,8 +112,22 @@ func (s *bidirectionalStream) run() { wg.Wait() - capturedRequest := <-requestChannel - capturedResponse := <-responseChannel + var capturedRequest *http.Request + var capturedResponse *http.Response + + select { + case capturedRequest = <-requestChannel: + default: + slog.Warn("No request captured") + return + } + + select { + case capturedResponse = <-responseChannel: + default: + slog.Warn("No response captured") + return + } *s.requestAndResponseChannel <- httpRequestAndResponse{ request: capturedRequest, From c5e2648206125536c149291dd1da3b5bf205db13 Mon Sep 17 00:00:00 2001 From: theteacat Date: Mon, 12 May 2025 12:17:28 +0100 Subject: [PATCH 15/15] Improved logging on failure to capture request and/or response --- src/bidirectional_stream.go | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index ae5b417..054daa7 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -118,14 +118,40 @@ func (s *bidirectionalStream) run() { select { case capturedRequest = <-requestChannel: default: - slog.Warn("No request captured") - return } select { case capturedResponse = <-responseChannel: default: - slog.Warn("No response captured") + } + + if capturedRequest == nil && capturedResponse == nil { + slog.Debug( + "No request or response captured from stream", + "Src", s.net.Src().String(), + "Dst", s.net.Dst().String(), + "SrcPort", s.transport.Src().String(), + "DstPort", s.transport.Dst().String(), + ) + } else if capturedRequest == nil { + slog.Warn( + "Captured response but no request from stream", + "Src", s.net.Src().String(), + "Dst", s.net.Dst().String(), + "SrcPort", s.transport.Src().String(), + "DstPort", s.transport.Dst().String(), + ) + } else if capturedResponse == nil { + slog.Warn( + "Captured request but no response from stream", + "Src", s.net.Src().String(), + "Dst", s.net.Dst().String(), + "SrcPort", s.transport.Src().String(), + "DstPort", s.transport.Dst().String(), + ) + } + + if capturedRequest == nil || capturedResponse == nil { return }