forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
config_sequence.go
47 lines (43 loc) · 1.85 KB
/
config_sequence.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package input
// SequenceShardedJoinConfig describes an optional mechanism for performing
// sharded joins of structured data resulting from the input sequence. This is a
// way to merge the structured fields of fragmented datasets within memory even
// when the overall size of the data surpasses the memory available on the
// machine.
//
// When configured the sequence of inputs will be consumed multiple times
// according to the number of iterations, and each iteration will process an
// entirely different set of messages by sharding them by the ID field.
//
// Each message must be structured (JSON or otherwise processed into a
// structured form) and the fields will be aggregated with those of other
// messages sharing the ID. At the end of each iteration the joined messages are
// flushed downstream before the next iteration begins.
//
// Use NewSequenceShardedJoinConfig to obtain a value populated with defaults;
// the zero value of this struct does not carry those defaults.
type SequenceShardedJoinConfig struct {
	// Type selects the sharded join mode. NewSequenceShardedJoinConfig sets it
	// to "none". NOTE(review): presumably "none" disables sharded joins and the
	// accepted alternatives are validated elsewhere — confirm against the
	// input implementation.
	Type string `json:"type" yaml:"type"`

	// IDPath locates the ID field that messages are sharded and joined by.
	// NewSequenceShardedJoinConfig sets it to the empty string. NOTE(review):
	// the path syntax is not defined in this file — confirm with the consumer.
	IDPath string `json:"id_path" yaml:"id_path"`

	// Iterations is the number of times the input sequence is consumed; each
	// pass processes a different shard of messages.
	// NewSequenceShardedJoinConfig sets it to 1.
	Iterations int `json:"iterations" yaml:"iterations"`

	// MergeStrategy names how the fields of messages sharing an ID are
	// aggregated. NewSequenceShardedJoinConfig sets it to "array".
	// TODO(review): the set of accepted values is not defined in this file.
	MergeStrategy string `json:"merge_strategy" yaml:"merge_strategy"`
}
// NewSequenceShardedJoinConfig returns a SequenceShardedJoinConfig holding its
// default values: Type "none", an empty IDPath, a single iteration and the
// "array" merge strategy.
func NewSequenceShardedJoinConfig() SequenceShardedJoinConfig {
	// Start from the zero value; IDPath intentionally stays "".
	var conf SequenceShardedJoinConfig
	conf.Type = "none"
	conf.Iterations = 1
	conf.MergeStrategy = "array"
	return conf
}
// SequenceConfig contains configuration values for the Sequence input type.
// Use NewSequenceConfig to obtain a value populated with defaults.
type SequenceConfig struct {
	// ShardedJoin optionally configures sharded joins over the data produced
	// by the sequence of inputs; see SequenceShardedJoinConfig.
	ShardedJoin SequenceShardedJoinConfig `json:"sharded_join" yaml:"sharded_join"`

	// Inputs lists the child input configurations that make up the sequence.
	// NOTE(review): presumably consumed in the order listed — confirm against
	// the input implementation.
	Inputs []Config `json:"inputs" yaml:"inputs"`
}
// NewSequenceConfig returns a SequenceConfig holding default values: the
// defaults from NewSequenceShardedJoinConfig for ShardedJoin and an empty,
// non-nil Inputs list.
func NewSequenceConfig() SequenceConfig {
	var conf SequenceConfig
	conf.ShardedJoin = NewSequenceShardedJoinConfig()
	// An empty but non-nil slice encodes as [] rather than null in JSON.
	conf.Inputs = make([]Config, 0)
	return conf
}