-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
100 lines (83 loc) · 2.8 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package streams
/*
Copyright 2018 Bruno Moura <brunotm@gmail.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"sync"
"github.com/dgryski/go-jump"
)
// tasks are dedicated concurrent tasks and buffers for each source or processor
// node that has successor nodes, which are the ones allowed to forward in the topology.
// Each task consists of a goroutine and buffer pair to which the forward requests
// from that node are routed. Ordered processing of records across multiple goroutines
// is guaranteed by using a consistent hash over the record.id and the number of tasks,
// assigning records with the same id to the same task. Record ids are generated by
// hashing the encoded key or, when absent, the value of a record.
// nodeTasks maps each topology node to its task state (nil/absent means no tasks).
type nodeTasks map[*Node]*tasks
// tasks holds the buffered channels that feed a node's worker goroutines.
// The embedded RWMutex guards buffers against concurrent forwarding and rescaling.
type tasks struct {
sync.RWMutex
buffers []chan Record
}
// forwardFrom forwards the given record to the given node successors.
// If the node has associated tasks, the record is routed to the task selected
// by a consistent hash of record.id, so records with the same id are always
// handled by the same goroutine and remain ordered. Otherwise the record is
// forwarded synchronously on the caller's goroutine.
func (nt nodeTasks) forwardFrom(from *Node, record Record) {
st, ok := nt[from]
if !ok || st == nil {
// Unknown node or no task state: a nil *tasks would panic on RLock,
// so forward synchronously instead.
from.forward(record)
return
}
// TODO: rework locking strategy for subtasks
st.RLock()
if buckets := len(st.buffers); buckets > 0 {
// Ensure we always process records with same keys within the same task
st.buffers[jump.Hash(record.id, buckets)] <- record
st.RUnlock()
return
}
st.RUnlock()
// if node has no tasks
from.forward(record)
}
// forwardTo locates the node named to and delivers the record directly to its
// processor, wrapping the call in pc activate/deactivate. It returns
// errNodeNotFound when no node with that name exists in the topology.
func (nt nodeTasks) forwardTo(to string, record Record) (err error) {
// TODO: need a map[node.name]node
for node := range nt {
if node.name != to {
continue
}
node.pc.activate()
node.processor.Process(node.pc, record)
node.pc.deactivate()
return nil
}
return errNodeNotFound
}
// setScale scales the number of tasks for the given node to scale, creating
// or tearing down goroutine/buffer pairs as needed. buffer is the channel
// capacity given to each newly created task.
func (nt nodeTasks) setScale(node *Node, scale, buffer int) {
st := nt[node]
st.Lock()
// BUG FIX: the write lock must be released with Unlock, not RUnlock —
// calling RUnlock on a write-locked RWMutex panics at runtime.
defer st.Unlock()
currScale := len(st.buffers)
// Increase the number of tasks for the given node.
// A scale of 1 only adds a buffer with no parallelism.
for ; scale > currScale; currScale++ {
task := make(chan Record, buffer)
st.buffers = append(st.buffers, task)
// task is declared per iteration, so each goroutine drains its own channel.
go func() {
for record := range task {
node.forward(record)
}
}()
}
// Decrease by closing and dropping the highest-numbered tasks; closing the
// channel terminates the corresponding goroutine once it drains.
for ; scale < currScale; currScale-- {
close(st.buffers[currScale-1])
st.buffers = st.buffers[:currScale-1]
}
}