forked from sensorbee/sensorbee
-
Notifications
You must be signed in to change notification settings - Fork 0
/
source_registry.go
154 lines (129 loc) · 4.71 KB
/
source_registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package bql
import (
"fmt"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
"strings"
"sync"
)
// IOParams has parameters for IO plugins. A pointer to it is passed to
// creators such as SourceCreator.CreateSource alongside the user-supplied
// parameter map.
type IOParams struct {
// TypeName is the name of the type registered to SensorBee.
TypeName string
// Name is the name of the instance specified in a CREATE statement.
Name string
}
// SourceCreator is an interface which creates instances of a Source.
// Creators are stored in a SourceCreatorRegistry under a type name.
type SourceCreator interface {
// CreateSource creates a new Source instance using given parameters.
// ioParams carries the type name and the instance name, and params is a
// data.Map of parameters. It returns the created core.Source, or an error
// when the Source cannot be created.
CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error)
}
// sourceCreatorFunc is a function type which implements SourceCreator by
// calling itself.
type sourceCreatorFunc func(*core.Context, *IOParams, data.Map) (core.Source, error)
// CreateSource calls f with the given arguments and returns its results
// unchanged.
func (f sourceCreatorFunc) CreateSource(ctx *core.Context, ioParams *IOParams, params data.Map) (core.Source, error) {
return f(ctx, ioParams, params)
}
// SourceCreatorFunc creates a SourceCreator from a function. The returned
// creator's CreateSource method forwards its arguments to f and returns
// whatever f returns.
func SourceCreatorFunc(f func(*core.Context, *IOParams, data.Map) (core.Source, error)) SourceCreator {
return sourceCreatorFunc(f)
}
// SourceCreatorRegistry manages creators of Sources. The default
// implementation in this file stores type names in lower case, so it treats
// type names case-insensitively.
type SourceCreatorRegistry interface {
// Register adds a Source creator to the registry. It returns an error if
// the type name is already registered. The default implementation also
// returns an error when the type name fails core.ValidateSymbol.
Register(typeName string, c SourceCreator) error
// Lookup returns a Source creator having the type name. It returns
// core.NotExistError if it doesn't have the creator.
Lookup(typeName string) (SourceCreator, error)
// List returns all creators the registry has. The caller can safely modify
// the map returned from this method.
List() (map[string]SourceCreator, error)
// Unregister removes a creator from the registry. It returns core.NotExistError
// when the registry doesn't have a creator having the type name.
//
// The registry itself doesn't support cascading delete. Cascading deletion
// should be handled properly by the caller.
Unregister(typeName string) error
}
// defaultSourceCreatorRegistry is the default implementation of
// SourceCreatorRegistry. It keeps creators in a map guarded by a read-write
// mutex.
type defaultSourceCreatorRegistry struct {
// m guards creators; the methods of this type lock it before touching
// the map.
m sync.RWMutex
// creators maps lowercased type names to their registered creators.
creators map[string]SourceCreator
}
// NewDefaultSourceCreatorRegistry builds an empty SourceCreatorRegistry backed
// by the package's default implementation.
func NewDefaultSourceCreatorRegistry() SourceCreatorRegistry {
	reg := new(defaultSourceCreatorRegistry)
	reg.creators = make(map[string]SourceCreator)
	return reg
}
// Register stores c under the lowercased form of typeName. It returns an
// error when typeName is rejected by core.ValidateSymbol or when a creator is
// already stored under the same lowercased name.
func (r *defaultSourceCreatorRegistry) Register(typeName string, c SourceCreator) error {
	err := core.ValidateSymbol(typeName)
	if err != nil {
		return fmt.Errorf("invalid name for source type: %s", err.Error())
	}

	// Keys are kept in lower case so that type names match case-insensitively.
	key := strings.ToLower(typeName)

	r.m.Lock()
	defer r.m.Unlock()

	_, taken := r.creators[key]
	if taken {
		return fmt.Errorf("source type '%v' is already registered", typeName)
	}
	r.creators[key] = c
	return nil
}
// Lookup finds the creator stored under the lowercased form of typeName. It
// returns a core.NotExistError when no such creator is registered.
func (r *defaultSourceCreatorRegistry) Lookup(typeName string) (SourceCreator, error) {
	key := strings.ToLower(typeName)

	r.m.RLock()
	defer r.m.RUnlock()

	creator, found := r.creators[key]
	if !found {
		return nil, core.NotExistError(fmt.Errorf("source type '%v' is not registered", typeName))
	}
	return creator, nil
}
// List returns a copy of the registry's map of lowercased type names to
// creators. The copy is independent of the registry, so the caller may modify
// it freely. The returned error is always nil.
func (r *defaultSourceCreatorRegistry) List() (map[string]SourceCreator, error) {
	r.m.RLock()
	defer r.m.RUnlock()

	snapshot := make(map[string]SourceCreator, len(r.creators))
	for name := range r.creators {
		snapshot[name] = r.creators[name]
	}
	return snapshot, nil
}
// Unregister removes the creator stored under the lowercased form of
// typeName. It returns a core.NotExistError when no such creator is
// registered.
func (r *defaultSourceCreatorRegistry) Unregister(typeName string) error {
	key := strings.ToLower(typeName)

	r.m.Lock()
	defer r.m.Unlock()

	if _, found := r.creators[key]; found {
		delete(r.creators, key)
		return nil
	}
	return core.NotExistError(fmt.Errorf("source type '%v' is not registered", typeName))
}
// globalSourceCreatorRegistry is the package-wide registry used by
// RegisterGlobalSourceCreator, MustRegisterGlobalSourceCreator and
// CopyGlobalSourceCreatorRegistry.
var globalSourceCreatorRegistry = NewDefaultSourceCreatorRegistry()
// RegisterGlobalSourceCreator adds a SourceCreator which can be referred from
// all topologies. SourceCreators registered after running topologies might not
// be seen by those topologies. Call it from init functions to avoid such
// conditions.
//
// It returns the error reported by the global registry's Register method.
func RegisterGlobalSourceCreator(typeName string, c SourceCreator) error {
return globalSourceCreatorRegistry.Register(typeName, c)
}
// MustRegisterGlobalSourceCreator is like RegisterGlobalSourceCreator but
// panics instead of returning an error when the registration fails. The panic
// value is an error naming this function, the type name and the underlying
// cause.
func MustRegisterGlobalSourceCreator(typeName string, c SourceCreator) {
	if err := globalSourceCreatorRegistry.Register(typeName, c); err != nil {
		// The prefix names the package this function actually lives in, so
		// that the panic points readers at the right place.
		panic(fmt.Errorf("bql.MustRegisterGlobalSourceCreator: cannot register '%v': %v", typeName, err))
	}
}
// CopyGlobalSourceCreatorRegistry returns a fresh registry holding every
// creator currently listed by the global SourceCreatorRegistry. Later changes
// to either registry do not affect the other. If listing the global registry
// or registering an entry into the copy fails, that error is returned and the
// registry result is nil.
func CopyGlobalSourceCreatorRegistry() (SourceCreatorRegistry, error) {
	creators, err := globalSourceCreatorRegistry.List()
	if err != nil {
		return nil, err
	}

	dup := NewDefaultSourceCreatorRegistry()
	for typeName, creator := range creators {
		if regErr := dup.Register(typeName, creator); regErr != nil {
			return nil, regErr
		}
	}
	return dup, nil
}