From c9f5e5533731660b9a80494f4e6893b332c7ed85 Mon Sep 17 00:00:00 2001 From: peanutzhen Date: Fri, 4 Feb 2022 00:17:59 +0800 Subject: [PATCH] first commit --- .gitignore | 9 +++++ Makefile | 26 ++++++++++++++ buffer.go | 22 ++++++++++++ config.go | 48 ++++++++++++++++++++++++++ const.go | 13 +++++++ go.mod | 10 ++++++ go.sum | 26 ++++++++++++++ guess.go | 35 +++++++++++++++++++ main.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ stream.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++ stream_factory.go | 31 +++++++++++++++++ util.go | 37 ++++++++++++++++++++ 12 files changed, 425 insertions(+) create mode 100644 Makefile create mode 100644 buffer.go create mode 100644 config.go create mode 100644 const.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 guess.go create mode 100644 main.go create mode 100644 stream.go create mode 100644 stream_factory.go create mode 100644 util.go diff --git a/.gitignore b/.gitignore index 66fd13c..b26e5ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,13 @@ +# MacOS +.DS_Store + +# IDE +.vscode/ +.idea/ + # Binaries for programs and plugins +bubblecopy +output/ *.exe *.exe~ *.dll diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8c09dd1 --- /dev/null +++ b/Makefile @@ -0,0 +1,26 @@ +# DEBUG_SETTINGS is used for local test easily. +define DEBUG_SETTINGS +{ + "interface": "lo0", + "service_port": "8080" +} +endef +export DEBUG_SETTINGS + +# todo: cross-compile for linux and windows to releases. +build: clean + @mkdir -p output + @go build -o output/bubblecopy + +run: build + @cd output && sudo ./bubblecopy + +run-debug: build + @echo "loading debug settings..." + @touch output/settings.json + @echo "$$DEBUG_SETTINGS" > output/settings.json + @cd output && sudo ./bubblecopy -debug + +clean: + @echo "clean output directory..." + @rm -rf output/ \ No newline at end of file diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..ad521c4 --- /dev/null +++ b/buffer.go @@ -0,0 +1,22 @@ +package main + +import "io" + +type buffer struct { + bytes chan []byte + data []byte +} + +func (b *buffer) Read(p []byte) (int, error) { + ok := true + for ok && len(b.data) == 0 { + b.data, ok = <-b.bytes + } + if !ok || len(b.data) == 0 { + return 0, io.EOF + } + + l := copy(p, b.data) + b.data = b.data[l:] + return l, nil +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..b85fe5b --- /dev/null +++ b/config.go @@ -0,0 +1,48 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/sirupsen/logrus" +) + +type config struct { + // Taskid Diff任务ID + Taskid int `json:"taskid"` + // Secret 访问对应id任务配置的密钥 + Secret string `json:"secret"` + // Device 网卡名称 + Device string `json:"interface"` + // Port 被测服务端口 + Port string `json:"service_port"` + + // DeviceIPv4 网卡ipv4地址 + DeviceIPv4 string +} + +var configuration = config{} + +func (c *config) init() { + bytes, err := os.ReadFile(SettingsFilePath) + if err != nil { + logrus.Error(err) + logrus.Fatal("Need settings.json to get configuration.") + } + err = json.Unmarshal(bytes, &configuration) + if err != nil { + logrus.Fatal(err) + } + + c.DeviceIPv4, err = getDeviceIpv4(c.Device) + if err != nil { + logrus.Error(err) + logrus.Fatalf("%s: this device has no ipv4 address.", c.Device) + } + + logrus.WithField( + "configuration", + fmt.Sprintf("%+v", configuration), + ).Debug("configuration initialized.") +} diff --git a/const.go b/const.go new file mode 100644 index 0000000..6f68982 --- /dev/null +++ b/const.go @@ -0,0 +1,13 @@ +package main + +// 参数常量 +const ( + SnapshotLen int32 = 65536 * 4 + SettingsFilePath = "./settings.json" +) + +// 应用层协议类型 +const ( + UnknownType = "unknown" + HttpType = "http" +) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ebbe31a --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/bubble-diff/bubblecopy + +go 1.17 + +require ( + github.com/google/gopacket v1.1.19 + github.com/sirupsen/logrus v1.8.1 +) + +require golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..964afff --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/guess.go b/guess.go new file mode 100644 index 0000000..60b0d6a --- /dev/null +++ b/guess.go @@ -0,0 +1,35 @@ +package main + +import ( + "bytes" +) + +var httpMethods = map[string]bool{ + "GET": true, "POST": true, "PUT": true, "DELETE": true, "HEAD": true, + "TRACE": true, "OPTIONS": true, "PATCH": true, +} + +// guessProtocol 根据TCP数据判断其协议类型 +func guessProtocol(payload []byte) (protocol string) { + if isHttpRequestData(payload) { + return HttpType + } + return UnknownType +} + +// isHttpRequestData 判断是否符合HTTP/1.x +func isHttpRequestData(payload []byte) bool { + // see https://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes + if len(payload) < 26 { + return false + } + idx := bytes.IndexByte(payload, byte(' ')) + if idx < 0 { + return false + } + method := string(payload[:idx]) + if ok := httpMethods[method]; !ok { + return false + } + return true +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..34812ae --- /dev/null +++ b/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "github.com/google/gopacket/reassembly" + "github.com/sirupsen/logrus" +) + +var debugmode bool + +func init() { + flag.BoolVar(&debugmode, "debug", false, "Run as debug mode, read settings file to override task configuration if existsed.") + flag.Parse() + + logrus.SetLevel(logrus.InfoLevel) + logrus.SetOutput(os.Stdout) + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: "2006-01-02 15:04:05", + CallerPrettyfier: callerPrettyfier, + }) + if debugmode { + logrus.SetLevel(logrus.DebugLevel) + logrus.SetReportCaller(true) + logrus.Infof("<---------debug mode--------->") + } + + configuration.init() +} + +func main() { + handle, err := pcap.OpenLive(configuration.Device, SnapshotLen, false, pcap.BlockForever) + defer handle.Close() + if err != nil { + logrus.Error(err) + logrus.Fatal("Try sudo.") + } + + // 过滤出当前服务的流量 + filter := fmt.Sprintf( + "(src port %s and src host %s) or (dst port %s and dst host %s)", + configuration.Port, configuration.DeviceIPv4, + configuration.Port, configuration.DeviceIPv4, + ) + logrus.Debugf("Set bpf filter as: %s", filter) + if err := handle.SetBPFFilter(filter); err != nil { + logrus.Fatal(err) + } + + source := gopacket.NewPacketSource(handle, handle.LinkType()) + source.NoCopy = true + + streamFactory := &tcpStreamFactory{} + streamPool := reassembly.NewStreamPool(streamFactory) + assembler := reassembly.NewAssembler(streamPool) + + ticker := time.NewTicker(time.Second * 30) + defer ticker.Stop() + for { + // todo: 等待Diff任务启动,若未启动,请勿进行抓包消耗CPU + // your code here... + + select { + case <-ticker.C: + // 停止监听30秒内无数据传输的连接 + assembler.FlushCloseOlderThan(time.Now().Add(time.Second * -30)) + case packet := <-source.Packets(): + tcp := packet.Layer(layers.LayerTypeTCP) + if tcp != nil { + tcp := tcp.(*layers.TCP) + assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp) + } + } + } +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..b3df982 --- /dev/null +++ b/stream.go @@ -0,0 +1,86 @@ +package main + +import ( + "bufio" + "io" + "log" + "net/http" + "net/http/httputil" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/reassembly" + "github.com/sirupsen/logrus" +) + +type tcpStream struct { + c2sBuf *buffer + s2cBuf *buffer +} + +func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { + // todo: 我们可以在这里检测tcp挟带的应用层数据 + // Your code here... + return true +} + +func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) { + dir, _, _, _ := sg.Info() + l, _ := sg.Lengths() + data := sg.Fetch(l) + if l > 0 { + if dir == reassembly.TCPDirClientToServer { + s.c2sBuf.bytes <- data + } else { + s.s2cBuf.bytes <- data + } + } +} + +func (s *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { + close(s.c2sBuf.bytes) + close(s.s2cBuf.bytes) + // do not remove the connection to allow last ACK + return false +} + +// consume 消费两个缓存中的数据进行下一步处理 +func (s *tcpStream) consume() { + c2sReader := bufio.NewReader(s.c2sBuf) + s2cReader := bufio.NewReader(s.s2cBuf) + // todo: 这里等待stream检测出应用层类型后才能开始正确消费流量 + // 如你所见,目前默认为http数据,以后我们还想支持grpc的http2和thrift,kafka,redis... + + for { + req, err := http.ReadRequest(c2sReader) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } else if err != nil { + log.Println(err) + continue + } + resp, err := http.ReadResponse(s2cReader, nil) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } else if err != nil { + log.Println(err) + continue + } + + // todo: 我们目前只是将http req/resp以日志的形式打印下来 + // 后期我们在这里需要添加过滤http流量,以及转发至replayer的能力 + bytes, err := httputil.DumpRequest(req, true) + if err != nil { + log.Println(err) + } + req.Body.Close() + logrus.Debug(string(bytes)) + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Println(err) + } + resp.Body.Close() + logrus.Debug(string(body)) + } +} diff --git a/stream_factory.go b/stream_factory.go new file mode 100644 index 0000000..965f842 --- /dev/null +++ b/stream_factory.go @@ -0,0 +1,31 @@ +package main + +import ( + "sync" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/reassembly" +) + +// tcpStreamFactory 创建新的 tcpStream,并创建消费者消费数据 +type tcpStreamFactory struct { + wg sync.WaitGroup +} + +func (f *tcpStreamFactory) New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { + s := &tcpStream{} + s.c2sBuf = &buffer{ + bytes: make(chan []byte), + } + s.s2cBuf = &buffer{ + bytes: make(chan []byte), + } + f.wg.Add(1) + go s.consume() + return s +} + +func (f *tcpStreamFactory) WaitConsumers() { + f.wg.Wait() +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..145eae0 --- /dev/null +++ b/util.go @@ -0,0 +1,37 @@ +package main + +import ( + "errors" + "fmt" + "path" + "runtime" + "strings" + + "github.com/google/gopacket/pcap" +) + +// getDeviceIpv4 获取网卡的ipv4地址,如果有的话。 +func getDeviceIpv4(deviceName string) (ipv4 string, err error) { + devices, err := pcap.FindAllDevs() + if err != nil { + return "", err + } + + for _, device := range devices { + if device.Name == deviceName { + for _, address := range device.Addresses { + if strings.IndexByte(address.IP.String(), '.') != -1 { + return address.IP.String(), nil + } + } + return "", errors.New(deviceName + ": no ipv4 for this interface") + } + } + return "", errors.New(deviceName + ": no such device") +} + +// callerPrettyfier 只显示文件名和行号。 +func callerPrettyfier(frame *runtime.Frame) (function string, file string) { + fileName := path.Base(frame.File) + return "", fmt.Sprintf("%s:%d", fileName, frame.Line) +}