/
observer.go
78 lines (70 loc) · 1.64 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package observer
import (
"github.com/reactivex/rxgo"
"github.com/reactivex/rxgo/handlers"
)
// Observer bundles the three callbacks that react to stream events: one for
// ordinary items, one for errors, and one for completion.
//
// Fields may be left nil when an Observer is written as a struct literal;
// OnNext, OnError and OnDone each skip a nil handler instead of calling it.
type Observer struct {
// NextHandler is called by Handle and OnNext for items that are not errors.
NextHandler handlers.NextFunc
// ErrHandler is called by Handle for items that are errors, and by OnError.
ErrHandler handlers.ErrFunc
// DoneHandler is called by OnDone.
DoneHandler handlers.DoneFunc
}
// DefaultObserver is the baseline Observer: every handler is a non-nil
// function that does nothing. New copies it before applying caller-supplied
// handlers, so any handler the caller does not provide stays a safe no-op.
//
// It is an exported variable; reassigning its fields changes the starting
// point used by later calls to New.
var DefaultObserver = Observer{
	NextHandler: func(_ interface{}) {},
	ErrHandler:  func(_ error) {},
	DoneHandler: func() {},
}
// Handle dispatches a single emitted item to the matching handler: a value
// that implements error is passed to ErrHandler, and anything else
// (including a nil item) is passed to NextHandler.
//
// A nil handler is skipped rather than called, matching OnNext and OnError,
// so an Observer that was built as a partial struct literal does not panic
// here. DoneHandler is never invoked by Handle.
//
// NOTE(review): this is presumably the method that lets Observer be passed
// where an rx.EventHandler is expected — confirm against the rx package.
func (ob Observer) Handle(item interface{}) {
	switch v := item.(type) {
	case error:
		if ob.ErrHandler != nil {
			ob.ErrHandler(v)
		}
	default:
		if ob.NextHandler != nil {
			ob.NextHandler(v)
		}
	}
}
// New builds an Observer from any number of event handlers.
//
// It starts from a copy of DefaultObserver and applies the arguments in
// order, so later arguments win over earlier ones:
//   - a handlers.NextFunc, handlers.ErrFunc or handlers.DoneFunc replaces the
//     corresponding field;
//   - an Observer replaces the whole value built so far, including any of its
//     fields that are nil;
//   - any other handler type is ignored.
//
// With no arguments the result is a copy of DefaultObserver.
func New(eventHandlers ...rx.EventHandler) Observer {
	result := DefaultObserver
	for _, eh := range eventHandlers {
		switch h := eh.(type) {
		case handlers.NextFunc:
			result.NextHandler = h
		case handlers.ErrFunc:
			result.ErrHandler = h
		case handlers.DoneFunc:
			result.DoneHandler = h
		case Observer:
			result = h
		}
	}
	return result
}
// OnNext passes item to NextHandler. Items that implement error are dropped
// without calling any handler, and a nil NextHandler makes the call a no-op.
func (ob Observer) OnNext(item interface{}) {
	if _, isErr := item.(error); isErr {
		return
	}
	if next := ob.NextHandler; next != nil {
		next(item)
	}
}
// OnError passes err to ErrHandler. It does nothing when ErrHandler is nil.
func (ob Observer) OnError(err error) {
	onErr := ob.ErrHandler
	if onErr == nil {
		return
	}
	onErr(err)
}
// OnDone signals completion by invoking DoneHandler. It does nothing when
// DoneHandler is nil.
func (ob Observer) OnDone() {
	if ob.DoneHandler == nil {
		return
	}
	ob.DoneHandler()
}