Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ type buffer struct {
data []byte
}

func NewBuffer() *buffer {
// todo: bytes可以做成带缓存的channel吗?
return &buffer{bytes: make(chan []byte)}
}

func (b *buffer) Read(p []byte) (int, error) {
ok := true
for ok && len(b.data) == 0 {
Expand Down
43 changes: 33 additions & 10 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net/http"
"net/http/httputil"
"sync"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand All @@ -14,13 +15,29 @@ import (
)

type tcpStream struct {
c2sBuf *buffer
s2cBuf *buffer
factoryWg *sync.WaitGroup
// protocol tcpStream挟带数据的上层协议类型
protocol string
// isDetect 已经确定该Stream的协议类型
isDetect bool
c2sBuf *buffer
s2cBuf *buffer
}

func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// todo: 我们可以在这里检测tcp挟带的应用层数据
// Your code here...
if *start {
return true // Important! First SYN packet must be accepted.
}
// 当我们检测到应用层协议后,创建消费者进行消费。
if !s.isDetect {
s.protocol = guessProtocol(tcp.Payload)
if s.protocol == UnknownType {
return false // drop it.
}
s.isDetect = true
s.factoryWg.Add(1)
go s.consume()
}
return true
}

Expand All @@ -37,20 +54,26 @@ func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
}

// ReassemblyComplete will be called when stream receive two endpoint FIN packet.
func (s *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
close(s.c2sBuf.bytes)
close(s.s2cBuf.bytes)
// do not remove the connection to allow last ACK
return false
return true
}

// consume 消费两个缓存中的数据进行下一步处理
func (s *tcpStream) consume() {
c2sReader := bufio.NewReader(s.c2sBuf)
s2cReader := bufio.NewReader(s.s2cBuf)
// todo: 这里等待stream检测出应用层类型后才能开始正确消费流量
// 如你所见,目前默认为http数据,以后我们还想支持grpc的http2和thrift,kafka,redis...
defer s.factoryWg.Done()

switch s.protocol {
case HttpType:
handleHttp(s.c2sBuf, s.s2cBuf)
}
}

func handleHttp(c2s, s2c io.Reader) {
c2sReader := bufio.NewReader(c2s)
s2cReader := bufio.NewReader(s2c)
for {
req, err := http.ReadRequest(c2sReader)
if err == io.EOF || err == io.ErrUnexpectedEOF {
Expand Down
15 changes: 6 additions & 9 deletions stream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ import (
"github.com/google/gopacket/reassembly"
)

// tcpStreamFactory 创建新的 tcpStream,并创建消费者消费数据
// tcpStreamFactory 创建新的 tcpStream
type tcpStreamFactory struct {
wg sync.WaitGroup
}

func (f *tcpStreamFactory) New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
s := &tcpStream{}
s.c2sBuf = &buffer{
bytes: make(chan []byte),
s := &tcpStream{
factoryWg: &f.wg,
isDetect: false,
c2sBuf: NewBuffer(),
s2cBuf: NewBuffer(),
}
s.s2cBuf = &buffer{
bytes: make(chan []byte),
}
f.wg.Add(1)
go s.consume()
return s
}

Expand Down