Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
| ----------------------------------------------- | --------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| `FIRETAIL_API_TOKEN` | ✅ | `PS-02-XXXXXXXX` | The API token the sensor will use to report logs to FireTail |
| `BPF_EXPRESSION` | ❌ | `tcp and (port 80 or port 443)` | The BPF filter used by the sensor. See docs for syntax info: https://www.tcpdump.org/manpages/pcap-filter.7.html |
| `MAX_CONTENT_LENGTH` | ❌ | `1048576` | The sensor will only read request or response bodies if their length is less than `MAX_CONTENT_LENGTH` bytes. |
| `MAX_CONTENT_LENGTH` | ❌ | `1048576` | The sensor will only read requests or responses if their length is less than `MAX_CONTENT_LENGTH` bytes. |
| `ENABLE_ONLY_LOG_JSON` | ❌ | `true` | Enables only logging requests where the content-type implies the payload should be JSON, or the payload is valid JSON regardless of the content-type. |
| `DISABLE_SERVICE_IP_FILTERING` | ❌ | `true` | Disables polling Kubernetes for the IP addresses of services & subsequently ignoring all requests captured that aren't made to one of those IPs. |
| `FIRETAIL_API_URL` | ❌ | `https://api.logging.eu-west-1.prod.firetail.app/logs/bulk` | The API url the sensor will send logs to. Defaults to the EU region production environment. |
Expand Down
54 changes: 45 additions & 9 deletions src/bidirectional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/tcpassembly"
"github.com/google/gopacket/tcpassembly/tcpreader"
"golang.org/x/sync/semaphore"
)

type bidirectionalStreamFactory struct {
Expand All @@ -25,8 +28,22 @@ func (f *bidirectionalStreamFactory) New(netFlow, tcpFlow gopacket.Flow) tcpasse

// The second time we see the same connection, it will be from the server to the client
if conn, ok := f.conns.LoadAndDelete(fmt.Sprint(key)); ok {
slog.Debug(
"Found existing connection, assuming this is a server to client connection",
"Src", netFlow.Src().String(),
"Dst", netFlow.Dst().String(),
"SrcPort", tcpFlow.Src().String(),
"DstPort", tcpFlow.Dst().String(),
)
return &conn.(*bidirectionalStream).serverToClient
}
slog.Debug(
"Found new connection, assuming this is a client to server connection",
"Src", netFlow.Src().String(),
"Dst", netFlow.Dst().String(),
"SrcPort", tcpFlow.Src().String(),
"DstPort", tcpFlow.Dst().String(),
)

s := &bidirectionalStream{
net: netFlow,
Expand Down Expand Up @@ -57,17 +74,22 @@ type bidirectionalStream struct {

func (s *bidirectionalStream) run() {
defer s.closeCallback()
defer s.clientToServer.Close()
defer s.serverToClient.Close()

wg := sync.WaitGroup{}
wg.Add(2)
sem := semaphore.NewWeighted(2)

requestChannel := make(chan *http.Request, 1)
responseChannel := make(chan *http.Response, 1)
defer close(requestChannel)
defer close(responseChannel)

err := sem.Acquire(context.Background(), 1)
if err != nil {
slog.Error("Failed to acquire semaphore for clientToServer reader:", "Err", err.Error())
return
}
go func() {
defer wg.Done()
defer sem.Release(1)
defer close(requestChannel)
defer func() {
if r := recover(); r != nil {
slog.Error("Recovered from panic in clientToServer reader:", "Err", r)
Expand All @@ -79,7 +101,7 @@ func (s *bidirectionalStream) run() {
slog.Debug("Failed to read request bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
return
}
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes)))
request, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(requestBytes[:bytesRead])))
if err != nil {
slog.Debug("Failed to read request bytes:", "Err", err.Error())
return
Expand All @@ -89,8 +111,14 @@ func (s *bidirectionalStream) run() {
requestChannel <- request
}()

err = sem.Acquire(context.Background(), 1)
if err != nil {
slog.Error("Failed to acquire semaphore for serverToClient reader:", "Err", err.Error())
return
}
go func() {
defer wg.Done()
defer sem.Release(1)
defer close(responseChannel)
defer func() {
if r := recover(); r != nil {
slog.Error("Recovered from panic in serverToClient reader:", "Err", r)
Expand All @@ -102,15 +130,23 @@ func (s *bidirectionalStream) run() {
slog.Debug("Failed to read response bytes from stream:", "Err", err.Error(), "BytesRead", bytesRead)
return
}
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes)), nil)
response, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(responseBytes[:bytesRead])), nil)
if err != nil {
slog.Debug("Failed to read response bytes:", "Err", err.Error())
return
}
responseChannel <- response
}()

wg.Wait()
// Wait for both goroutines to finish with timeout of 2 minutes
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
if err := sem.Acquire(ctx, 2); err != nil {
if err != context.DeadlineExceeded {
slog.Error("Failed to acquire semaphore for both readers:", "Err", err.Error())
}
return
}

var capturedRequest *http.Request
var capturedResponse *http.Response
Expand Down
1 change: 1 addition & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect
golang.org/x/sync v0.14.0
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
11 changes: 10 additions & 1 deletion src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func main() {
"Content-Length", strconv.Itoa(int(requestAndResponse.request.ContentLength)),
)
requestAndResponse.request.Header.Set("Host", requestAndResponse.request.Host)
responseRecorder := httptest.NewRecorder()
firetailMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(requestAndResponse.response.StatusCode)
for key, values := range requestAndResponse.response.Header {
Expand All @@ -145,9 +146,17 @@ func main() {
}
w.Write(capturedResponseBody)
})).ServeHTTP(
httptest.NewRecorder(),
responseRecorder,
requestAndResponse.request,
)
if responseRecorder != nil {
slog.Debug(
"Response from Firetail middleware:",
"StatusCode", responseRecorder.Code,
"Header", responseRecorder.Header(),
"Body", responseRecorder.Body.String(),
)
}
default:
}
}
Expand Down
40 changes: 31 additions & 9 deletions src/request_and_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,20 @@ type httpRequestAndResponseStreamer struct {
maxBodySize int64
}

