From e0bfcb5460c813bf4e5e696cb4747bb5da31fda9 Mon Sep 17 00:00:00 2001 From: peanutzhen Date: Sat, 5 Feb 2022 00:18:10 +0800 Subject: [PATCH] add code skeleton to support multiple application layer data. --- buffer.go | 5 +++++ stream.go | 43 +++++++++++++++++++++++++++++++++---------- stream_factory.go | 15 ++++++--------- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/buffer.go b/buffer.go index ad521c4..68cbf0d 100644 --- a/buffer.go +++ b/buffer.go @@ -7,6 +7,11 @@ type buffer struct { data []byte } +func NewBuffer() *buffer { + // todo: bytes可以做成带缓存的channel吗? + return &buffer{bytes: make(chan []byte)} +} + func (b *buffer) Read(p []byte) (int, error) { ok := true for ok && len(b.data) == 0 { diff --git a/stream.go b/stream.go index b3df982..f2820b1 100644 --- a/stream.go +++ b/stream.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "net/http/httputil" + "sync" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -14,13 +15,29 @@ import ( ) type tcpStream struct { - c2sBuf *buffer - s2cBuf *buffer + factoryWg *sync.WaitGroup + // protocol tcpStream挟带数据的上层协议类型 + protocol string + // isDetect 已经确定该Stream的协议类型 + isDetect bool + c2sBuf *buffer + s2cBuf *buffer } func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { - // todo: 我们可以在这里检测tcp挟带的应用层数据 - // Your code here... + if *start { + return true // Important! First SYN packet must be accepted. + } + // 当我们检测到应用层协议后,创建消费者进行消费。 + if !s.isDetect { + s.protocol = guessProtocol(tcp.Payload) + if s.protocol == UnknownType { + return false // drop it. + } + s.isDetect = true + s.factoryWg.Add(1) + go s.consume() + } return true } @@ -37,20 +54,26 @@ func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } } +// ReassemblyComplete will be called when stream receive two endpoint FIN packet. func (s *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { close(s.c2sBuf.bytes) close(s.s2cBuf.bytes) - // do not remove the connection to allow last ACK - return false + return true } // consume 消费两个缓存中的数据进行下一步处理 func (s *tcpStream) consume() { - c2sReader := bufio.NewReader(s.c2sBuf) - s2cReader := bufio.NewReader(s.s2cBuf) - // todo: 这里等待stream检测出应用层类型后才能开始正确消费流量 - // 如你所见,目前默认为http数据,以后我们还想支持grpc的http2和thrift,kafka,redis... + defer s.factoryWg.Done() + + switch s.protocol { + case HttpType: + handleHttp(s.c2sBuf, s.s2cBuf) + } +} +func handleHttp(c2s, s2c io.Reader) { + c2sReader := bufio.NewReader(c2s) + s2cReader := bufio.NewReader(s2c) for { req, err := http.ReadRequest(c2sReader) if err == io.EOF || err == io.ErrUnexpectedEOF { diff --git a/stream_factory.go b/stream_factory.go index 965f842..a8b0864 100644 --- a/stream_factory.go +++ b/stream_factory.go @@ -8,21 +8,18 @@ import ( "github.com/google/gopacket/reassembly" ) -// tcpStreamFactory 创建新的 tcpStream,并创建消费者消费数据 +// tcpStreamFactory 创建新的 tcpStream type tcpStreamFactory struct { wg sync.WaitGroup } func (f *tcpStreamFactory) New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { - s := &tcpStream{} - s.c2sBuf = &buffer{ - bytes: make(chan []byte), + s := &tcpStream{ + factoryWg: &f.wg, + isDetect: false, + c2sBuf: NewBuffer(), + s2cBuf: NewBuffer(), } - s.s2cBuf = &buffer{ - bytes: make(chan []byte), - } - f.wg.Add(1) - go s.consume() return s }