/
api.go
200 lines (164 loc) · 5.01 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"context"
"fmt"
"io"
"path/filepath"
"sync"
)
type (
// ProtocolType is the int32-based enum of protocols; its values are the
// Http and MySQL constants.
ProtocolType int32
// PathKey is the string type of a configuration path key. It is the key
// type taken by StoreOperator.Save, Get and Watch.
PathKey string
// ConfigItem is the string type naming one configuration item. It is the
// item argument of ConfigReader.Load and ConfigWriter.Write.
ConfigItem string
)
// _rootPathTemp is the fmt layout used by initPath to build the root path
// key; its two verbs are filled with the root name and the version, in that
// order.
const _rootPathTemp = "/%s/%s/"
var (
// DefaultRootPath is the root path key. It is assigned by initPath from
// the _rootPathTemp layout and stays empty until initPath has run.
DefaultRootPath PathKey
// DefaultTenantsPath is the "tenants" key joined onto DefaultRootPath.
// It is assigned by initPath and stays empty until initPath has run.
DefaultTenantsPath PathKey
)
// initPath computes the package-level default path keys from root and version.
//
// An empty root falls back to "arana-db" and an empty version to "1.0".
// Afterwards DefaultRootPath is "/<root>/<version>/" and DefaultTenantsPath
// is "/<root>/<version>/tenants".
func initPath(root, version string) {
	if root == "" {
		root = "arana-db"
	}
	if version == "" {
		version = "1.0"
	}

	DefaultRootPath = PathKey(fmt.Sprintf(_rootPathTemp, root, version))

	// DefaultRootPath is built with '/' separators, so the derived key must
	// stay slash-separated on every platform. filepath.Join alone would emit
	// '\' on Windows; ToSlash is a no-op where the separator is already '/'.
	tenants := filepath.Join(string(DefaultRootPath), "tenants")
	DefaultTenantsPath = PathKey(filepath.ToSlash(tenants))
}
const (
// Http is the first ProtocolType value; iota gives it the value 0.
Http ProtocolType = iota
// MySQL is the second ProtocolType value; the implicit iota gives it the value 1.
MySQL
)
const (
// The empty DataSourceType is declared with the blank identifier, so it
// has no usable name.
_ DataSourceType = ""
// DBMySQL is the DataSourceType with the value "mysql".
DBMySQL DataSourceType = "mysql"
// DBPostgreSQL is the DataSourceType with the value "postgresql".
DBPostgreSQL DataSourceType = "postgresql"
)
// Names of configuration items.
//
// NOTE(review): these are untyped string constants, not ConfigItem-typed
// ones. Presumably they are the values passed as the item argument of
// ConfigReader.Load and ConfigWriter.Write — confirm against callers.
const (
ConfigItemSpec = "spec"
ConfigItemSysDB = "sys_db"
ConfigItemUsers = "users"
ConfigItemClusters = "clusters"
ConfigItemShardingRule = "sharding_rule"
ConfigItemNodes = "nodes"
ConfigItemShadowRule = "shadow_rule"
)
var (
// slots maps a plugin name (StoreOperator.Name) to the registered
// StoreOperator. Register is the writer visible in this file.
slots = make(map[string]StoreOperator)
// storeOperate is not referenced in the visible part of this file.
// NOTE(review): presumably the store plugin currently in use — confirm.
storeOperate StoreOperator
// once is not referenced in the visible part of this file.
// NOTE(review): presumably guards a one-time initialization — confirm.
once sync.Once
)
// Register adds the store plugin s to the plugin registry under s.Name().
//
// It returns an error, and leaves the registry unchanged, when s is a nil
// interface value or when a plugin with the same name is already registered.
// A typed nil pointer wrapped in the interface is not detected by the nil
// check.
//
// Register performs no locking of its own.
func Register(s StoreOperator) error {
	// Guard against a nil interface: calling s.Name() on it would panic.
	if s == nil {
		return fmt.Errorf("StoreOperator must not be nil")
	}

	// Resolve the name once so the lookup, the error message and the
	// insertion are guaranteed to use the same key.
	name := s.Name()
	if _, exists := slots[name]; exists {
		return fmt.Errorf("StoreOperator=[%s] already exist", name)
	}

	slots[name] = s
	return nil
}
type (
// EventCallback is a function that receives a single Event. It is the
// callback type taken by the Subscribe methods of TenantOperator and
// ConfigWatcher.
EventCallback func(e Event)
// SubscribeResult bundles a receive-only Event channel with a
// context.CancelFunc.
// NOTE(review): presumably Cancel stops the subscription that feeds
// EventChan — confirm where SubscribeResult values are produced.
SubscribeResult struct {
// EventChan is a receive-only channel of Event values.
EventChan <-chan Event
// Cancel is the context.CancelFunc carried alongside EventChan.
Cancel context.CancelFunc
}
// subscriber pairs an EventCallback with a context.Context.
// NOTE(review): presumably ctx bounds how long watch stays registered —
// confirm where subscriber values are created and consumed.
subscriber struct {
// watch is the callback held by this subscriber.
watch EventCallback
// ctx is the context stored alongside watch.
ctx context.Context
}
// Options describes a config store; its fields carry YAML tags.
Options struct {
// StoreName is mapped to the YAML key "name".
// NOTE(review): presumably matches StoreOperator.Name of a registered
// plugin — confirm against the code that selects the store.
StoreName string `yaml:"name"`
// RootPath is mapped to the YAML key "root_path".
RootPath string `yaml:"root_path"`
// Options is mapped to the YAML key "options"; its type is the same as
// the argument type of StoreOperator.Init.
Options map[string]interface{} `yaml:"options"`
}
// TenantOperator groups the actions specific to tenant spaces: tenants,
// tenant users and nodes, plus a subscription to tenant changes.
// It embeds io.Closer.
TenantOperator interface {
io.Closer
// ListTenants lists all tenants.
ListTenants() []string
// CreateTenant creates a tenant. The string argument is unnamed in the
// signature (presumably the tenant name — confirm against implementations).
CreateTenant(string) error
// UpdateTenant updates a tenant; it takes the arguments tenant and newTenant.
// NOTE(review): presumably renames tenant to newTenant — confirm.
UpdateTenant(tenant string, newTenant string) error
// RemoveTenant removes a tenant. The string argument is unnamed in the
// signature (presumably the tenant name — confirm against implementations).
RemoveTenant(string) error
// RemoveTenantUser removes a tenant user, given tenant and username.
RemoveTenantUser(tenant, username string) error
// CreateTenantUser creates a user, given tenant, username and password.
CreateTenantUser(tenant, username, password string) error
// UpdateTenantUser updates a user by username.
// Mind the argument order: newUsername and password come before oldUsername.
UpdateTenantUser(tenant, newUsername, password, oldUsername string) error
// UpsertNode creates a node, or updates a node.
// Note that port is an int while weight is passed as a string.
UpsertNode(tenant, node, name, host string, port int, username, password, database, weight string) error
// RemoveNode removes a node, given tenant and name.
RemoveNode(tenant, name string) error
// Subscribe subscribes c to tenant changes and returns a context.CancelFunc.
// NOTE(review): presumably calling the returned function ends the
// subscription — confirm against implementations.
Subscribe(ctx context.Context, c EventCallback) context.CancelFunc
}
// Center is the configuration center for a single tenant (tenant-level
// isolation). It combines ConfigWriter, ConfigReader and ConfigWatcher, each
// of which also embeds io.Closer.
Center interface {
io.Closer
ConfigWriter
ConfigReader
ConfigWatcher
// Tenant returns tenant info as a string.
// NOTE(review): presumably the name of the tenant this Center serves — confirm.
Tenant() string
}
// ConfigReader is the read side of a tenant configuration. It embeds io.Closer.
ConfigReader interface {
io.Closer
// LoadAll loads the full Tenant configuration, the first time it will be loaded remotely,
// and then it will be directly assembled from the cache layer.
LoadAll(ctx context.Context) (*Tenant, error)
// Load takes a ConfigItem and returns a *Tenant.
// NOTE(review): presumably only the requested item is loaded, with the same
// remote-then-cache behavior as LoadAll — confirm against implementations.
Load(ctx context.Context, item ConfigItem) (*Tenant, error)
}
// ConfigWriter is the write side of a tenant configuration. It embeds io.Closer.
ConfigWriter interface {
io.Closer
// Import imports the configuration information of a tenant.
Import(ctx context.Context, cfg *Tenant) error
// Write takes a ConfigItem together with a *Tenant.
// NOTE(review): presumably writes only the given item of cfg, whereas
// Import writes the whole tenant — confirm against implementations.
Write(ctx context.Context, item ConfigItem, cfg *Tenant) error
}
// ConfigWatcher is the change-notification side of a tenant configuration.
// It embeds io.Closer.
ConfigWatcher interface {
io.Closer
// Subscribe subscribes c to all changes of an event, selected by the
// EventType et. It returns a context.CancelFunc and an error.
// NOTE(review): presumably calling the returned function ends the
// subscription — confirm against implementations.
Subscribe(ctx context.Context, et EventType, c EventCallback) (context.CancelFunc, error)
}
// StoreOperator is the interface implemented by config storage plugins.
// It embeds io.Closer; plugins are added to the registry with Register.
StoreOperator interface {
io.Closer
// Init initializes the plugin with the given options map.
Init(options map[string]interface{}) error
// Save saves the configuration data val under key.
Save(key PathKey, val []byte) error
// Get gets the configuration data stored under key.
Get(key PathKey) ([]byte, error)
// Watch monitors changes of key and returns a receive-only channel of
// []byte values.
// NOTE(review): presumably each value is the new content of key — confirm.
Watch(key PathKey) (<-chan []byte, error)
// Name returns the plugin name; Register uses it as the registry key.
Name() string
}
)