Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# MacOS
.DS_Store

# IDE
.vscode/
.idea/

# Binaries for programs and plugins
bubblecopy
output/
*.exe
*.exe~
*.dll
Expand Down
26 changes: 26 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# DEBUG_SETTINGS is used for local test easily.
define DEBUG_SETTINGS
{
"interface": "lo0",
"service_port": "8080"
}
endef
export DEBUG_SETTINGS

# todo: cross-compile for linux and windows to releases.
build: clean
@mkdir -p output
@go build -o output/bubblecopy

run: build
@cd output && sudo ./bubblecopy

run-debug: build
@echo "loading debug settings..."
@touch output/settings.json
@echo "$$DEBUG_SETTINGS" > output/settings.json
@cd output && sudo ./bubblecopy -debug

clean:
@echo "clean output directory..."
@rm -rf output/
22 changes: 22 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import "io"

type buffer struct {
bytes chan []byte
data []byte
}

func (b *buffer) Read(p []byte) (int, error) {
ok := true
for ok && len(b.data) == 0 {
b.data, ok = <-b.bytes
}
if !ok || len(b.data) == 0 {
return 0, io.EOF
}

l := copy(p, b.data)
b.data = b.data[l:]
return l, nil
}
48 changes: 48 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"encoding/json"
"fmt"
"os"

"github.com/sirupsen/logrus"
)

type config struct {
// Taskid Diff任务ID
Taskid int `json:"taskid"`
// Secret 访问对应id任务配置的密钥
Secret string `json:"secret"`
// Device 网卡名称
Device string `json:"interface"`
// Port 被测服务端口
Port string `json:"service_port"`

// DeviceIPv4 网卡ipv4地址
DeviceIPv4 string
}

var configuration = config{}

func (c *config) init() {
bytes, err := os.ReadFile(SettingsFilePath)
if err != nil {
logrus.Error(err)
logrus.Fatal("Need settings.json to get configuration.")
}
err = json.Unmarshal(bytes, &configuration)
if err != nil {
logrus.Fatal(err)
}

c.DeviceIPv4, err = getDeviceIpv4(c.Device)
if err != nil {
logrus.Error(err)
logrus.Fatalf("%s: this device has no ipv4 address.", c.Device)
}

logrus.WithField(
"configuration",
fmt.Sprintf("%+v", configuration),
).Debug("configuration initialized.")
}
13 changes: 13 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

// 参数常量
const (
SnapshotLen int32 = 65536 * 4
SettingsFilePath = "./settings.json"
)

// 应用层协议类型
const (
UnknownType = "unknown"
HttpType = "http"
)
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/bubble-diff/bubblecopy

go 1.17

require (
github.com/google/gopacket v1.1.19
github.com/sirupsen/logrus v1.8.1
)

require golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
26 changes: 26 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
35 changes: 35 additions & 0 deletions guess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"bytes"
)

var httpMethods = map[string]bool{
"GET": true, "POST": true, "PUT": true, "DELETE": true, "HEAD": true,
"TRACE": true, "OPTIONS": true, "PATCH": true,
}

// guessProtocol 根据TCP数据判断其协议类型
func guessProtocol(payload []byte) (protocol string) {
if isHttpRequestData(payload) {
return HttpType
}
return UnknownType
}

// isHttpRequestData 判断是否符合HTTP/1.x
func isHttpRequestData(payload []byte) bool {
// see https://stackoverflow.com/questions/25047905/http-request-minimum-size-in-bytes
if len(payload) < 26 {
return false
}
idx := bytes.IndexByte(payload, byte(' '))
if idx < 0 {
return false
}
method := string(payload[:idx])
if ok := httpMethods[method]; !ok {
return false
}
return true
}
82 changes: 82 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"flag"
"fmt"
"os"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/sirupsen/logrus"
)

var debugmode bool

func init() {
flag.BoolVar(&debugmode, "debug", false, "Run as debug mode, read settings file to override task configuration if existsed.")
flag.Parse()

logrus.SetLevel(logrus.InfoLevel)
logrus.SetOutput(os.Stdout)
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
CallerPrettyfier: callerPrettyfier,
})
if debugmode {
logrus.SetLevel(logrus.DebugLevel)
logrus.SetReportCaller(true)
logrus.Infof("<---------debug mode--------->")
}

configuration.init()
}

func main() {
handle, err := pcap.OpenLive(configuration.Device, SnapshotLen, false, pcap.BlockForever)
defer handle.Close()
if err != nil {
logrus.Error(err)
logrus.Fatal("Try sudo.")
}

// 过滤出当前服务的流量
filter := fmt.Sprintf(
"(src port %s and src host %s) or (dst port %s and dst host %s)",
configuration.Port, configuration.DeviceIPv4,
configuration.Port, configuration.DeviceIPv4,
)
logrus.Debugf("Set bpf filter as: %s", filter)
if err := handle.SetBPFFilter(filter); err != nil {
logrus.Fatal(err)
}

source := gopacket.NewPacketSource(handle, handle.LinkType())
source.NoCopy = true

streamFactory := &tcpStreamFactory{}
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)

ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
for {
// todo: 等待Diff任务启动,若未启动,请勿进行抓包消耗CPU
// your code here...

select {
case <-ticker.C:
// 停止监听30秒内无数据传输的连接
assembler.FlushCloseOlderThan(time.Now().Add(time.Second * -30))
case packet := <-source.Packets():
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
tcp := tcp.(*layers.TCP)
assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp)
}
}
}
}
86 changes: 86 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"bufio"
"io"
"log"
"net/http"
"net/http/httputil"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/sirupsen/logrus"
)

type tcpStream struct {
c2sBuf *buffer
s2cBuf *buffer
}

func (s *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// todo: 我们可以在这里检测tcp挟带的应用层数据
// Your code here...
return true
}

func (s *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
dir, _, _, _ := sg.Info()
l, _ := sg.Lengths()
data := sg.Fetch(l)
if l > 0 {
if dir == reassembly.TCPDirClientToServer {
s.c2sBuf.bytes <- data
} else {
s.s2cBuf.bytes <- data
}
}
}

func (s *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
close(s.c2sBuf.bytes)
close(s.s2cBuf.bytes)
// do not remove the connection to allow last ACK
return false
}

// consume 消费两个缓存中的数据进行下一步处理
func (s *tcpStream) consume() {
c2sReader := bufio.NewReader(s.c2sBuf)
s2cReader := bufio.NewReader(s.s2cBuf)
// todo: 这里等待stream检测出应用层类型后才能开始正确消费流量
// 如你所见,目前默认为http数据,以后我们还想支持grpc的http2和thrift,kafka,redis...

for {
req, err := http.ReadRequest(c2sReader)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
log.Println(err)
continue
}
resp, err := http.ReadResponse(s2cReader, nil)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
log.Println(err)
continue
}

// todo: 我们目前只是将http req/resp以日志的形式打印下来
// 后期我们在这里需要添加过滤http流量,以及转发至replayer的能力
bytes, err := httputil.DumpRequest(req, true)
if err != nil {
log.Println(err)
}
req.Body.Close()
logrus.Debug(string(bytes))

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println(err)
}
resp.Body.Close()
logrus.Debug(string(body))
}
}
Loading