-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
handler.go
70 lines (59 loc) · 2.26 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package submatview
import (
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// eventHandler is a function which performs some operation on a received
// event, then returns the eventHandler that should be used for the next
// event. A single event may carry a batch of events.
//
// If eventHandler fails to handle the event it may return an error. If an
// error is returned the next eventHandler will be ignored.
//
// eventHandler is used to implement a very simple finite-state machine: each
// handler is one state, and the handler it returns is the state to move to.
type eventHandler func(state viewState, event *pbsubscribe.Event) (next eventHandler, err error)
// viewState is the view that the event handlers in this file operate on.
type viewState interface {
// updateView is called by the handlers with the events to apply and an
// index: either the events accumulated during a snapshot together with the
// index of the EndOfSnapshot event, or the events carried by a single
// streamed event together with that event's index. The handlers return any
// error from updateView to their caller.
updateView(events []*pbsubscribe.Event, index uint64) error
// reset is called by resumeStreamHandler when a NewSnapshotToFollow event
// arrives, before the events of the new snapshot are accumulated.
// NOTE(review): presumably discards the view's current contents - confirm
// against the implementation.
reset()
}
// initialHandler selects the eventHandler that processes the first event.
//
// A zero index yields a new snapshotHandler. Any other index yields
// resumeStreamHandler, which either carries on with the event stream or
// restarts from a snapshot depending on the first event it sees.
// NOTE(review): index is presumably the last index the view has applied -
// confirm with the caller.
func initialHandler(index uint64) eventHandler {
	if index != 0 {
		return resumeStreamHandler
	}
	return newSnapshotHandler()
}
// snapshotHandler accumulates events. When it receives an EndOfSnapshot event
// it updates the view, and then returns eventStreamHandler to handle new events.
type snapshotHandler struct {
// events holds the snapshot events accumulated so far. They are handed to
// the view in a single updateView call when the EndOfSnapshot event arrives.
events []*pbsubscribe.Event
}
// newSnapshotHandler returns an eventHandler backed by a fresh, empty
// snapshotHandler, so every call starts accumulating from nothing.
func newSnapshotHandler() eventHandler {
	h := &snapshotHandler{}
	return h.handle
}
// handle is the eventHandler for the snapshot phase.
//
// Until an EndOfSnapshot event arrives, the event (or every event in its
// batch) is appended to h.events and handle remains the active handler. On
// EndOfSnapshot the accumulated events are applied to state using the index
// of the EndOfSnapshot event, and eventStreamHandler is returned as the next
// handler together with any error from updateView.
func (h *snapshotHandler) handle(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	if !event.GetEndOfSnapshot() {
		// Still inside the snapshot: keep buffering.
		h.events = append(h.events, eventsFromEvent(event)...)
		return h.handle, nil
	}

	return eventStreamHandler, state.updateView(h.events, event.Index)
}
// eventStreamHandler applies the received event (or the events of its batch)
// to the view at the event's index. It is a terminal state: the next handler
// it returns is always eventStreamHandler itself, alongside any error from
// updating the view.
func eventStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	updates := eventsFromEvent(event)
	return eventStreamHandler, state.updateView(updates, event.Index)
}
// eventsFromEvent flattens an event into a slice of events. If the event
// carries an event batch, the batch's events are returned; otherwise the
// result is a one-element slice containing the event itself.
func eventsFromEvent(event *pbsubscribe.Event) []*pbsubscribe.Event {
	batch := event.GetEventBatch()
	if batch == nil {
		return []*pbsubscribe.Event{event}
	}
	return batch.Events
}
// resumeStreamHandler handles the first event after resuming a stream. If
// the event is anything other than NewSnapshotToFollow it is delegated to
// eventStreamHandler, whose result is returned unchanged. If the event is
// NewSnapshotToFollow the view is reset and a new snapshotHandler is
// returned to handle the events that follow.
func resumeStreamHandler(state viewState, event *pbsubscribe.Event) (eventHandler, error) {
	if !event.GetNewSnapshotToFollow() {
		return eventStreamHandler(state, event)
	}

	state.reset()
	return newSnapshotHandler(), nil
}