diff --git a/README.md b/README.md index 9aada96..8c28c13 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,9 @@ POC for a FireTail Kubernetes Sensor. | ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail | | `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html | -| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. | +| `MAX_CONTENT_LENGTH` | ❌ | `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. | | `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. | -| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. | +| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. | | `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. | | `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. | diff --git a/build_setup/Dockerfile b/build_setup/Dockerfile index b877994..4dd819a 100644 --- a/build_setup/Dockerfile +++ b/build_setup/Dockerfile @@ -1,8 +1,9 @@ FROM golang:1.24-bullseye WORKDIR /src RUN apt-get update && apt-get install -y --no-install-recommends libpcap-dev -COPY ./src/ ./ +COPY ./src/go.* ./ RUN go mod download +COPY ./src/ ./ RUN go build -o /dist/main . RUN rm -rf /src/* RUN chmod +x /dist/main diff --git a/src/bidirectional_stream.go b/src/bidirectional_stream.go index 054daa7..84555c9 100644 --- a/src/bidirectional_stream.go +++ b/src/bidirectional_stream.go @@ -15,16 +15,17 @@ import ( ) type bidirectionalStreamFactory struct { - conns map[string]*bidirectionalStream + conns *sync.Map requestAndResponseChannel *chan httpRequestAndResponse + maxBodySize int64 } func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { key := netFlow.FastHash() ^ tcpFlow.FastHash() // The second time we see the same connection, it will be from the server to the client - if conn, ok := f.conns[fmt.Sprint(key)]; ok { - return &conn.serverToClient + if conn, ok := f.conns.Load(fmt.Sprint(key)); ok { + return &conn.(*bidirectionalStream).serverToClient } s := &bidirectionalStream{ @@ -33,8 +34,12 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse clientToServer: tcpreader.NewReaderStream(), serverToClient: tcpreader.NewReaderStream(), requestAndResponseChannel: f.requestAndResponseChannel, + closeCallback: func() { + f.conns.Delete(fmt.Sprint(key)) + }, + maxBodySize: f.maxBodySize, } - f.conns[fmt.Sprint(key)] = s + f.conns.Store(fmt.Sprint(key), s) go s.run() // The first time we see the connection, it will be from the client to the server @@ -46,9 +51,13 @@ type bidirectionalStream struct { clientToServer tcpreader.ReaderStream serverToClient tcpreader.ReaderStream requestAndResponseChannel *chan httpRequestAndResponse + closeCallback func() + maxBodySize int64 } func (s *bidirectionalStream) run() { + defer s.closeCallback() + wg := sync.WaitGroup{} wg.Add(2) @@ -58,11 +67,12 @@ func (s *bidirectionalStream) run() { defer close(responseChannel) go func() { + defer wg.Done() + defer s.clientToServer.Close() defer func() { if r := recover(); r != nil { - slog.Warn("Recovered from panic in clientToServer reader:", "Err", r) + slog.Error("Recovered from panic in clientToServer reader:", "Err", r) } - wg.Done() }() reader := bufio.NewReader(&s.clientToServer) for { @@ -74,23 +84,26 @@ func (s *bidirectionalStream) run() { } // RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String()) - responseBody := make([]byte, request.ContentLength) - if request.ContentLength > 0 { - io.ReadFull(request.Body, responseBody) + if request.ContentLength > 0 && request.ContentLength < s.maxBodySize { + responseBody := make([]byte, request.ContentLength) + if request.ContentLength > 0 { + io.ReadFull(request.Body, responseBody) + } + request.Body.Close() + request.Body = io.NopCloser(bytes.NewReader(responseBody)) } - request.Body.Close() - request.Body = io.NopCloser(bytes.NewReader(responseBody)) requestChannel <- request } }() go func() { + defer wg.Done() + defer s.serverToClient.Close() defer func() { if r := recover(); r != nil { - slog.Warn("Recovered from panic in serverToClient reader:", "Err", r) + slog.Error("Recovered from panic in serverToClient reader:", "Err", r) } - wg.Done() }() reader := bufio.NewReader(&s.serverToClient) for { @@ -100,12 +113,14 @@ func (s *bidirectionalStream) run() { } else if err != nil { continue } - responseBody := make([]byte, response.ContentLength) - if response.ContentLength > 0 { - io.ReadFull(response.Body, responseBody) + if response.ContentLength > 0 && response.ContentLength < s.maxBodySize { + responseBody := make([]byte, response.ContentLength) + if response.ContentLength > 0 { + io.ReadFull(response.Body, responseBody) + } + response.Body.Close() + response.Body = io.NopCloser(bytes.NewReader(responseBody)) } - response.Body.Close() - response.Body = io.NopCloser(bytes.NewReader(responseBody)) responseChannel <- response } }() diff --git a/src/main.go b/src/main.go index 594bb55..940d71f 100644 --- a/src/main.go +++ b/src/main.go @@ -55,26 +55,23 @@ func main() { } var maxContentLength int64 - onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON")) - if onlyLogJson { - maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH") - if !maxContentLengthSet { - slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB") - maxContentLength = 1048576 // 1MiB - } else { - maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64) - if err != nil { - slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error()) - maxContentLength = 1048576 // 1MiB - } - } + maxContentLengthStr, maxContentLengthSet := os.LookupEnv("MAX_CONTENT_LENGTH") + if !maxContentLengthSet { + slog.Info("MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB") + maxContentLength = 1048576 // 1MiB + } else if maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64); err != nil { + slog.Error("Failed to parse MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error()) + maxContentLength = 1048576 // 1MiB } + onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON")) + requestAndResponseChannel := make(chan httpRequestAndResponse, 1) httpRequestStreamer := &httpRequestAndResponseStreamer{ bpfExpression: bpfExpression, requestAndResponseChannel: &requestAndResponseChannel, ipManager: ipManager, + maxBodySize: maxContentLength, } go httpRequestStreamer.start() diff --git a/src/request_and_response.go b/src/request_and_response.go index 28384ce..d88ab90 100644 --- a/src/request_and_response.go +++ b/src/request_and_response.go @@ -4,6 +4,7 @@ import ( "log" "log/slog" "net/http" + "sync" "time" "github.com/google/gopacket" @@ -25,6 +26,7 @@ type httpRequestAndResponseStreamer struct { bpfExpression string requestAndResponseChannel *chan httpRequestAndResponse ipManager *serviceIpManager + maxBodySize int64 } func (s *httpRequestAndResponseStreamer) start() { @@ -42,8 +44,9 @@ func (s *httpRequestAndResponseStreamer) start() { assembler := tcpassembly.NewAssembler( tcpassembly.NewStreamPool( &bidirectionalStreamFactory{ - conns: make(map[string]*bidirectionalStream), + conns: &sync.Map{}, requestAndResponseChannel: s.requestAndResponseChannel, + maxBodySize: s.maxBodySize, }, ), )