diff --git a/Makefile b/Makefile index 32a4777..cd232dd 100644 --- a/Makefile +++ b/Makefile @@ -23,4 +23,5 @@ dev: build-dev -e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \ -e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \ -e DISABLE_SERVICE_IP_FILTERING=true \ + -e ENABLE_ONLY_LOG_JSON=true \ firetail/kubernetes-sensor-dev diff --git a/README.md b/README.md index 4b89cc4..9aada96 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor. | `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail | | `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html | | `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. | +| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. | +| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. | | `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. | diff --git a/src/bidirectionalStream.go b/src/bidirectional_stream.go similarity index 100% rename from src/bidirectionalStream.go rename to src/bidirectional_stream.go diff --git a/src/is_json.go b/src/is_json.go new file mode 100644 index 0000000..8227540 --- /dev/null +++ b/src/is_json.go @@ -0,0 +1,49 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "mime" + "net/http" + "strings" +) + +func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool { + for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} { + contentTypeHeader := headers.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentTypeHeader) + if err == nil && mediaType == "application/json" { + return true + } + if strings.HasSuffix(mediaType, "+json") { + return true + } + } + + if reqAndResp.request.ContentLength <= maxContentLength { + bodyBytes, err := io.ReadAll(reqAndResp.request.Body) + reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + if err != nil { + return false + } + var v map[string]interface{} + if json.Unmarshal(bodyBytes, &v) == nil { + return true + } + } + + if reqAndResp.response.ContentLength <= maxContentLength { + bodyBytes, err := io.ReadAll(reqAndResp.response.Body) + reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes))) + if err != nil { + return false + } + var v map[string]interface{} + if json.Unmarshal(bodyBytes, &v) == nil { + return true + } + } + + return false +} diff --git a/src/is_json_test.go b/src/is_json_test.go new file mode 100644 index 0000000..43301d7 --- /dev/null +++ b/src/is_json_test.go @@ -0,0 +1,178 @@ +package main + +import ( + "io" + "net/http" + "strings" + "testing" +) + +func TestIsJson(t *testing.T) { + tests := []struct { + name string + reqContentType string + reqBody string + respContentType string + respBody string + maxContentLength int64 + expectedResult bool + }{ + { + name: "Valid JSON in both request and response with correct content types", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "XML in request and response with correct Content-Type", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/xml", + respBody: `value`, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "XML in request with JSON in response", + reqContentType: "application/xml", + reqBody: `value`, + respContentType: "application/json", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "JSON in request with XML in response", + reqContentType: "application/json", + reqBody: `{"key": "value"}`, + respContentType: "application/xml", + respBody: `value`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "Empty request and response bodies and headers", + reqContentType: "", + reqBody: "", + respContentType: "", + respBody: "", + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with valid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"}`, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with valid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with invalid JSON in request", + reqContentType: "", + reqBody: `{"key": "value"`, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with invalid JSON in response", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "value"`, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "Content-type geo+json in request with invalid body", + reqContentType: "application/geo+json", + reqBody: ``, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with request payload longer than max length", + reqContentType: "", + reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`, + respContentType: "", + respBody: ``, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with response payload longer than max length", + reqContentType: "", + reqBody: ``, + respContentType: "", + respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`, + maxContentLength: 1024, + expectedResult: false, + }, + { + name: "No content-type headers with request payload longer than max length and response payload shorter", + reqContentType: "", + reqBody: strings.Repeat("a", 1025), + respContentType: "", + respBody: `{"key": "value"}`, + maxContentLength: 1024, + expectedResult: true, + }, + { + name: "No content-type headers with request payload shorter than max length and response payload longer", + reqContentType: "", + reqBody: `{"key": "value"}`, + respContentType: "", + respBody: strings.Repeat("a", 1025), + maxContentLength: 1024, + expectedResult: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody)) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + if tt.reqContentType != "" { + req.Header.Set("Content-Type", tt.reqContentType) + } + + resp := &http.Response{ + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(tt.respBody)), + ContentLength: int64(len(tt.respBody)), + } + if tt.respContentType != "" { + resp.Header.Set("Content-Type", tt.respContentType) + } + + reqAndResp := httpRequestAndResponse{ + request: req, + response: resp, + } + + result := isJson(&reqAndResp, tt.maxContentLength) + if result != tt.expectedResult { + t.Errorf("isJson() = %v, want %v", result, tt.expectedResult) + } + }) + } +} diff --git a/src/main.go b/src/main.go index 420338a..594bb55 100644 --- a/src/main.go +++ b/src/main.go @@ -15,23 +15,26 @@ import ( ) func main() { + logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN") + if !logsApiTokenSet { + log.Fatal("FIRETAIL_API_TOKEN environment variable not set") + } + devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE")) if devEnabled { slog.Warn("🧰 Development mode enabled, setting log level to debug...") slog.SetLogLoggerLevel(slog.LevelDebug) } - logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN") - if !logsApiTokenSet { - log.Fatal("FIRETAIL_API_TOKEN environment variable not set") - } - - var ipManager *serviceIpManager - if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) { - slog.Info( - "Service IP filter enabled, monitoring service IPs...", - ) - ipManager = newServiceIpManager() + devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED")) + if err == nil && devServerEnabled { + slog.Warn("🧰 Development server enabled, starting example HTTP server...") + go func() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:]) + }) + log.Fatal(http.ListenAndServe(":80", nil)) + }() } bpfExpression, bpfExpressionSet := os.LookupEnv("BPF_EXPRESSION") @@ -43,15 +46,28 @@ func main() { bpfExpression = "tcp and (port 80 or port 443)" } - devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED")) - if err == nil && devServerEnabled { - slog.Warn("🧰 Development server enabled, starting example HTTP server...") - go func() { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:]) - }) - log.Fatal(http.ListenAndServe(":80", nil)) - }() + var ipManager *serviceIpManager + if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) { + slog.Info( + "Service IP filter enabled, monitoring service IPs...", + ) + ipManager = newServiceIpManager() + } + + var maxContentLength int64 + onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON")) + if onlyLogJson { + maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH") + if !maxContentLengthSet { + slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB") + maxContentLength = 1048576 // 1MiB + } else { + maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64) + if err != nil { + slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error()) + maxContentLength = 1048576 // 1MiB + } + } } requestAndResponseChannel := make(chan httpRequestAndResponse, 1) @@ -91,6 +107,16 @@ func main() { ) continue } + if onlyLogJson && !isJson(&requestAndResponse, maxContentLength) { + slog.Debug( + "Ignoring non-JSON request:", + "Src", requestAndResponse.src, + "Dst", requestAndResponse.dst, + "SrcPort", requestAndResponse.srcPort, + "DstPort", requestAndResponse.dstPort, + ) + continue + } slog.Debug( "Captured request and response:", "Method", requestAndResponse.request.Method, diff --git a/src/requestAndResponse.go b/src/request_and_response.go similarity index 100% rename from src/requestAndResponse.go rename to src/request_and_response.go diff --git a/src/serviceIpManager.go b/src/service_ip_manager.go similarity index 100% rename from src/serviceIpManager.go rename to src/service_ip_manager.go