func (s *httpRequestAndResponseStreamer) start() {
func (s *httpRequestAndResponseStreamer) getHandleAndPacketsChannel() (*pcap.Handle, <-chan gopacket.Packet) {
handle, err := pcap.OpenLive("any", 1600, true, pcap.BlockForever)
if err != nil {
log.Fatal(err)
}
defer handle.Close()

err = handle.SetBPFFilter(s.bpfExpression)
if err != nil {
log.Fatal(err)
}
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
return handle, packetsChannel
}

func (s *httpRequestAndResponseStreamer) start() {
assembler := tcpassembly.NewAssembler(
tcpassembly.NewStreamPool(
&bidirectionalStreamFactory{
Expand All @@ -50,11 +52,27 @@ func (s *httpRequestAndResponseStreamer) start() {
},
),
)
ticker := time.Tick(time.Minute)
packetsChannel := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()

go func() {
ticker := time.Tick(time.Minute)
for {
select {
case <-ticker:
slog.Debug("Flushing old conns...")
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
}
}
}()

handler, packetsChannel := s.getHandleAndPacketsChannel()
for {
select {
case packet := <-packetsChannel:
case packet, ok := <-packetsChannel:
if !ok {
slog.Warn("Packet channel closed. Reinitializing...")
handler.Close()
handler, packetsChannel = s.getHandleAndPacketsChannel()
}
if packet.NetworkLayer() == nil || packet.TransportLayer() == nil {
continue
}
Expand All @@ -78,10 +96,14 @@ func (s *httpRequestAndResponseStreamer) start() {
)
continue
}
slog.Debug(
"Captured packet:",
"Src", src,
"Dst", dst,
"SrcPort", tcp.SrcPort.String(),
"DstPort", tcp.DstPort.String(),
)
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-ticker:
assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
default:
}
}
}