Skip to content

Commit

Permalink
Add agent gRPC plugin
Browse files Browse the repository at this point in the history
Add plugin to send pcap data over Unix socket using gRPC
  • Loading branch information
noboruma committed Jul 20, 2022
1 parent 191e826 commit d8023b0
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[submodule "deps/agent-plugins-grpc"]
path = deps/agent-plugins-grpc
url = git@github.com:deepfence/agent-plugins-grpc.git
branch = add-pcap
17 changes: 15 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ ifeq ($(RELEASE),1)
LDFLAGS += -s -w
endif

.PHONY: all build docker-bin docker-image test
.PHONY: all build docker-bin docker-image test localinit

all: build
all: proto build

localinit:
$(PWD)/bootstrap.sh

build:
go build -tags '$(TAGS)' --ldflags '$(LDFLAGS)' -o packetstreamer ./main.go
Expand All @@ -35,3 +38,13 @@ docker-test:

test:
go test -tags '$(TAGS)' ./...

clean:
-rm ./packetstreamer
-rm ./deps/agent-plugins-grpc/proto/*.go
-rm -r $(PWD)/proto

proto: ./deps/agent-plugins-grpc/proto/*.proto
(cd ./deps/agent-plugins-grpc && make go)
-mkdir $(PWD)/proto
cp ./deps/agent-plugins-grpc/proto/*.go $(PWD)/proto
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ little performance impact on the remote hosts. PacketStreamer sensors can be
run on bare-metal servers, on Docker hosts, and on Kubernetes nodes.

The PacketStreamer receiver accepts network traffic from multiple sensors,
collecting it into a single, central `pcap` file. You can then process the
collecting it into a single, central `pcap` file. You can then process the
pcap file or live feed the traffic to the tooling of your choice, such as
`Zeek`, `Wireshark` `Suricata`, or as a live stream for Machine Learning models.

Expand All @@ -54,16 +54,17 @@ network data from multiple machines for central logging and analysis.
For full instructions, refer to the [PacketStreamer Documentation](https://deepfence.github.io/PacketStreamer/).

You will need to install the golang toolchain and `libpcap-dev` before building PacketStreamer.

```shell script
# Pre-requisites (Ubuntu): sudo apt install golang-go libpcap-dev
git clone https://github.com/deepfence/PacketStreamer.git
cd PacketStreamer/
make localinit
make
```

Run a PacketStreamer receiver, listening on port **8081** and writing pcap output to **/tmp/dump_file** (see [receiver.yaml](contrib/config/receiver.yaml)):

