Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a188b9e
renames
TheTeaCat May 9, 2025
bf56105
initial is_json implementation
TheTeaCat May 9, 2025
8af652f
use is_json in main
TheTeaCat May 9, 2025
7cdfb1e
add ENABLE_ONLY_LOG_JSON env var to dev target in makefile
TheTeaCat May 9, 2025
9363737
ignore packets not destined for or originating from a service IP
TheTeaCat May 9, 2025
dd0a10e
Doc `ENABLE_ONLY_LOG_JSON` env var
TheTeaCat May 9, 2025
d8bc4cf
check both request and response payloads and headers
TheTeaCat May 12, 2025
e8e9fda
Merge pull request #3 from FireTail-io/filter-packets
rileyfiretail May 12, 2025
692bf93
add test case with +json
TheTeaCat May 12, 2025
adc37a2
update isJson to check for +json suffix in content types
TheTeaCat May 12, 2025
c7fdf66
Merge branch 'dev' into is-json
TheTeaCat May 12, 2025
ea270ae
Implement & doc ONLY_LOG_JSON_MAX_CONTENT_LENGTH env var
TheTeaCat May 12, 2025
4f27e35
fix test cases with long json payloads to have valid json payloads
TheTeaCat May 12, 2025
68e9d3c
Merge pull request #4 from FireTail-io/is-json
rileyfiretail May 12, 2025
230e8c2
defer closing channels
TheTeaCat May 12, 2025
fa0f586
Recover from failures in req and resp readers
TheTeaCat May 12, 2025
03ea9d3
nonblocking reads for request and response channels
TheTeaCat May 12, 2025
4876cf5
Merge pull request #6 from FireTail-io/defer-close
rileyfiretail May 12, 2025
c5e2648
Improved logging on failure to capture request and/or response
TheTeaCat May 12, 2025
6e4da80
Merge pull request #7 from FireTail-io/improved-logs
TheTeaCat May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ dev: build-dev
-e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \
-e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \
-e DISABLE_SERVICE_IP_FILTERING=true \
-e ENABLE_ONLY_LOG_JSON=true \
firetail/kubernetes-sensor-dev
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor.
| `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
| `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
| `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |
Expand Down
63 changes: 57 additions & 6 deletions src/bidirectionalStream.go → src/bidirectional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"io"
"log/slog"
"net/http"
"sync"

Expand Down Expand Up @@ -53,13 +54,20 @@ func (s *bidirectionalStream) run() {

requestChannel := make(chan *http.Request, 1)
responseChannel := make(chan *http.Response, 1)
defer close(requestChannel)
defer close(responseChannel)

go func() {
defer func() {
if r := recover(); r != nil {
slog.Warn("Recovered from panic in clientToServer reader:", "Err", r)
}
wg.Done()
}()
reader := bufio.NewReader(&s.clientToServer)
for {
request, err := http.ReadRequest(reader)
if err == io.EOF {
wg.Done()
return
} else if err != nil {
continue
Expand All @@ -78,11 +86,16 @@ func (s *bidirectionalStream) run() {
}()

go func() {
defer func() {
if r := recover(); r != nil {
slog.Warn("Recovered from panic in serverToClient reader:", "Err", r)
}
wg.Done()
}()
reader := bufio.NewReader(&s.serverToClient)
for {
response, err := http.ReadResponse(reader, nil)
if err == io.ErrUnexpectedEOF {
wg.Done()
return
} else if err != nil {
continue
Expand All @@ -99,10 +112,48 @@ func (s *bidirectionalStream) run() {

wg.Wait()

capturedRequest := <-requestChannel
capturedResponse := <-responseChannel
close(requestChannel)
close(responseChannel)
var capturedRequest *http.Request
var capturedResponse *http.Response

select {
case capturedRequest = <-requestChannel:
default:
}

select {
case capturedResponse = <-responseChannel:
default:
}

if capturedRequest == nil && capturedResponse == nil {
slog.Debug(
"No request or response captured from stream",
"Src", s.net.Src().String(),
"Dst", s.net.Dst().String(),
"SrcPort", s.transport.Src().String(),
"DstPort", s.transport.Dst().String(),
)
} else if capturedRequest == nil {
slog.Warn(
"Captured response but no request from stream",
"Src", s.net.Src().String(),
"Dst", s.net.Dst().String(),
"SrcPort", s.transport.Src().String(),
"DstPort", s.transport.Dst().String(),
)
} else if capturedResponse == nil {
slog.Warn(
"Captured request but no response from stream",
"Src", s.net.Src().String(),
"Dst", s.net.Dst().String(),
"SrcPort", s.transport.Src().String(),
"DstPort", s.transport.Dst().String(),
)
}

if capturedRequest == nil || capturedResponse == nil {
return
}

*s.requestAndResponseChannel <- httpRequestAndResponse{
request: capturedRequest,
Expand Down
49 changes: 49 additions & 0 deletions src/is_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"bytes"
"encoding/json"
"io"
"mime"
"net/http"
"strings"
)

func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} {
contentTypeHeader := headers.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
if err == nil && mediaType == "application/json" {
return true
}
if strings.HasSuffix(mediaType, "+json") {
return true
}
}

if reqAndResp.request.ContentLength <= maxContentLength {
bodyBytes, err := io.ReadAll(reqAndResp.request.Body)
reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
if err != nil {
return false
}
var v map[string]interface{}
if json.Unmarshal(bodyBytes, &v) == nil {
return true
}
}

if reqAndResp.response.ContentLength <= maxContentLength {
bodyBytes, err := io.ReadAll(reqAndResp.response.Body)
reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
if err != nil {
return false
}
var v map[string]interface{}
if json.Unmarshal(bodyBytes, &v) == nil {
return true
}
}

return false
}
178 changes: 178 additions & 0 deletions src/is_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package main

import (
"io"
"net/http"
"strings"
"testing"
)

func TestIsJson(t *testing.T) {
tests := []struct {
name string
reqContentType string
reqBody string
respContentType string
respBody string
maxContentLength int64
expectedResult bool
}{
{
name: "Valid JSON in both request and response with correct content types",
reqContentType: "application/json",
reqBody: `{"key": "value"}`,
respContentType: "application/json",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "XML in request and response with correct Content-Type",
reqContentType: "application/xml",
reqBody: `<key>value</key>`,
respContentType: "application/xml",
respBody: `<key>value</key>`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "XML in request with JSON in response",
reqContentType: "application/xml",
reqBody: `<key>value</key>`,
respContentType: "application/json",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "JSON in request with XML in response",
reqContentType: "application/json",
reqBody: `{"key": "value"}`,
respContentType: "application/xml",
respBody: `<key>value</key>`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "Empty request and response bodies and headers",
reqContentType: "",
reqBody: "",
respContentType: "",
respBody: "",
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with valid JSON in request",
reqContentType: "",
reqBody: `{"key": "value"}`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with valid JSON in response",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with invalid JSON in request",
reqContentType: "",
reqBody: `{"key": "value"`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with invalid JSON in response",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "value"`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "Content-type geo+json in request with invalid body",
reqContentType: "application/geo+json",
reqBody: ``,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with request payload longer than max length",
reqContentType: "",
reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with response payload longer than max length",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with request payload longer than max length and response payload shorter",
reqContentType: "",
reqBody: strings.Repeat("a", 1025),
respContentType: "",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with request payload shorter than max length and response payload longer",
reqContentType: "",
reqBody: `{"key": "value"}`,
respContentType: "",
respBody: strings.Repeat("a", 1025),
maxContentLength: 1024,
expectedResult: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody))
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
if tt.reqContentType != "" {
req.Header.Set("Content-Type", tt.reqContentType)
}

resp := &http.Response{
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(tt.respBody)),
ContentLength: int64(len(tt.respBody)),
}
if tt.respContentType != "" {
resp.Header.Set("Content-Type", tt.respContentType)
}

reqAndResp := httpRequestAndResponse{
request: req,
response: resp,
}

result := isJson(&reqAndResp, tt.maxContentLength)
if result != tt.expectedResult {
t.Errorf("isJson() = %v, want %v", result, tt.expectedResult)
}
})
}
}
Loading