-
Notifications
You must be signed in to change notification settings - Fork 762
/
store.go
141 lines (114 loc) · 3.25 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package store
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/fagongzi/gateway/pkg/pb/metapb"
"github.com/fagongzi/gateway/pkg/pb/rpcpb"
"github.com/fagongzi/gateway/pkg/util"
)
// Package-level timing knobs. Both are variables rather than constants, so
// other code can reassign them.
var (
	// TICKER is a three-second interval. Presumably the period of a ticker
	// driven elsewhere in the package — confirm against its users.
	TICKER = 3 * time.Second

	// TTL is a time-to-live of 5. The unit is not visible in this file
	// (presumably seconds) — confirm against the code that consumes it.
	TTL int64 = 5
)
// supportSchema maps a lower-cased URL scheme to the factory that builds the
// Store for that scheme. A factory receives the host part of the registry
// address and the key prefix. Entries are registered in init and looked up by
// GetStoreFrom.
var supportSchema = map[string]func(string, string) (Store, error){}
// EvtType identifies the kind of change an Evt describes.
type EvtType int

// EvtSrc identifies the kind of object an Evt refers to.
type EvtSrc int

// Event types.
const (
	// EventTypeNew marks a newly created entry.
	EventTypeNew EvtType = 0
	// EventTypeUpdate marks an updated entry.
	EventTypeUpdate EvtType = 1
	// EventTypeDelete marks a deleted entry.
	EventTypeDelete EvtType = 2
)

