/
plugin.go
111 lines (99 loc) · 2.61 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"bytes"
"context"
"crypto/md5"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"net/http"
_ "net/http/pprof"
"github.com/benji-bou/SecPipeline/helper"
"github.com/benji-bou/SecPipeline/pluginctl"
"github.com/benji-bou/chantools"
)
// MemFilterOption is a functional option used to configure a MemFilter; the
// constructors in this file build one as a func(*MemFilter).
type MemFilterOption = helper.Option[MemFilter]
// MaxBuffSize returns an option that caps the number of hashes the filter
// remembers at buffSize. Run only enforces the cap when it is strictly
// positive, so zero or a negative value leaves the filter unbounded.
func MaxBuffSize(buffSize int) MemFilterOption {
	limit := buffSize
	return func(mf *MemFilter) {
		mf.buffSizeMax = limit
	}
}
// IllimitedBuffSize returns an option that removes the cap on remembered
// hashes by storing the sentinel value -1 in buffSizeMax.
func IllimitedBuffSize() MemFilterOption {
	const unlimited = -1
	return func(mf *MemFilter) {
		mf.buffSizeMax = unlimited
	}
}
// DefaultBuffSize returns the option NewMemFilter applies before any
// caller-supplied option. The default is currently "no limit".
func DefaultBuffSize() MemFilterOption {
	// A bounded default (buffSizeMax = 1024) is left disabled here; the
	// unbounded option is used instead.
	unbounded := IllimitedBuffSize()
	return unbounded
}
// MemFilter is the plugin implementation: it remembers, in memory, a hash of
// every item it has let through so that later duplicates can be dropped.
type MemFilter struct {
// buffSizeMax is the maximum number of hashes kept in inmem. Run only
// enforces it when it is greater than zero; IllimitedBuffSize sets it to -1.
buffSizeMax int
// inmem is the set of MD5 digests of the deduplication keys already seen.
inmem map[[16]byte]struct{}
// goTemplateFilter is the optional template set by Config. When present, Run
// hashes its rendered output instead of the raw item data.
goTemplateFilter *template.Template
}
// NewMemFilter builds a MemFilter with an empty hash set. DefaultBuffSize is
// applied first and the caller's options afterwards, in the order given, so
// they can override the default.
func NewMemFilter(opt ...MemFilterOption) *MemFilter {
	options := make([]MemFilterOption, 0, len(opt)+1)
	options = append(options, DefaultBuffSize())
	options = append(options, opt...)

	filter := &MemFilter{inmem: make(map[[16]byte]struct{})}
	return helper.ConfigurePtr(filter, options...)
}
// GetInputSchema reports the plugin's input schema. This plugin declares
// none, so it always returns a nil schema and a nil error.
func (mp *MemFilter) GetInputSchema() ([]byte, error) {
	var schema []byte
	return schema, nil
}
// Config applies the JSON plugin configuration. The only recognised field is
// "goTemplateFilter", a Go template whose rendered output Run uses as the
// deduplication key.
//
// An empty (or whitespace-only) config, or an empty template string, is
// valid: it clears any previously configured template so that Run
// deduplicates on the raw item data. An error is returned when the JSON
// cannot be decoded or the template cannot be parsed; in that case the
// previously configured template is left untouched.
func (mp *MemFilter) Config(config []byte) error {
	// json.Unmarshal rejects empty input ("unexpected end of JSON input");
	// treat "no configuration" as "no template" instead of failing.
	if len(bytes.TrimSpace(config)) == 0 {
		mp.goTemplateFilter = nil
		return nil
	}

	var configFilter struct {
		GoTemplateFilter string `json:"goTemplateFilter"`
	}
	if err := json.Unmarshal(config, &configFilter); err != nil {
		return fmt.Errorf("couldn't unmarshal Distinct plugin config because %w", err)
	}

	// An empty template renders nothing, which Run already treats as "use the
	// raw data"; store nil so Run can skip the pointless execution.
	if configFilter.GoTemplateFilter == "" {
		mp.goTemplateFilter = nil
		return nil
	}

	tpl, err := template.New("distinct").Parse(configFilter.GoTemplateFilter)
	if err != nil {
		return fmt.Errorf("couldn't generate Distinct go template pattern because %w", err)
	}
	mp.goTemplateFilter = tpl
	return nil
}
// Run consumes input and forwards only the items whose deduplication key has
// not been seen before.
//
// The key is the output of the configured template executed with the item's
// Data, or the raw Data when no template is configured, when the template
// renders nothing, or when its execution fails. Keys are remembered as MD5
// digests (used here as a fingerprint, not for security). When buffSizeMax is
// positive and the set is full, one arbitrary entry is evicted before the new
// digest is stored (map iteration order is unspecified).
//
// The worker stops when ctx is cancelled or when input is closed. The
// returned channels are created by chantools.NewWithErr.
// NOTE(review): presumably NewWithErr runs the callback in its own goroutine
// and closes both channels when it returns; confirm against chantools.
func (mp *MemFilter) Run(ctx context.Context, input <-chan *pluginctl.DataStream) (<-chan *pluginctl.DataStream, <-chan error) {
	return chantools.NewWithErr(func(c chan<- *pluginctl.DataStream, eC chan<- error, params ...any) {
		// Scratch buffer reused for every item; md5.Sum copies what it needs.
		var key bytes.Buffer
		for {
			select {
			case <-ctx.Done():
				return
			case item, ok := <-input:
				if !ok {
					// A closed channel yields nil items forever; without this
					// check the loop would dereference nil and panic.
					return
				}
				if item == nil {
					continue
				}
				slog.Debug("received data", "data", string(item.Data))

				key.Reset()
				if mp.goTemplateFilter != nil {
					if err := mp.goTemplateFilter.Execute(&key, item.Data); err != nil {
						// A failed execution may leave partial output behind.
						// Hashing it could make distinct items collide, so
						// discard it and fall back to the raw data.
						slog.Error("distinct template execution failed, using raw data as key", "error", err)
						key.Reset()
					}
				}
				if key.Len() == 0 {
					key.Write(item.Data)
				}

				hash := md5.Sum(key.Bytes())
				if _, seen := mp.inmem[hash]; seen {
					continue
				}
				if mp.buffSizeMax > 0 && len(mp.inmem) >= mp.buffSizeMax {
					// Make room by dropping a single arbitrary entry.
					for victim := range mp.inmem {
						delete(mp.inmem, victim)
						break
					}
				}
				mp.inmem[hash] = struct{}{}

				// Do not stay blocked on a slow or absent consumer once the
				// context has been cancelled.
				select {
				case c <- item:
				case <-ctx.Done():
					return
				}
			}
		}
	})
}
// main starts a local debug HTTP server and then serves the MemFilter plugin.
//
// The debug server listens on localhost:6061 with the default mux, on which
// the blank import of net/http/pprof registers the profiling handlers.
func main() {
	// Configure logging first so the goroutine below can report failures.
	helper.SetLog(slog.LevelError)

	go func() {
		// ListenAndServe only returns on failure (for example when the port
		// is already in use). The debug server is best-effort, so report the
		// error and keep the plugin running.
		if err := http.ListenAndServe("localhost:6061", nil); err != nil {
			slog.Error("pprof debug server stopped", "error", err)
		}
	}()

	plugin := pluginctl.NewPlugin("",
		pluginctl.WithPluginImplementation(NewMemFilter()),
	)
	plugin.Serve()
}