TODO
- graph builder lib
- walk graph
- abstractions on top
- serious engine
- parallel operators
- partitioned streams
- remove type checks and input stream check
- fix API to be cleaner
- distinguish records from different streams
- deeper testing
- word count benchmark
- manage time
- add timestamps to records
- watermarks
- windows
- abstractions on top
- add some simple planning
Known Issues
-
Watermarks are global, but windows close on a per-key basis.
This means that any value for any key/source sets a global watermark in an operator, but the watermark propagates to a key only once a record for that key enters the node!
This makes it difficult to understand what happens, especially for out-of-order values.
For example:
Window: size 5, slide 2.
Watermark: fixed offset 5.
Windows: [0, 5), [2, 7), [4, 9), [6, 11), [8, 13), [10, 15), [12, 17), ...
Records:
{ts: 2, value: "buz"}
{ts: 13, value: "bar"}
{ts: 3, value: "buz"}
{ts: 10, value: "buz"}
Output: count of values per window.
Record `13` will make the watermark for the `WindowNode` advance to `8` and, in theory, close `[0, 5)` and `[2, 7)`. The watermark will advance for both `bar` and `buz`, but it will propagate to `buz` only when `3` gets processed.
Thus, the result will be:
[0, 5) - buz: 2
[2, 7) - buz: 2
...
If the input, instead, is:
{ts: 2, value: "buz"}
{ts: 13, value: "bar"}
{ts: 10, value: "buz"}
{ts: 3, value: "buz"}
then the output will be:
[0, 5) - buz: 1
[0, 5) - buz: 1
[2, 7) - buz: 1
...
This happens because record `10` makes `buz` aware of the `8` watermark and closes `[0, 5)` without adding `3`.
The goal is to make the two outputs consistent.
-
`FixedWindowManager` stores windows in a `map`.
This makes iteration non-deterministic and makes some tests flaky. For example, we cannot determine the order in which windows close (i.e. the order in which the close function gets called).
Optional
- generate graph as command?
- multiple outputs for nodes (with tags?)
- custom triggers (time)
NOTE: These examples show some bits of code, but they may not reflect the exact status of `master`.
Word Count
// Word count example: a source node emits a fixed list of words, the stream
// is keyed by the word itself, and a stateful node keeps a running count that
// it emits as "<word>: <count>" for every record it sees.
ctx := Context()
// Source node: ignores its input value and collects every word of the list.
p := NewNode(func(collector Collector, v values.Value) error {
in := []string{
"hello",
"this",
"is",
"ssp",
"hello",
"this",
"is",
"sparta",
"sparta",
"is",
"leonida",
}
// NOTE: the loop variable v shadows the (unused) function parameter v.
for _, v := range in {
collector.Collect(values.New(v))
}
return nil
}).SetName("source").
Out().
// Key each record by its string representation.
KeyBy(NewStringValueKeySelector(func(v values.Value) string {
return v.String()
})).
// Stateful counter with initial state int64(0); the returned value becomes
// the next state. NOTE(review): presumably state is tracked per key, which
// is what makes this a per-word count — confirm against the engine.
Connect(ctx, NewStatefulNode(values.New(int64(0)),
func(state values.Value, collector Collector, v values.Value) (values.Value, error) {
count := state.Int64() + 1
collector.Collect(values.New(fmt.Sprintf("%v: %d", v, count)))
return values.New(count), nil
})).
SetName("wordCounter").
SetParallelism(4).
Out()
// Attach a log sink to the counter's output, run the graph, and print what
// the sink recorded.
sink, log := NewLogSink(values.String)
p.Connect(ctx, sink.SetName("sink"))
if err := Execute(ctx); err != nil {
panic(err)
}
fmt.Println(log.GetValues())
Align
// Align example: one source fans out into two branches ("upper" and "count");
// a stateful "aligner" node then pairs values coming from the two branches
// and emits them as "<first>: <second>".
ctx := Context()
// Source node: ignores its input value and collects four fixed words.
source := NewNode(func(collector Collector, v values.Value) error {
in := []string{
"hello",
"this",
"is",
"ssp",
}
// NOTE: the loop variable v shadows the (unused) function parameter v.
for _, v := range in {
collector.Collect(values.New(v))
}
return nil
}).SetName("source").Out()
// Branch 1: emits the upper-cased string form of each value.
upper := source.
Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
collector.Collect(values.New(strings.ToUpper(v.String())))
return nil
})).SetName("upper")
// Branch 2: emits the byte length of each value's string form.
count := source.
Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
collector.Collect(values.New(len(v.String())))
return nil
})).SetName("count")
// state buffers values that are still waiting for a partner from the other
// input: s1 holds unmatched values tagged with source 0, s2 holds the rest.
type state struct {
s1 []values.Value
s2 []values.Value
}
// The aligner mutates the *state held in sv in place and returns the same sv.
align := NewStatefulNode(values.New(&state{}), func(sv values.Value, collector Collector, v values.Value) (values.Value, error) {
s := sv.Get().(*state)
// NOTE: this local shadows the outer `source` stream variable.
// NOTE(review): presumably GetSource returns the index of the upstream the
// value arrived from (0 for the first connected input) — confirm.
source := values.GetSource(v)
if source == 0 {
// Value from source 0: pair it with the oldest buffered s2 value if there
// is one, otherwise buffer it in s1.
if len(s.s2) > 0 {
ov := s.s2[0]
s.s2 = s.s2[1:]
collector.Collect(values.New(fmt.Sprintf("%v: %v", v, ov)))
} else {
s.s1 = append(s.s1, v)
}
} else {
// Value from any other source: the symmetric case. The source-0 value is
// always printed first in the emitted pair.
if len(s.s1) > 0 {
ov := s.s1[0]
s.s1 = s.s1[1:]
collector.Collect(values.New(fmt.Sprintf("%v: %v", ov, v)))
} else {
s.s2 = append(s.s2, v)
}
}
return sv, nil
}).SetName("aligner")
// Connect both branches to the same aligner node; the aligner's output
// stream is taken from the second Connect call.
upper.Out().Connect(ctx, align)
aligned := count.Out().Connect(ctx, align).Out()
// Attach a log sink, run the graph, and print what the sink recorded.
sink, log := NewLogSink(values.String)
aligned.Connect(ctx, sink.SetName("sink"))
if err := Execute(ctx); err != nil {
panic(err)
}
fmt.Println(log.GetValues())