Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ POC for a FireTail Kubernetes Sensor.
| ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX` | The API token the sensor will use to report logs to FireTail |
| `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
| `MAX_CONTENT_LENGTH` | | `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. |
| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
| `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` | ❌ | `1048576` | When `ENABLE_ONLY_LOG_JSON` is `true`, the sensor will only read request or response bodies to check if they're valid JSON if their length is less than `ONLY_LOG_JSON_MAX_CONTENT_LENGTH` bytes. |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
| `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_MODE` | ❌ | `true` | Enables debug logging when set to `true`, and reduces the max age of a log in a batch to be sent to FireTail. |
| `FIRETAIL_KUBERNETES_SENSOR_DEV_SERVER_ENABLED` | ❌ | `true` | Enables a demo web server when set to `true`; useful for sending test requests to. |
Expand Down
3 changes: 2 additions & 1 deletion build_setup/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
FROM golang:1.24-bullseye
WORKDIR /src
RUN apt-get update && apt-get install -y --no-install-recommends libpcap-dev
COPY ./src/ ./
COPY ./src/go.* ./
RUN go mod download
COPY ./src/ ./
RUN go build -o /dist/main .
RUN rm -rf /src/*
RUN chmod +x /dist/main
Expand Down
51 changes: 33 additions & 18 deletions src/bidirectional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ import (
)

type bidirectionalStreamFactory struct {
conns map[string]*bidirectionalStream
conns *sync.Map
requestAndResponseChannel *chan httpRequestAndResponse
maxBodySize int64
}

func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
key := netFlow.FastHash() ^ tcpFlow.FastHash()

// The second time we see the same connection, it will be from the server to the client
if conn, ok := f.conns[fmt.Sprint(key)]; ok {
return &conn.serverToClient
if conn, ok := f.conns.Load(fmt.Sprint(key)); ok {
return &conn.(*bidirectionalStream).serverToClient
}

s := &bidirectionalStream{
Expand All @@ -33,8 +34,12 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse
clientToServer: tcpreader.NewReaderStream(),
serverToClient: tcpreader.NewReaderStream(),
requestAndResponseChannel: f.requestAndResponseChannel,
closeCallback: func() {
f.conns.Delete(fmt.Sprint(key))
},
maxBodySize: f.maxBodySize,
}
f.conns[fmt.Sprint(key)] = s
f.conns.Store(fmt.Sprint(key), s)
go s.run()

// The first time we see the connection, it will be from the client to the server
Expand All @@ -46,9 +51,13 @@ type bidirectionalStream struct {
clientToServer tcpreader.ReaderStream
serverToClient tcpreader.ReaderStream
requestAndResponseChannel *chan httpRequestAndResponse
closeCallback func()
maxBodySize int64
}

func (s *bidirectionalStream) run() {
defer s.closeCallback()

wg := sync.WaitGroup{}
wg.Add(2)

Expand All @@ -58,11 +67,12 @@ func (s *bidirectionalStream) run() {
defer close(responseChannel)

go func() {
defer wg.Done()
defer s.clientToServer.Close()
defer func() {
if r := recover(); r != nil {
slog.Warn("Recovered from panic in clientToServer reader:", "Err", r)
slog.Error("Recovered from panic in clientToServer reader:", "Err", r)
}
wg.Done()
}()
reader := bufio.NewReader(&s.clientToServer)
for {
Expand All @@ -74,23 +84,26 @@ func (s *bidirectionalStream) run() {
}
// RemoteAddr is not filled in by ReadRequest so we have to populate it ourselves
request.RemoteAddr = fmt.Sprintf("%s:%s", s.net.Src().String(), s.transport.Src().String())
responseBody := make([]byte, request.ContentLength)
if request.ContentLength > 0 {
io.ReadFull(request.Body, responseBody)
if request.ContentLength > 0 && request.ContentLength < s.maxBodySize {
responseBody := make([]byte, request.ContentLength)
if request.ContentLength > 0 {
io.ReadFull(request.Body, responseBody)
}
request.Body.Close()
request.Body = io.NopCloser(bytes.NewReader(responseBody))
}
request.Body.Close()
request.Body = io.NopCloser(bytes.NewReader(responseBody))
requestChannel <- request

}
}()

go func() {
defer wg.Done()
defer s.serverToClient.Close()
defer func() {
if r := recover(); r != nil {
slog.Warn("Recovered from panic in serverToClient reader:", "Err", r)
slog.Error("Recovered from panic in serverToClient reader:", "Err", r)
}
wg.Done()
}()
reader := bufio.NewReader(&s.serverToClient)
for {
Expand All @@ -100,12 +113,14 @@ func (s *bidirectionalStream) run() {
} else if err != nil {
continue
}
responseBody := make([]byte, response.ContentLength)
if response.ContentLength > 0 {
io.ReadFull(response.Body, responseBody)
if response.ContentLength > 0 && response.ContentLength < s.maxBodySize {
responseBody := make([]byte, response.ContentLength)
if response.ContentLength > 0 {
io.ReadFull(response.Body, responseBody)
}
response.Body.Close()
response.Body = io.NopCloser(bytes.NewReader(responseBody))
}
response.Body.Close()
response.Body = io.NopCloser(bytes.NewReader(responseBody))
responseChannel <- response
}
}()
Expand Down
23 changes: 10 additions & 13 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,23 @@ func main() {
}

var maxContentLength int64
onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))
if onlyLogJson {
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("ONLY_LOG_JSON_MAX_CONTENT_LENGTH")
if !maxContentLengthSet {
slog.Info("ONLY_LOG_JSON_MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
maxContentLength = 1048576 // 1MiB
} else {
maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64)
if err != nil {
slog.Error("Failed to parse ONLY_LOG_JSON_MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
maxContentLength = 1048576 // 1MiB
}
}
maxContentLengthStr, maxContentLengthSet := os.LookupEnv("MAX_CONTENT_LENGTH")
if !maxContentLengthSet {
slog.Info("MAX_CONTENT_LENGTH environment variable not set, using default: 1MiB")
maxContentLength = 1048576 // 1MiB
} else if maxContentLength, err = strconv.ParseInt(maxContentLengthStr, 10, 64); err != nil {
slog.Error("Failed to parse MAX_CONTENT_LENGTH, Defaulting to 1MiB.", "Err", err.Error())
maxContentLength = 1048576 // 1MiB
}

onlyLogJson, _ := strconv.ParseBool(os.Getenv("ENABLE_ONLY_LOG_JSON"))

requestAndResponseChannel := make(chan httpRequestAndResponse, 1)
httpRequestStreamer := &httpRequestAndResponseStreamer{
bpfExpression: bpfExpression,
requestAndResponseChannel: &requestAndResponseChannel,
ipManager: ipManager,
maxBodySize: maxContentLength,
}
go httpRequestStreamer.start()

Expand Down
5 changes: 4 additions & 1 deletion src/request_and_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"log/slog"
"net/http"
"sync"
"time"

"github.com/google/gopacket"
Expand All @@ -25,6 +26,7 @@ type httpRequestAndResponseStreamer struct {
bpfExpression string
requestAndResponseChannel *chan httpRequestAndResponse
ipManager *serviceIpManager
maxBodySize int64
}

func (s *httpRequestAndResponseStreamer) start() {
Expand All @@ -42,8 +44,9 @@ func (s *httpRequestAndResponseStreamer) start() {
assembler := tcpassembly.NewAssembler(
tcpassembly.NewStreamPool(
&bidirectionalStreamFactory{
conns: make(map[string]*bidirectionalStream),
conns: &sync.Map{},
requestAndResponseChannel: s.requestAndResponseChannel,
maxBodySize: s.maxBodySize,
},
),
)
Expand Down