Skip to content
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ dev: build-dev
-e FIRETAIL_KUBERNETES_SENSOR_DEV_MODE=true \
-e FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED=true \
-e DISABLE_SERVICE_IP_FILTERING=true \
-e ENABLE_ONLY_LOG_JSON=true \
firetail/kubernetes-sensor-dev
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ POC for a FireTail Kubernetes Sensor.
| `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
| `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
| `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |
Expand Down
File renamed without changes.
49 changes: 49 additions & 0 deletions src/is_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"bytes"
"encoding/json"
"io"
"mime"
"net/http"
"strings"
)

func isJson(reqAndResp *httpRequestAndResponse, maxContentLength int64) bool {
for _, headers := range []http.Header{reqAndResp.request.Header, reqAndResp.response.Header} {
contentTypeHeader := headers.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
if err == nil && mediaType == "application/json" {
return true
}
if strings.HasSuffix(mediaType, "+json") {
return true
}
}

if reqAndResp.request.ContentLength <= maxContentLength {
bodyBytes, err := io.ReadAll(reqAndResp.request.Body)
reqAndResp.request.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
if err != nil {
return false
}
var v map[string]interface{}
if json.Unmarshal(bodyBytes, &v) == nil {
return true
}
}

if reqAndResp.response.ContentLength <= maxContentLength {
bodyBytes, err := io.ReadAll(reqAndResp.response.Body)
reqAndResp.response.Body = io.NopCloser(io.MultiReader(bytes.NewReader(bodyBytes)))
if err != nil {
return false
}
var v map[string]interface{}
if json.Unmarshal(bodyBytes, &v) == nil {
return true
}
}

return false
}
178 changes: 178 additions & 0 deletions src/is_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package main

import (
"io"
"net/http"
"strings"
"testing"
)

func TestIsJson(t *testing.T) {
tests := []struct {
name string
reqContentType string
reqBody string
respContentType string
respBody string
maxContentLength int64
expectedResult bool
}{
{
name: "Valid JSON in both request and response with correct content types",
reqContentType: "application/json",
reqBody: `{"key": "value"}`,
respContentType: "application/json",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "XML in request and response with correct Content-Type",
reqContentType: "application/xml",
reqBody: `<key>value</key>`,
respContentType: "application/xml",
respBody: `<key>value</key>`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "XML in request with JSON in response",
reqContentType: "application/xml",
reqBody: `<key>value</key>`,
respContentType: "application/json",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "JSON in request with XML in response",
reqContentType: "application/json",
reqBody: `{"key": "value"}`,
respContentType: "application/xml",
respBody: `<key>value</key>`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "Empty request and response bodies and headers",
reqContentType: "",
reqBody: "",
respContentType: "",
respBody: "",
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with valid JSON in request",
reqContentType: "",
reqBody: `{"key": "value"}`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with valid JSON in response",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with invalid JSON in request",
reqContentType: "",
reqBody: `{"key": "value"`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with invalid JSON in response",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "value"`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "Content-type geo+json in request with invalid body",
reqContentType: "application/geo+json",
reqBody: ``,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with request payload longer than max length",
reqContentType: "",
reqBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
respContentType: "",
respBody: ``,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with response payload longer than max length",
reqContentType: "",
reqBody: ``,
respContentType: "",
respBody: `{"key": "` + strings.Repeat("a", 1025) + `"}`,
maxContentLength: 1024,
expectedResult: false,
},
{
name: "No content-type headers with request payload longer than max length and response payload shorter",
reqContentType: "",
reqBody: strings.Repeat("a", 1025),
respContentType: "",
respBody: `{"key": "value"}`,
maxContentLength: 1024,
expectedResult: true,
},
{
name: "No content-type headers with request payload shorter than max length and response payload longer",
reqContentType: "",
reqBody: `{"key": "value"}`,
respContentType: "",
respBody: strings.Repeat("a", 1025),
maxContentLength: 1024,
expectedResult: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest("POST", "/", strings.NewReader(tt.reqBody))
if err != nil {
t.Fatalf("Failed to create request: %v", err)
}
if tt.reqContentType != "" {
req.Header.Set("Content-Type", tt.reqContentType)
}

resp := &http.Response{
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(tt.respBody)),
ContentLength: int64(len(tt.respBody)),
}
if tt.respContentType != "" {
resp.Header.Set("Content-Type", tt.respContentType)
}

reqAndResp := httpRequestAndResponse{
request: req,
response: resp,
}

result := isJson(&reqAndResp, tt.maxContentLength)
if result != tt.expectedResult {
t.Errorf("isJson() = %v, want %v", result, tt.expectedResult)
}
})
}
}
66 changes: 46 additions & 20 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,26 @@ import (
)

func main() {
logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
if !logsApiTokenSet {
log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
}

devEnabled, _ := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_MODE"))
if devEnabled {
slog.Warn("🧰 Development mode enabled, setting log level to debug...")
slog.SetLogLoggerLevel(slog.LevelDebug)
}

logsApiToken, logsApiTokenSet := os.LookupEnv("FIRETAIL_API_TOKEN")
if !logsApiTokenSet {
log.Fatal("FIRETAIL_API_TOKEN environment variable not set")
}

var ipManager *serviceIpManager
if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
slog.Info(
"Service IP filter enabled, monitoring service IPs...",
)
ipManager = newServiceIpManager()
devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
if err == nil && devServerEnabled {
slog.Warn("🧰 Development server enabled, starting example HTTP server...")
go func() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
})
log.Fatal(http.ListenAndServe(":80", nil))
}()
}

bpfExpression, bpfExpressionSet := os.LookupEnv("BPF_EXPRESSION")
Expand All @@ -43,15 +46,28 @@ func main() {
bpfExpression = "tcp and (port 80 or port 443)"
}

devServerEnabled, err := strconv.ParseBool(os.Getenv("FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED"))
if err == nil && devServerEnabled {
slog.Warn("🧰 Development server enabled, starting example HTTP server...")
go func() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
})
log.Fatal(http.ListenAndServe(":80", nil))
}()
var ipManager *serviceIpManager
if disableServiceIpFilter, err := strconv.ParseBool(os.Getenv("DISABLE_SERVICE_IP_FILTERING")); !(err == nil && disableServiceIpFilter) {
slog.Info(
"Service IP filter enabled, monitoring service IPs...",
)
ipManager = newServiceIpManager()
}

var maxContentLength int64
onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
if onlyLogJson {
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH")
if !maxContentLengthSet {
slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
maxContentLength = 1048576 // 1MiB
} else {
maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64)
if err != nil {
slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
maxContentLength = 1048576 // 1MiB
}
}
}

requestAndResponseChannel := make(chan httpRequestAndResponse, 1)
Expand Down Expand Up @@ -91,6 +107,16 @@ func main() {
)
continue
}
if onlyLogJson && !isJson(&requestAndResponse, maxContentLength) {
slog.Debug(
"Ignoring non-JSON request:",
"Src", requestAndResponse.src,
"Dst", requestAndResponse.dst,
"SrcPort", requestAndResponse.srcPort,
"DstPort", requestAndResponse.dstPort,
)
continue
}
slog.Debug(
"Captured request and response:",
"Method", requestAndResponse.request.Method,
Expand Down
File renamed without changes.
File renamed without changes.