-
Notifications
You must be signed in to change notification settings - Fork 1
/
graph_node.go
94 lines (77 loc) · 2.46 KB
/
graph_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package gstream
// graphNode is one vertex of the processing graph. It consumes values of type
// T and forwards values of type TR to the processors of its children.
type graphNode[T, TR any] struct {
	// rid is the routine identifier reported by RoutineId. addChild copies it
	// from the parent to every non-source child.
	rid routineID
	// processor caches the processor created by buildAndStart; it is nil
	// until the node has been built.
	processor Processor[T]
	// supplier creates the node's processor from the downstream processors.
	supplier ProcessorSupplier[T, TR]
	// forwards returns the processors of all children added so far; addChild
	// wraps it each time a child is attached.
	forwards func() []Processor[TR]
	// isSrc marks a node created by newSourceNode. Source nodes keep their
	// own rid and get a routine started in buildAndStart.
	isSrc bool
	// sctx is handed to the routine's run method (source nodes only).
	sctx *streamContext
	// pipe is the receive-only channel handed to newRoutine (source nodes only).
	pipe <-chan T
	// pool is handed to newRoutine (source nodes only).
	// NOTE(review): presumably a worker-pool size; confirm against newRoutine.
	pool int
}
// RoutineId reports the routine identifier currently stored on the node.
func (n *graphNode[_, _]) RoutineId() routineID {
	return n.rid
}
// newProcessorNode creates a graph node around supplier. The new node has no
// children: its forwards function yields an empty, non-nil slice. Every other
// field keeps its zero value.
func newProcessorNode[T, TR any](supplier ProcessorSupplier[T, TR]) *graphNode[T, TR] {
	node := new(graphNode[T, TR])
	node.supplier = supplier
	node.forwards = func() []Processor[TR] {
		return []Processor[TR]{}
	}
	return node
}
// newVoidNode creates a node whose processor comes from newVoidSupplier.
func newVoidNode[T, TR any]() *graphNode[T, TR] {
	voidSupplier := newVoidSupplier[T, TR]()
	return newProcessorNode[T, TR](voidSupplier)
}
// newFallThroughNode creates a node with identical input and output types
// whose processor comes from newFallThroughSupplier.
func newFallThroughNode[T any]() *graphNode[T, T] {
	passSupplier := newFallThroughSupplier[T]()
	return newProcessorNode[T, T](passSupplier)
}
// newSourceNode creates a fall-through node flagged as a source and records
// the routine id, stream context, input channel and pool value on it. These
// are the values buildAndStart later uses to start the node's routine.
func newSourceNode[T any](rid routineID, sctx *streamContext, pipe <-chan T, pool int) *graphNode[T, T] {
	src := newFallThroughNode[T]()
	src.isSrc = true
	src.rid, src.sctx = rid, sctx
	src.pipe, src.pool = pipe, pool
	return src
}
// newStreamToTableNode wraps the given stream-to-table supplier in a node that
// consumes KeyValue[K, V] and forwards KeyValue[K, Change[V]].
func newStreamToTableNode[K, V any](supplier *streamToTableSupplier[K, V]) *graphNode[KeyValue[K, V], KeyValue[K, Change[V]]] {
	// Fix the interface type up front so the node's type arguments are inferred.
	var tableSupplier ProcessorSupplier[KeyValue[K, V], KeyValue[K, Change[V]]] = supplier
	return newProcessorNode(tableSupplier)
}
// newTableToValueStreamNode creates a node that consumes KeyValue[K, Change[V]]
// and forwards plain V values, using the supplier returned by
// newTableToValueStreamSupplier.
func newTableToValueStreamNode[K, V any]() *graphNode[KeyValue[K, Change[V]], V] {
	return newProcessorNode[KeyValue[K, Change[V]], V](
		newTableToValueStreamSupplier[K, V](),
	)
}
// newTableToStreamNode creates a node that consumes KeyValue[K, Change[V]] and
// forwards KeyValue[K, V], using the supplier returned by
// newTableToStreamSupplier.
func newTableToStreamNode[K, V any]() *graphNode[KeyValue[K, Change[V]], KeyValue[K, V]] {
	return newProcessorNode[KeyValue[K, Change[V]], KeyValue[K, V]](
		newTableToStreamSupplier[K, V](),
	)
}
// addChild attaches child downstream of parent. The parent's forwards function
// is wrapped so that, when invoked, it returns the previously registered
// processors followed by the child's processor; the child is built through
// buildAndStart at that moment, not when addChild is called. A child that is
// not a source node also takes over the parent's routine id.
func addChild[T, TR, TRR any](parent *graphNode[T, TR], child *graphNode[TR, TRR]) {
	prev := parent.forwards
	parent.forwards = func() []Processor[TR] {
		// Resolve earlier children first so they are built in insertion order.
		downstream := prev()
		return append(downstream, buildAndStart(child))
	}

	if child.isSrc {
		// Source nodes keep the routine id they were created with.
		return
	}
	child.rid = parent.rid
}
// curryingAddChild binds parent and returns a function that attaches the
// child it is given to that parent via addChild.
func curryingAddChild[T, TR, TRR any](parent *graphNode[T, TR]) func(*graphNode[TR, TRR]) {
	attach := func(c *graphNode[TR, TRR]) {
		addChild(parent, c)
	}
	return attach
}
// castAddChild adapts an attach function that only accepts T-to-T nodes so it
// can accept T-to-TR nodes. For each child it creates a fresh fall-through
// node, hands that node to curriedAddChild, and then attaches the real child
// beneath it.
func castAddChild[T, TR any](curriedAddChild func(*graphNode[T, T])) func(*graphNode[T, TR]) {
	adapted := func(target *graphNode[T, TR]) {
		bridge := newFallThroughNode[T]()
		// Attach the bridge first: addChild below copies the bridge's rid to
		// target, so the bridge must already have been handed to the parent.
		curriedAddChild(bridge)
		addChild(bridge, target)
	}
	return adapted
}
// buildAndStart returns the node's processor, creating it on first use. The
// processor is obtained from the node's supplier, which receives the
// processors produced by n.forwards(). For a source node the function also
// creates a routine from the node's rid, pipe, pool and new processor, and
// calls its run method with the node's stream context. Later calls return the
// cached processor without rebuilding or starting anything again.
func buildAndStart[T, TR any](n *graphNode[T, TR]) Processor[T] {
	if n.processor != nil {
		// Already built.
		return n.processor
	}

	n.processor = n.supplier.Processor(n.forwards()...)
	if !n.isSrc {
		return n.processor
	}

	// Only source nodes own a routine.
	// NOTE(review): whether run blocks is not visible here; confirm.
	routine := newRoutine(n.rid, n.pipe, n.pool, n.processor)
	routine.run(n.sctx)
	return n.processor
}