```shell script
./packetstreamer receiver --config ./contrib/config/receiver.yaml
```
Expand All @@ -79,7 +80,7 @@ cp ./contrib/config/sensor-local.yaml ./contrib/config/sensor.yaml
./packetstreamer sensor --config ./contrib/config/sensor.yaml
```


## Who uses PacketStreamer?

* Deepfence [ThreatStryker](https://deepfence.io/threatstryker/) uses
Expand Down
1 change: 1 addition & 0 deletions bootstrap.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
git submodule update --init --remote --recursive ./deps/agent-plugins-grpc/
5 changes: 5 additions & 0 deletions contrib/config/sensor-agent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
output:
plugins:
agent:
socket_path: /tmp/agent.sock
pcapMode: all
1 change: 1 addition & 0 deletions deps/agent-plugins-grpc
Submodule agent-plugins-grpc added at a94c8f
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/golang/protobuf v1.5.2 // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect
Expand All @@ -35,8 +42,9 @@ require (
github.com/miekg/dns v1.1.25 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect
google.golang.org/grpc v1.48.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)
109 changes: 109 additions & 0 deletions go.sum

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ type KafkaPluginConfig struct {
Timeout time.Duration `yaml:"timeout,omitempty"`
}

type AgentPluginConfig struct {
SocketPath string `yaml:"socket_path"`
}

type PluginsConfig struct {
S3 *S3PluginConfig
Kafka *KafkaPluginConfig
Agent *AgentPluginConfig
}

type OutputConfig struct {
Expand All @@ -75,6 +80,10 @@ type OutputConfig struct {
Plugins *PluginsConfig
}

type AgentOutputRawConfig struct {
SocketPath string `yaml:"socket_path"`
}

type S3OutputRawConfig struct {
Bucket string
Region string
Expand All @@ -97,6 +106,7 @@ type KafkaOutputRawConfig struct {
type PluginsRawConfig struct {
S3 *S3OutputRawConfig
Kafka *KafkaOutputRawConfig
Agent *AgentOutputRawConfig
}

type OutputRawConfig struct {
Expand Down Expand Up @@ -166,6 +176,7 @@ func NewConfig(configFileName string) (*Config, error) {

var s3Config *S3PluginConfig
var kafkaConfig *KafkaPluginConfig
var agentConfig *AgentPluginConfig
if rawConfig.Output != nil && rawConfig.Output.Plugins != nil {

s3Config, err = populateS3Config(rawConfig)
Expand All @@ -179,6 +190,12 @@ func NewConfig(configFileName string) (*Config, error) {
if err != nil {
return nil, err
}

agentConfig, err = populateAgentConfig(rawConfig)

if err != nil {
return nil, err
}
}

compressBlockSize := 65
Expand Down Expand Up @@ -213,6 +230,7 @@ func NewConfig(configFileName string) (*Config, error) {
Plugins: &PluginsConfig{
S3: s3Config,
Kafka: kafkaConfig,
Agent: agentConfig,
},
},
TLS: rawConfig.TLS,
Expand Down Expand Up @@ -303,6 +321,16 @@ func populateKafkaConfig(rawConfig RawConfig) (*KafkaPluginConfig, error) {
}, nil
}

func populateAgentConfig(rawConfig RawConfig) (*AgentPluginConfig, error) {
if rawConfig.Output.Plugins.Agent == nil {
return nil, nil
}

return &AgentPluginConfig{
SocketPath: rawConfig.Output.Plugins.Agent.SocketPath,
}, nil
}

func populateS3Config(rawConfig RawConfig) (*S3PluginConfig, error) {
if rawConfig.Output.Plugins.S3 == nil {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (
func ValidateSensorConfig(config *Config) error {
if config.Output.File == nil && config.Output.Server == nil &&
(config.Output.Plugins == nil ||
(config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil)) {
(config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil && config.Output.Plugins.Agent == nil)) {
return ErrNoOutputConfigured
}
if config.Output.Server != nil && config.Output.Server.Port == nil {
Expand Down
75 changes: 75 additions & 0 deletions pkg/plugins/agent/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package agent

import (
"context"
"errors"
"os"
"time"

"github.com/deepfence/PacketStreamer/pkg/config"
pb "github.com/deepfence/PacketStreamer/proto"
"google.golang.org/grpc"
)

const (
SOCKET_RETRIES = 10
)

type Plugin struct {
client pb.PcapForwarderClient
stopChan chan struct{}
}

func NewPlugin(conf *config.AgentPluginConfig) (*Plugin, error) {

_, errw := os.Stat(conf.SocketPath)
tries := 1
for errors.Is(errw, os.ErrNotExist) {
_, errw = os.Stat(conf.SocketPath)
time.Sleep(1 * time.Second)
if tries == SOCKET_RETRIES {
return nil, errors.New("No socket found")
}
tries += 1
}

conn, err := grpc.Dial("unix://"+conf.SocketPath, grpc.WithAuthority("dummy"), grpc.WithInsecure())
if err != nil {
return nil, err
}

return &Plugin{
client: pb.NewPcapForwarderClient(conn),
stopChan: make(chan struct{}, 1),
}, nil
}

func (p *Plugin) Start(ctx context.Context) (chan<- string, error) {

stream, err := p.client.SendPackets(ctx)

if err != nil {
return nil, err
}

recvChan := make(chan string, 10)

go func() {
loop:
for {
select {
case packet := <-recvChan:
stream.Send(&pb.Packet{Payload: []byte(packet)})
case <-p.stopChan:
break loop
}
}
stream.CloseAndRecv()
}()

return recvChan, nil
}

func (p *Plugin) Stop() {
p.stopChan <- struct{}{}
}
20 changes: 19 additions & 1 deletion pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"

"github.com/deepfence/PacketStreamer/pkg/config"
"github.com/deepfence/PacketStreamer/pkg/plugins/agent"
"github.com/deepfence/PacketStreamer/pkg/plugins/kafka"
"github.com/deepfence/PacketStreamer/pkg/plugins/s3"
)
Expand Down Expand Up @@ -43,6 +44,23 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
plugins = append(plugins, kafkaChan)
}

if config.Output.Plugins.Agent != nil {
log.Println("Starting Agent plugin")
agentPlugin, err := agent.NewPlugin(config.Output.Plugins.Agent)

if err != nil {
return nil, fmt.Errorf("error starting Agent plugin, %v", err)
}

agentChan, err := agentPlugin.Start(ctx)

if err != nil {
return nil, fmt.Errorf("error starting Agent plugin, %v", err)
}

plugins = append(plugins, agentChan)
}

inputChan := make(chan string)
go func() {
defer func() {
Expand All @@ -66,5 +84,5 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) {
}

func pluginsAreDefined(pluginsConfig *config.PluginsConfig) bool {
return pluginsConfig != nil && (pluginsConfig.S3 != nil || pluginsConfig.Kafka != nil)
return pluginsConfig != nil && (pluginsConfig.Agent != nil || pluginsConfig.S3 != nil || pluginsConfig.Kafka != nil)
}

0 comments on commit d8023b0

Please sign in to comment.