// Event sources.
const (
	// EventSrcCluster is a cluster event.
	EventSrcCluster EvtSrc = 0
	// EventSrcServer is a server event.
	EventSrcServer EvtSrc = 1
	// EventSrcBind is a bind event.
	EventSrcBind EvtSrc = 2
	// EventSrcAPI is an API event.
	EventSrcAPI EvtSrc = 3
	// EventSrcRouting is a routing event.
	EventSrcRouting EvtSrc = 4
	// EventSrcProxy is a proxy event.
	EventSrcProxy EvtSrc = 5
)
// Evt is a single change notification. It carries the kind of object affected
// (Src), the kind of change (Type), and the affected entry's Key and Value.
// Pointers to Evt are the element type of the channel passed to Store.Watch.
type Evt struct {
// Src is the event source, one of the EventSrc* constants.
Src EvtSrc
// Type is the event type, one of the EventType* constants.
Type EvtType
// Key identifies the affected entry. Its exact format is decided by the
// producer, which is not visible in this file.
Key string
// Value is the event payload. Its dynamic type presumably depends on Src —
// confirm against the Watch implementation.
Value interface{}
}
// init registers the store factories known to this file. "etcd" is the only
// scheme registered here, so GetStoreFrom rejects every other scheme unless
// some other code adds entries to supportSchema.
func init() {
supportSchema["etcd"] = getEtcdStoreFrom
}
// GetStoreFrom returns a Store implementation for the given registry address.
//
// registryAddr is parsed as a URL. Its scheme (compared case-insensitively)
// selects the factory registered in supportSchema, and its host part is passed
// to that factory together with prefix.
//
// An error is returned when registryAddr cannot be parsed, when its scheme has
// no registered factory, or when the factory itself fails.
func GetStoreFrom(registryAddr, prefix string) (Store, error) {
	u, err := url.Parse(registryAddr)
	if err != nil {
		// Report a malformed address through the error return instead of
		// panicking: the caller already has to handle an error from this
		// function, and a bad configuration value is not a programmer bug.
		return nil, fmt.Errorf("parse registry addr failed, errors:%+v", err)
	}

	schema := strings.ToLower(u.Scheme)
	fn, ok := supportSchema[schema]
	if !ok {
		return nil, fmt.Errorf("not support: %s", registryAddr)
	}

	return fn(u.Host, prefix)
}
// getEtcdStoreFrom is the factory registered for the "etcd" scheme. addr is a
// comma-separated list of hosts; each one is prefixed with "http://" and the
// resulting endpoint list is passed to NewEtcdStore together with prefix.
func getEtcdStoreFrom(addr, prefix string) (Store, error) {
	hosts := strings.Split(addr, ",")

	endpoints := make([]string, len(hosts))
	for i, host := range hosts {
		endpoints[i] = fmt.Sprintf("http://%s", host)
	}

	return NewEtcdStore(endpoints, prefix)
}
// Store is the metadata storage abstraction returned by GetStoreFrom. The only
// backend registered in this file is "etcd".
//
// NOTE(review): the per-method notes below are inferred from names and
// signatures only; the implementation is not visible here, so confirm the
// exact semantics against it.
type Store interface {
// Raw returns an untyped value; presumably the underlying backend client —
// confirm against the implementation.
Raw() interface{}

// Bind operations. GetBindServers takes a single ID and returns a list of
// IDs; presumably a cluster ID mapped to its bound server IDs — confirm.
AddBind(bind *metapb.Bind) error
RemoveBind(bind *metapb.Bind) error
RemoveClusterBind(id uint64) error
GetBindServers(id uint64) ([]uint64, error)

// Cluster operations. PutCluster returns a uint64, presumably the cluster
// ID. GetClusters takes a limit and a callback; presumably the callback is
// invoked per cluster and limit bounds each fetch — confirm.
PutCluster(cluster *metapb.Cluster) (uint64, error)
RemoveCluster(id uint64) error
GetClusters(limit int64, fn func(interface{}) error) error
GetCluster(id uint64) (*metapb.Cluster, error)

// Server operations; same signature shapes as the cluster operations.
PutServer(svr *metapb.Server) (uint64, error)
RemoveServer(id uint64) error
GetServers(limit int64, fn func(interface{}) error) error
GetServer(id uint64) (*metapb.Server, error)

// API operations; same signature shapes as the cluster operations.
PutAPI(api *metapb.API) (uint64, error)
RemoveAPI(id uint64) error
GetAPIs(limit int64, fn func(interface{}) error) error
GetAPI(id uint64) (*metapb.API, error)

// Routing operations; same signature shapes as the cluster operations.
PutRouting(routing *metapb.Routing) (uint64, error)
RemoveRouting(id uint64) error
GetRoutings(limit int64, fn func(interface{}) error) error
GetRouting(id uint64) (*metapb.Routing, error)

// Proxy operations. RegistryProxy takes a ttl whose unit is not visible
// here (presumably seconds). GetProxies takes a typed callback, unlike the
// interface{} callbacks of the other Get* list methods.
RegistryProxy(proxy *metapb.Proxy, ttl int64) error
GetProxies(limit int64, fn func(*metapb.Proxy) error) error

// Watch takes an event channel and a stop channel; presumably it delivers
// change events on evtCh until stopCh is signalled — confirm.
Watch(evtCh chan *Evt, stopCh chan bool) error

// Maintenance operations. Presumably Clean removes stored data, SetID sets
// an ID counter and BackupTo copies data to the given target — confirm each
// against the implementation before relying on it.
Clean() error
SetID(id uint64) error
BackupTo(to string) error
// Batch executes a batch request and returns its response.
Batch(batch *rpcpb.BatchReq) (*rpcpb.BatchRsp, error)
// System returns system-level information as a metapb.System.
System() (*metapb.System, error)
}
// getKey builds the storage key for a numeric ID: prefix, a slash, then id as
// a decimal number zero-padded to 20 digits (the width of the largest uint64),
// so that keys under one prefix sort lexically in numeric ID order.
func getKey(prefix string, id uint64) string {
	const layout = "%s/%020d"
	return fmt.Sprintf(layout, prefix, id)
}
// getAddrKey builds the storage key for an address-identified entry: prefix, a
// slash, then addr after it has been passed through util.GetAddrFormat.
func getAddrKey(prefix string, addr string) string {
	formatted := util.GetAddrFormat(addr)
	return fmt.Sprintf("%s/%s", prefix, formatted)
}