-
Notifications
You must be signed in to change notification settings - Fork 783
/
scanner.go
117 lines (97 loc) · 3.36 KB
/
scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package interop
import (
"context"
"io"
"github.com/benthosdev/benthos/v4/internal/codec"
"github.com/benthosdev/benthos/v4/internal/component/scanner"
"github.com/benthosdev/benthos/v4/public/service"
)
// Config field names shared by the field specs declared in
// OldReaderCodecFields and read back in OldReaderCodecFromParsed.
const (
// fieldCodecFromString names the deprecated field that selects a legacy
// codec by name.
fieldCodecFromString = "codec"
// crFieldCodec names the field holding the scanner configuration.
crFieldCodec = "scanner"
// crFieldMaxBuffer names the deprecated field that bounds the legacy codec's
// maximum scan token size.
crFieldMaxBuffer = "max_buffer"
)
// OldReaderCodecFields returns the config fields used to decide how a stream
// of bytes is split into messages.
//
// Two deprecated fields are included:
//   - the internal `codec` field (optional);
//   - `max_buffer` (default 1000000).
//
// They sit alongside the optional `scanner` field, whose default is the
// scanner named by defaultScanner with an empty config.
func OldReaderCodecFields(defaultScanner string) []*service.ConfigField {
	legacyCodecField := service.NewInternalField(codec.NewReaderDocs(fieldCodecFromString)).
		Deprecated().
		Optional()

	legacyMaxBufferField := service.NewIntField(crFieldMaxBuffer).
		Deprecated().
		Default(1000000)

	scannerDefault := map[string]any{
		defaultScanner: map[string]any{},
	}
	scannerField := service.NewScannerField(crFieldCodec).
		Description("The [scanner](/docs/components/scanners/about) by which the stream of bytes consumed will be broken out into individual messages. Scanners are useful for processing large sources of data without holding the entirety of it within memory. For example, the `csv` scanner allows you to process individual CSV rows without loading the entire CSV file in memory at once.").
		Default(scannerDefault).
		Version("4.25.0").
		Optional()

	return []*service.ConfigField{
		legacyCodecField,
		legacyMaxBufferField,
		scannerField,
	}
}
type (
	// FallbackReaderCodec creates a FallbackReaderStream for each byte stream
	// it is handed, and can be closed once no more streams are needed. In this
	// file it is implemented both by a legacy-codec adapter and by a
	// scanner-based adapter.
	FallbackReaderCodec interface {
		Create(rdr io.ReadCloser, aFn service.AckFunc, details scanner.SourceDetails) (FallbackReaderStream, error)
		Close(context.Context) error
	}

	// FallbackReaderStream yields message batches, each with its own ack
	// function, from a single byte stream until it is closed.
	FallbackReaderStream interface {
		NextBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error)
		Close(context.Context) error
	}
)
// OldReaderCodecFromParsed builds a FallbackReaderCodec from a parsed config.
//
// When the deprecated codec field is present, a legacy codec reader is
// looked up by that name. Its maximum scan token size comes from the
// deprecated max_buffer field, falling back to 1000000 when that field is
// absent or not positive. Otherwise the scanner field is used.
//
// Errors from reading either field or constructing the codec are returned
// unchanged.
func OldReaderCodecFromParsed(conf *service.ParsedConfig) (FallbackReaderCodec, error) {
	if conf.Contains(fieldCodecFromString) {
		codecName, err := conf.FieldString(fieldCodecFromString)
		if err != nil {
			return nil, err
		}

		// Only read max_buffer when it exists, but surface a malformed value
		// instead of silently discarding the lookup error.
		var maxBuffer int
		if conf.Contains(crFieldMaxBuffer) {
			if maxBuffer, err = conf.FieldInt(crFieldMaxBuffer); err != nil {
				return nil, err
			}
		}
		// Zero has always meant "use the default". Negative values are not a
		// meaningful buffer size, so they are treated the same way.
		if maxBuffer <= 0 {
			maxBuffer = 1000000
		}

		oldCtor, err := codec.GetReader(codecName, codec.ReaderConfig{
			MaxScanTokenSize: maxBuffer,
		})
		if err != nil {
			return nil, err
		}
		return &codecRInternal{oldCtor}, nil
	}

	ownedCodec, err := conf.FieldScanner(crFieldCodec)
	if err != nil {
		return nil, err
	}
	return &codecRPublic{newCtor: ownedCodec}, nil
}
// codecRInternal adapts a legacy codec.ReaderConstructor to the
// FallbackReaderCodec interface.
type codecRInternal struct {
	oldCtor codec.ReaderConstructor
}

// Create runs the wrapped legacy constructor over rdr. It passes the source
// name from details and aFn converted to a codec.ReaderAckFn, then wraps the
// resulting reader in a streamRInternal. Constructor errors are returned
// unchanged.
func (r *codecRInternal) Create(rdr io.ReadCloser, aFn service.AckFunc, details scanner.SourceDetails) (FallbackReaderStream, error) {
	legacyAck := codec.ReaderAckFn(aFn)
	legacyReader, err := r.oldCtor(details.Name, rdr, legacyAck)
	if err != nil {
		return nil, err
	}
	return &streamRInternal{old: legacyReader}, nil
}

// Close does nothing and always returns nil.
func (r *codecRInternal) Close(_ context.Context) error {
	return nil
}
// streamRInternal exposes a legacy codec.Reader as a FallbackReaderStream.
type streamRInternal struct {
	old codec.Reader
}

// NextBatch reads the next batch from the legacy reader and converts each of
// its parts into a service message. The batch is returned together with the
// reader's ack function. A read error is returned as-is, alongside a nil batch
// and a nil ack function.
func (r *streamRInternal) NextBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
	parts, legacyAck, err := r.old.Next(ctx)
	if err != nil {
		return nil, nil, err
	}

	batch := make(service.MessageBatch, 0, len(parts))
	for _, part := range parts {
		batch = append(batch, service.NewInternalMessage(part))
	}
	return batch, service.AckFunc(legacyAck), nil
}

// Close closes the underlying legacy reader.
func (r *streamRInternal) Close(ctx context.Context) error {
	return r.old.Close(ctx)
}
// codecRPublic adapts a scanner creator from the public service API to the
// FallbackReaderCodec interface.
type codecRPublic struct {
	newCtor *service.OwnedScannerCreator
}

// Create builds a scanner stream over rdr, forwarding aFn and the source name
// taken from details. On failure it returns a nil FallbackReaderStream together
// with the creator's error.
func (r *codecRPublic) Create(rdr io.ReadCloser, aFn service.AckFunc, details scanner.SourceDetails) (FallbackReaderStream, error) {
	sDetails := service.NewScannerSourceDetails()
	sDetails.SetName(details.Name)

	stream, err := r.newCtor.Create(rdr, aFn, sDetails)
	if err != nil {
		// Return an untyped nil. Create's concrete result is presumably a
		// pointer (*service.OwnedScanner), and returning it directly would
		// wrap a nil pointer in a non-nil FallbackReaderStream interface.
		return nil, err
	}
	return stream, nil
}

// Close closes the underlying scanner creator.
func (r *codecRPublic) Close(ctx context.Context) error {
	return r.newCtor.Close(ctx)
}