diff --git a/Makefile b/Makefile
index 32a4777..cd232dd 100644
--- a/Makefile
+++ b/Makefile
@@ -23,4 +23,5 @@ dev: build-dev
-e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \
-e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \
-e DISABLE_SERVICE_IP_FILTERING=true \
+ -e ENABLE_ONLY_LOG_JSON=true \
firetail/kubernetes-sensor-dev
diff --git a/README.md b/README.md
index 4b89cc4..9aada96 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor.
| `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
| `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
+| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
+| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
| `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |
diff --git a/src/bidirectionalStream.go b/src/bidirectional_stream.go
similarity index 67%
rename from src/bidirectionalStream.go
rename to src/bidirectional_stream.go
index e75cf14..054daa7 100644
--- a/src/bidirectionalStream.go
+++ b/src/bidirectional_stream.go
@@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"io"
+ "log/slog"
"net/http"
"sync"
@@ -53,13 +54,20 @@ func (s *bidirectionalStream) run() {
requestChannel := make(chan *http.Request, 1)
responseChannel := make(chan *http.Response, 1)
+ defer close(requestChannel)
+ defer close(responseChannel)
go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ slog.Warn("Recovered from panic in clientToServer reader:", "Err", r)
+ }
+ wg.Done()
+ }()
reader := bufio.NewReader(&s.clientToServer)
for {
request, err := http.ReadRequest(reader)
if err == io.EOF {
- wg.Done()
return
} else if err != nil {
continue
@@ -78,11 +86,16 @@ func (s *bidirectionalStream) run() {
}()
go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ slog.Warn("Recovered from panic in serverToClient reader:", "Err", r)
+ }
+ wg.Done()
+ }()
reader := bufio.NewReader(&s.serverToClient)
for {
response, err := http.ReadResponse(reader, nil)
if err == io.ErrUnexpectedEOF {
- wg.Done()
return
} else if err != nil {
continue
@@ -99,10 +112,48 @@ func (s *bidirectionalStream) run() {
wg.Wait()
- capturedRequest := <-requestChannel
- capturedResponse := <-responseChannel
- close(requestChannel)
- close(responseChannel)
+ var capturedRequest *http.Request
+ var capturedResponse *http.Response
+
+ select {
+ case capturedRequest = <-requestChannel:
+ default:
+ }
+
+ select {
+ case capturedResponse = <-responseChannel:
+ default:
+ }
+
+ if capturedRequest == nil && capturedResponse == nil {
+ slog.Debug(
+ "No request or response captured from stream",
+ "Src", s.net.Src().String(),
+ "Dst", s.net.Dst().String(),
+ "SrcPort", s.transport.Src().String(),
+ "DstPort", s.transport.Dst().String(),
+ )
+ } else if capturedRequest == nil {
+ slog.Warn(
+ "Captured response but no request from stream",
+ "Src", s.net.Src().String(),
+ "Dst", s.net.Dst().String(),
+ "SrcPort", s.transport.Src().String(),
+ "DstPort", s.transport.Dst().String(),
+ )
+ } else if capturedResponse == nil {
+ slog.Warn(
+ "Captured request but no response from stream",
+ "Src", s.net.Src().String(),
+ "Dst", s.net.Dst().String(),
+ "SrcPort", s.transport.Src().String(),
+ "DstPort", s.transport.Dst().String(),
+ )
+ }
+
+ if capturedRequest == nil || capturedResponse == nil {
+ return
+ }
*s.requestAndResponseChannel <- httpRequestAndResponse{
request: capturedRequest,
diff --git a/src/is_json.go b/src/is_json.go
new file mode 100644
index 0000000..8227540
--- /dev/null
+++ b/src/is_json.go
@@ -0,0 +1,49 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "mime"
+ "net/http"
+ "strings"
+)
+
+func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
+ for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} {
+ contentTypeHeader := headers.Get("Content-Type")
+ mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
+ if err == nil && mediaType == "application/json" {
+ return true
+ }
+ if strings.HasSuffix(mediaType, "+json") {
+ return true
+ }
+ }
+
+ if reqAndResp.request.ContentLength <= maxContentLength {
+ bodyBytes, err := io.ReadAll(reqAndResp.request.Body)
+ reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
+ if err != nil {
+ return false
+ }
+ var v map[string]interface{}
+ if json.Unmarshal(bodyBytes, &v) == nil {
+ return true
+ }
+ }
+
+ if reqAndResp.response.ContentLength <= maxContentLength {
+ bodyBytes, err := io.ReadAll(reqAndResp.response.Body)
+ reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
+ if err != nil {
+ return false
+ }
+ var v map[string]interface{}
+ if json.Unmarshal(bodyBytes, &v) == nil {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/src/is_json_test.go b/src/is_json_test.go
new file mode 100644
index 0000000..43301d7
--- /dev/null
+++ b/src/is_json_test.go
@@ -0,0 +1,178 @@
+package main
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+)
+
+func TestIsJson(t *testing.T) {
+ tests := []struct {
+ name string
+ reqContentType string
+ reqBody string
+ respContentType string
+ respBody string
+ maxContentLength int64
+ expectedResult bool
+ }{
+ {
+ name: "Valid JSON in both request and response with correct content types",
+ reqContentType: "application/json",
+ reqBody: `{"key": "value"}`,
+ respContentType: "application/json",
+ respBody: `{"key": "value"}`,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "XML in request and response with correct Content-Type",
+ reqContentType: "application/xml",
+ reqBody: `value`,
+ respContentType: "application/xml",
+ respBody: `value`,
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "XML in request with JSON in response",
+ reqContentType: "application/xml",
+ reqBody: `value`,
+ respContentType: "application/json",
+ respBody: `{"key": "value"}`,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "JSON in request with XML in response",
+ reqContentType: "application/json",
+ reqBody: `{"key": "value"}`,
+ respContentType: "application/xml",
+ respBody: `value`,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "Empty request and response bodies and headers",
+ reqContentType: "",
+ reqBody: "",
+ respContentType: "",
+ respBody: "",
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "No content-type headers with valid JSON in request",
+ reqContentType: "",
+ reqBody: `{"key": "value"}`,
+ respContentType: "",
+ respBody: ``,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "No content-type headers with valid JSON in response",
+ reqContentType: "",
+ reqBody: ``,
+ respContentType: "",
+ respBody: `{"key": "value"}`,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "No content-type headers with invalid JSON in request",
+ reqContentType: "",
+ reqBody: `{"key": "value"`,
+ respContentType: "",
+ respBody: ``,
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "No content-type headers with invalid JSON in response",
+ reqContentType: "",
+ reqBody: ``,
+ respContentType: "",
+ respBody: `{"key": "value"`,
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "Content-type geo+json in request with invalid body",
+ reqContentType: "application/geo+json",
+ reqBody: ``,
+ respContentType: "",
+ respBody: ``,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "No content-type headers with request payload longer than max length",
+ reqContentType: "",
+ reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
+ respContentType: "",
+ respBody: ``,
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "No content-type headers with response payload longer than max length",
+ reqContentType: "",
+ reqBody: ``,
+ respContentType: "",
+ respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
+ maxContentLength: 1024,
+ expectedResult: false,
+ },
+ {
+ name: "No content-type headers with request payload longer than max length and response payload shorter",
+ reqContentType: "",
+ reqBody: strings.Repeat("a", 1025),
+ respContentType: "",
+ respBody: `{"key": "value"}`,
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ {
+ name: "No content-type headers with request payload shorter than max length and response payload longer",
+ reqContentType: "",
+ reqBody: `{"key": "value"}`,
+ respContentType: "",
+ respBody: strings.Repeat("a", 1025),
+ maxContentLength: 1024,
+ expectedResult: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody))
+ if err != nil {
+ t.Fatalf("Failed to create request: %v", err)
+ }
+ if tt.reqContentType != "" {
+ req.Header.Set("Content-Type", tt.reqContentType)
+ }
+
+ resp := &http.Response{
+ Header: make(http.Header),
+ Body: io.NopCloser(strings.NewReader(tt.respBody)),
+ ContentLength: int64(len(tt.respBody)),
+ }
+ if tt.respContentType != "" {
+ resp.Header.Set("Content-Type", tt.respContentType)
+ }
+
+ reqAndResp := httpRequestAndResponse{
+ request: req,
+ response: resp,
+ }
+
+ result := isJson(&reqAndResp, tt.maxContentLength)
+ if result != tt.expectedResult {
+ t.Errorf("isJson() = %v, want %v", result, tt.expectedResult)
+ }
+ })
+ }
+}
diff --git a/src/main.go b/src/main.go
index 420338a..594bb55 100644
--- a/src/main.go
+++ b/src/main.go
@@ -15,23 +15,26 @@ import (
)
func main() {
+ logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
+ if !logsApiTokenSet {
+ log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
+ }
+
devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE"))
if devEnabled {
slog.Warn("🧰 Development mode enabled, setting log level to debug...")
slog.SetLogLoggerLevel(slog.LevelDebug)
}
- logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
- if !logsApiTokenSet {
- log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
- }
-
- var ipManager *serviceIpManager
- if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
- slog.Info(
- "Service IP filter enabled, monitoring service IPs...",
- )
- ipManager = newServiceIpManager()
+ devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
+ if err == nil && devServerEnabled {
+ slog.Warn("🧰 Development server enabled, starting example HTTP server...")
+ go func() {
+ http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
+ })
+ log.Fatal(http.ListenAndServe(":80", nil))
+ }()
}
bpfExpression, bpfExpressionSet := os.LookupEnv("BPF_EXPRESSION")
@@ -43,15 +46,28 @@ func main() {
bpfExpression = "tcp and (port 80 or port 443)"
}
- devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
- if err == nil && devServerEnabled {
- slog.Warn("🧰 Development server enabled, starting example HTTP server...")
- go func() {
- http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
- fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
- })
- log.Fatal(http.ListenAndServe(":80", nil))
- }()
+ var ipManager *serviceIpManager
+ if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
+ slog.Info(
+ "Service IP filter enabled, monitoring service IPs...",
+ )
+ ipManager = newServiceIpManager()
+ }
+
+ var maxContentLength int64
+ onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
+ if onlyLogJson {
+ maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH")
+ if !maxContentLengthSet {
+ slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
+ maxContentLength = 1048576 // 1MiB
+ } else {
+ maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64)
+ if err != nil {
+ slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
+ maxContentLength = 1048576 // 1MiB
+ }
+ }
}
requestAndResponseChannel := make(chan httpRequestAndResponse, 1)
@@ -91,6 +107,16 @@ func main() {
)
continue
}
+ if onlyLogJson && !isJson(&requestAndResponse, maxContentLength) {
+ slog.Debug(
+ "Ignoring non-JSON request:",
+ "Src", requestAndResponse.src,
+ "Dst", requestAndResponse.dst,
+ "SrcPort", requestAndResponse.srcPort,
+ "DstPort", requestAndResponse.dstPort,
+ )
+ continue
+ }
slog.Debug(
"Captured request and response:",
"Method", requestAndResponse.request.Method,
diff --git a/src/requestAndResponse.go b/src/request_and_response.go
similarity index 77%
rename from src/requestAndResponse.go
rename to src/request_and_response.go
index 186a182..28384ce 100644
--- a/src/requestAndResponse.go
+++ b/src/request_and_response.go
@@ -2,6 +2,7 @@ package main
import (
"log"
+ "log/slog"
"net/http"
"time"
@@ -58,6 +59,22 @@ func (s *httpRequestAndResponseStreamer) start() {
if !ok {
continue
}
+ net, ok := packet.NetworkLayer().(*layers.IPv4)
+ if !ok {
+ continue
+ }
+ src := net.SrcIP.String()
+ dst := net.DstIP.String()
+ if s.ipManager != nil && !(s.ipManager.isServiceIP(dst) || s.ipManager.isServiceIP(src)) {
+ slog.Debug(
+ "Ignoring packet not destined for or originating from a service IP:",
+ "Src", src,
+ "Dst", dst,
+ "SrcPort", tcp.SrcPort.String(),
+ "DstPort", tcp.DstPort.String(),
+ )
+ continue
+ }
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-ticker:
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
diff --git a/src/serviceIpManager.go b/src/service_ip_manager.go
similarity index 100%
rename from src/serviceIpManager.go
rename to src/service_ip_manager.go