From 6ae534f5965ce10b57819f6285ae96eca733262a Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Sun, 4 Sep 2022 20:47:07 +0800 Subject: [PATCH] feat(config): support new config & Feat config listener (#374) * feat: add group api for UI module #344 (#373) * add: missing interfaces in proto files and discovery files * feat: group api in file db_groups.go (#344) * style: add license header and add tool: imports-formatter * style: fix import * feat: support notify event * refactor:config center * none * feat: none * refactor:split config and add change watch * style: fix code style * fix: fix func call error * fix: ci error * style:fix code style * fix: fix code style * refactor: rebase upstream * fix:add cluster info to list groups Co-authored-by: chovychan <51713304+chovychan@users.noreply.github.com> Co-authored-by: Jeffsky --- cmd/start/start.go | 16 +- cmd/tools/tools.go | 32 +- conf/bootstrap.yaml | 34 +- conf/config.yaml | 196 ++++---- example/import_config/main.go | 29 ++ integration_test/config/db/config.yaml | 144 +++--- integration_test/config/db_tbl/config.yaml | 158 +++---- integration_test/config/tbl/config.yaml | 82 ++-- pkg/admin/admin.api.yaml | 2 +- pkg/admin/router/db_groups.go | 111 +++++ pkg/admin/router/nodes.go | 8 +- pkg/boot/boot.go | 79 ++-- pkg/boot/discovery.go | 275 +++++++---- pkg/boot/discovery_import.go | 84 ++++ pkg/boot/discovery_test.go | 13 +- pkg/boot/discovery_watch.go | 135 ++++++ pkg/boot/options.go | 10 +- pkg/boot/proto.go | 54 ++- pkg/config/api.go | 125 +++-- pkg/config/api_test.go | 48 +- pkg/config/config.go | 516 ++++++++++++++++----- pkg/config/default.go | 52 +++ pkg/config/diff.go | 307 ++++++++++++ pkg/config/diff_test.go | 198 ++++++++ pkg/config/equals.go | 136 ++++++ pkg/config/etcd/etcd.go | 60 ++- pkg/config/etcd/etcd_test.go | 59 +-- pkg/config/event.go | 126 +++++ pkg/config/file/file.go | 189 ++++++-- pkg/config/file/file_test.go | 400 ++++++---------- pkg/config/model.go | 80 +++- pkg/config/model_test.go | 14 +- pkg/config/nacos/nacos.go | 92 ++-- pkg/config/nacos/nacos_test.go | 81 ++-- pkg/util/match/slice_match.go | 140 ++++++ test/suite.go | 46 +- testdata/fake_bootstrap.yaml | 76 +-- testdata/fake_config.yaml | 175 +++---- 38 files changed, 3055 insertions(+), 1327 deletions(-) create mode 100644 example/import_config/main.go create mode 100644 pkg/admin/router/db_groups.go create mode 100644 pkg/boot/discovery_import.go create mode 100644 pkg/boot/discovery_watch.go create mode 100644 pkg/config/default.go create mode 100644 pkg/config/diff.go create mode 100644 pkg/config/diff_test.go create mode 100644 pkg/config/equals.go create mode 100644 pkg/config/event.go create mode 100644 pkg/util/match/slice_match.go diff --git a/cmd/start/start.go b/cmd/start/start.go index 87b29943..523e1e00 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -33,7 +33,6 @@ import ( import ( "github.com/arana-db/arana/cmd/cmds" "github.com/arana-db/arana/pkg/boot" - "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/constants" "github.com/arana-db/arana/pkg/executor" "github.com/arana-db/arana/pkg/mysql" @@ -85,13 +84,7 @@ func Run(bootstrapConfigPath string, importPath string) { } if len(importPath) > 0 { - c, err := config.Load(importPath) - if err != nil { - log.Fatal("failed to import configuration from %s: %v", importPath, err) - return - } - if err := discovery.GetConfigCenter().ImportConfiguration(c); err != nil { - log.Fatal("failed to import configuration from %s: %v", importPath, err) + if !boot.RunImport(bootstrapConfigPath, importPath) { return } } @@ -103,12 +96,7 @@ func Run(bootstrapConfigPath string, importPath string) { propeller := server.NewServer() - listenersConf, err := discovery.ListListeners(context.Background()) - if err != nil { - log.Fatal("start failed: %v", err) - return - } - + listenersConf := discovery.ListListeners(context.Background()) for _, listenerConf := range listenersConf { listener, err := mysql.NewListener(listenerConf) if err != nil { diff --git a/cmd/tools/tools.go b/cmd/tools/tools.go index ce2b9209..f5509456 100644 --- a/cmd/tools/tools.go +++ b/cmd/tools/tools.go @@ -18,7 +18,7 @@ package tools import ( - "context" + "github.com/arana-db/arana/pkg/boot" "os" ) @@ -28,10 +28,7 @@ import ( import ( "github.com/arana-db/arana/cmd/cmds" - "github.com/arana-db/arana/pkg/boot" - "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/constants" - "github.com/arana-db/arana/pkg/util/log" ) var ( @@ -45,7 +42,7 @@ func init() { Use: "import", Short: "import arana config", Example: "./arana import -c ../docker/conf/bootstrap.yaml -s ../docker/conf/config.yaml", - Run: Run, + Run: run, } cmd.PersistentFlags(). @@ -58,25 +55,10 @@ func init() { }) } -func Run(cmd *cobra.Command, args []string) { - _, _ = cmd, args - - discovery := boot.NewDiscovery(importBootConfPath) - if err := discovery.Init(context.Background()); err != nil { - log.Fatal("init failed: %+v", err) - return - } - - cfg, err := config.Load(sourceConfigPath) - if err != nil { - log.Fatal("load config from %s failed: %+v", sourceConfigPath, err) - return - } - - c := discovery.GetConfigCenter() +func run(_ *cobra.Command, _ []string) { + Run(importBootConfPath, sourceConfigPath) +} - if err := c.ImportConfiguration(cfg); err != nil { - log.Fatal("persist config to config.store failed: %+v", err) - return - } +func Run(importConfPath, configPath string) { + boot.RunImport(importConfPath, configPath) } diff --git a/conf/bootstrap.yaml b/conf/bootstrap.yaml index dd15f743..4042c45c 100644 --- a/conf/bootstrap.yaml +++ b/conf/bootstrap.yaml @@ -15,19 +15,29 @@ # limitations under the License. # +kind: ConfigMap +apiVersion: "1.0" +listeners: + - protocol_type: mysql + server_version: 5.7.0 + socket_address: + address: 0.0.0.0 + port: 13306 config: name: file options: - # name: etcd - # options: - # endpoints: "http://localhost:2379" - # name: nacos - # options: - # endpoints: "localhost:8080" - # namespace: arana - # group: arana - # contextPath: /nacos - # scheme: http - # username: nacos - # password: nacos \ No newline at end of file +# name: etcd +# root_path: arana +# options: +# endpoints: "http://127.0.0.1:2379" + +# name: nacos +# options: +# endpoints: "127.0.0.1:8848" +# namespace: arana +# group: arana +# contextPath: /nacos +# scheme: http +# username: nacos +# password: nacos diff --git a/conf/config.yaml b/conf/config.yaml index f5b016fe..eade8b61 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -20,13 +20,6 @@ apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: @@ -34,107 +27,92 @@ data: password: "123456" - username: arana password: "123456" - - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - parameters: - max_allowed_packet: 256M - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - parameters: - - name: node0_r_0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000_r - weight: r0w0 - - name: employees_0001 - nodes: - - name: node1 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - - name: employees_0002 - nodes: - - name: node2 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0002 - weight: r10w10 - - name: employees_0003 - nodes: - - name: node3 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0003 - weight: r10w10 - - name: employees_shadow - nodes: - - name: node_shadow - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_show - weight: r10w10 - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - sequence: - type: snowflake - option: - db_rules: - - column: uid - type: scriptExpr - expr: parseInt($value % 32 / 8) - tbl_rules: - - column: uid - type: scriptExpr - expr: $value % 32 - step: 32 - topology: - db_pattern: employees_${0000..0003} - tbl_pattern: student_${0000..0031} - attributes: - sqlMaxLimit: -1 - - shadow_rule: - tables: - - name: student - enable: false - group_node: employees_shadow - match_rules: - - operation: [insert,update] - match_type: value - attributes: + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + parameters: + max_allowed_packet: 256M + groups: + - name: employees_0000 + nodes: + - node0 + - node0_r_0 + - name: employees_0001 + nodes: + - node1 + - name: employees_0002 + nodes: + - node2 + - name: employees_0003 + nodes: + - node3 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: - column: uid - value: 10000 - - operation: [delete] - match_type: regex - attributes: - - column: name - regex: "^hanmeimei$" - - operation: [select] - match_type: hint + type: scriptExpr + expr: parseInt($value % 32 / 8) + tbl_rules: + - column: uid + type: scriptExpr + expr: $value % 32 + step: 32 + topology: + db_pattern: employees_${0000..0003} + tbl_pattern: student_${0000..0031} attributes: - - shadow: true + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000 + weight: r10w10 + parameters: + node0_r_0: + name: node0_r_0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000_r + weight: r0w0 + parameters: + node1: + name: node1 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0001 + weight: r10w10 + parameters: + node2: + name: node2 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0002 + weight: r10w10 + parameters: + node3: + name: node3 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0003 + weight: r10w10 + parameters: + diff --git a/example/import_config/main.go b/example/import_config/main.go new file mode 100644 index 00000000..6e2d882d --- /dev/null +++ b/example/import_config/main.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "github.com/arana-db/arana/cmd/tools" + "github.com/arana-db/arana/testdata" +) + +func main() { + bootstrap := testdata.Path("../conf/bootstrap.yaml") + config := testdata.Path("../conf/config.yaml") + tools.Run(bootstrap, config) +} diff --git a/integration_test/config/db/config.yaml b/integration_test/config/db/config.yaml index 458751b9..8637ff65 100644 --- a/integration_test/config/db/config.yaml +++ b/integration_test/config/db/config.yaml @@ -20,13 +20,6 @@ apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: @@ -34,68 +27,75 @@ data: password: "123456" - username: arana password: "123456" - - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - - name: employees_0001 - nodes: - - name: node1 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - - name: employees_0002 - nodes: - - name: node2 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0002 - weight: r10w10 - - name: employees_0003 - nodes: - - name: node3 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0003 - weight: r10w10 - - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - sequence: - type: snowflake - option: - db_rules: - - column: uid - type: scriptExpr - expr: parseInt($value % 32 / 8) - step: 32 - tbl_rules: - - column: uid - type: scriptExpr - expr: parseInt(0) - topology: - db_pattern: employees_${0000...0003} - tbl_pattern: student_0000 - attributes: - sqlMaxLimit: -1 + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + groups: + - name: employees_0000 + nodes: + - node0 + - name: employees_0001 + nodes: + - node1 + - name: employees_0002 + nodes: + - node2 + - name: employees_0003 + nodes: + - node3 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: + - column: uid + type: scriptExpr + expr: parseInt($value % 32 / 8) + step: 32 + tbl_rules: + - column: uid + type: scriptExpr + expr: parseInt(0) + topology: + db_pattern: employees_${0000...0003} + tbl_pattern: student_0000 + attributes: + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000 + weight: r10w10 + node1: + name: node1 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0001 + weight: r10w10 + node2: + name: node2 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0002 + weight: r10w10 + node3: + name: node3 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0003 + weight: r10w10 diff --git a/integration_test/config/db_tbl/config.yaml b/integration_test/config/db_tbl/config.yaml index b3d9702a..d697962a 100644 --- a/integration_test/config/db_tbl/config.yaml +++ b/integration_test/config/db_tbl/config.yaml @@ -20,13 +20,6 @@ apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: @@ -34,74 +27,83 @@ data: password: "123456" - username: arana password: "123456" - - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - - name: node0_r_0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000_r - weight: r0w0 - - name: employees_0001 - nodes: - - name: node1 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - - name: employees_0002 - nodes: - - name: node2 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0002 - weight: r10w10 - - name: employees_0003 - nodes: - - name: node3 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0003 - weight: r10w10 - - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - sequence: - type: snowflake - option: - db_rules: - - column: uid - type: scriptExpr - expr: parseInt($value % 32 / 8) - tbl_rules: - - column: uid - type: scriptExpr - expr: $value % 32 - topology: - db_pattern: employees_${0000..0003} - tbl_pattern: student_${0000..0031} - attributes: - sqlMaxLimit: -1 + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + groups: + - name: employees_0000 + nodes: + - node0 + - node0_r_0 + - name: employees_0001 + nodes: + - node1 + - name: employees_0002 + nodes: + - node2 + - name: employees_0003 + nodes: + - node3 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: + - column: uid + type: scriptExpr + expr: parseInt($value % 32 / 8) + tbl_rules: + - column: uid + type: scriptExpr + expr: $value % 32 + topology: + db_pattern: employees_${0000..0003} + tbl_pattern: student_${0000..0031} + attributes: + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000 + weight: r10w10 + node0_r_0: + name: node0_r_0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000_r + weight: r0w0 + node1: + name: node1 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0001 + weight: r10w10 + node2: + name: node2 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0002 + weight: r10w10 + node3: + name: node3 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0003 + weight: r10w10 diff --git a/integration_test/config/tbl/config.yaml b/integration_test/config/tbl/config.yaml index b4698084..2cee24b6 100644 --- a/integration_test/config/tbl/config.yaml +++ b/integration_test/config/tbl/config.yaml @@ -20,13 +20,6 @@ apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: @@ -34,40 +27,41 @@ data: password: "123456" - username: arana password: "123456" - - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - sequence: - type: snowflake - option: - db_rules: - - column: uid - type: scriptExpr - expr: parseInt(0) - tbl_rules: - - column: uid - type: scriptExpr - expr: $value % 32 - topology: - db_pattern: employees_0000 - tbl_pattern: student_${0000..0031} - attributes: - sqlMaxLimit: -1 + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + groups: + - name: employees_0000 + nodes: + - node0 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: + - column: uid + type: scriptExpr + expr: parseInt(0) + tbl_rules: + - column: uid + type: scriptExpr + expr: $value % 32 + topology: + db_pattern: employees_0000 + tbl_pattern: student_${0000..0031} + attributes: + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000 + weight: r10w10 diff --git a/pkg/admin/admin.api.yaml b/pkg/admin/admin.api.yaml index 8c8dda9a..25060888 100644 --- a/pkg/admin/admin.api.yaml +++ b/pkg/admin/admin.api.yaml @@ -358,4 +358,4 @@ components: Clusters: type: array items: - $ref: '#/components/schemas/Cluster' \ No newline at end of file + $ref: '#/components/schemas/Cluster' diff --git a/pkg/admin/router/db_groups.go b/pkg/admin/router/db_groups.go new file mode 100644 index 00000000..e94c2953 --- /dev/null +++ b/pkg/admin/router/db_groups.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "context" + "net/http" +) + +import ( + "github.com/gin-gonic/gin" +) + +import ( + "github.com/arana-db/arana/pkg/admin" + "github.com/arana-db/arana/pkg/boot" +) + +func init() { + admin.Register(func(router gin.IRoutes) { + router.POST("/tenants/:tenant/groups", CreateGroup) + router.GET("/tenants/:tenant/groups", ListGroups) + router.GET("/tenants/:tenant/groups/:group", GetGroup) + router.PUT("/tenants/:tenant/groups/:group", UpdateGroup) + router.DELETE("/tenants/:tenant/groups/:group", RemoveGroup) + }) +} + +func CreateGroup(c *gin.Context) { + service := admin.GetService(c) + tenantName := c.Param("tenant") + var group *boot.GroupBody + if err := c.ShouldBindJSON(&group); err == nil { + err := service.UpsertGroup(context.Background(), tenantName, "", "", group) + if err != nil { + _ = c.Error(err) + return + } + c.JSON(http.StatusOK, nil) + } else { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + } +} + +func ListGroups(c *gin.Context) { + service := admin.GetService(c) + tenantName := c.Param("tenant") + cluster := c.Param("cluster") + groups, err := service.ListGroups(context.Background(), tenantName, cluster) + if err != nil { + _ = c.Error(err) + return + } + c.JSON(http.StatusOK, groups) +} + +func GetGroup(c *gin.Context) { + service := admin.GetService(c) + tenant := c.Param("tenant") + group := c.Param("group") + data, err := service.GetGroup(context.Background(), tenant, "", group) + if err != nil { + _ = c.Error(err) + return + } + c.JSON(http.StatusOK, data) +} + +func UpdateGroup(c *gin.Context) { + service := admin.GetService(c) + tenant := c.Param("tenant") + group := c.Param("group") + var groupBody *boot.GroupBody + if err := c.ShouldBindJSON(&groupBody); err == nil { + err := service.UpsertGroup(context.Background(), tenant, "", group, groupBody) + if err != nil { + _ = c.Error(err) + return + } + c.JSON(http.StatusOK, nil) + } else { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + } +} + +func RemoveGroup(c *gin.Context) { + service := admin.GetService(c) + tenant, group := c.Param("tenant"), c.Param("group") + + err := service.RemoveGroup(context.Background(), tenant, "", group) + if err != nil { + _ = c.Error(err) + return + } + c.JSON(http.StatusOK, nil) +} diff --git a/pkg/admin/router/nodes.go b/pkg/admin/router/nodes.go index fb6c8547..14192a14 100644 --- a/pkg/admin/router/nodes.go +++ b/pkg/admin/router/nodes.go @@ -51,13 +51,13 @@ func ListNodes(c *gin.Context) { return } for _, cluster := range clusters { - groups, err := service.ListGroups(c, cluster) + groups, err := service.ListGroups(c, tenantName, cluster) if err != nil { _ = c.Error(err) return } for _, group := range groups { - nodesArray, err := service.ListNodes(c, cluster, group) + nodesArray, err := service.ListNodes(c, tenantName, cluster, group) if err != nil { _ = c.Error(err) return @@ -86,13 +86,13 @@ func GetNode(c *gin.Context) { } var data *config.Node for _, cluster := range clusters { - groups, err := service.ListGroups(c, cluster) + groups, err := service.ListGroups(c, tenant, cluster) if err != nil { _ = c.Error(err) return } for _, group := range groups { - data, err = service.GetNode(c, cluster, group, node) + data, err = service.GetNode(c, tenant, cluster, group, node) if err != nil { _ = c.Error(err) continue diff --git a/pkg/boot/boot.go b/pkg/boot/boot.go index 08696dcc..47ffe1d3 100644 --- a/pkg/boot/boot.go +++ b/pkg/boot/boot.go @@ -29,7 +29,6 @@ import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/proto/rule" "github.com/arana-db/arana/pkg/runtime" - rcontext "github.com/arana-db/arana/pkg/runtime/context" "github.com/arana-db/arana/pkg/runtime/namespace" _ "github.com/arana-db/arana/pkg/schema" "github.com/arana-db/arana/pkg/security" @@ -41,62 +40,62 @@ func Boot(ctx context.Context, provider Discovery) error { return err } - clusters, err := provider.ListClusters(ctx, "") - if err != nil { - return err + var ( + err error + tenants []string + ) + if tenants, err = provider.ListTenants(ctx); err != nil { + return errors.Wrap(err, "no tenants found") } - for _, cluster := range clusters { - var ( - c *Cluster - ns *namespace.Namespace - ) - - if c, err = provider.GetCluster(ctx, "", cluster); err != nil { - continue + for _, tenant := range tenants { + clusters, err := provider.ListClusters(ctx, tenant) + if err != nil { + return err } - ctx = rcontext.WithTenant(ctx, c.Tenant) + for _, cluster := range clusters { + var ( + ns *namespace.Namespace + ) - if ns, err = buildNamespace(ctx, provider, cluster); err != nil { - log.Errorf("build namespace %s failed: %v", cluster, err) - continue - } - if err = namespace.Register(ns); err != nil { - log.Errorf("register namespace %s failed: %v", cluster, err) - continue - } - log.Infof("register namespace %s successfully", cluster) - security.DefaultTenantManager().PutCluster(c.Tenant, cluster) - } + if _, err = provider.GetCluster(ctx, tenant, cluster); err != nil { + continue + } - var tenants []string - if tenants, err = provider.ListTenants(ctx); err != nil { - return errors.Wrap(err, "no tenants found") - } + if ns, err = buildNamespace(ctx, tenant, provider, cluster); err != nil { + log.Errorf("build namespace %s failed: %v", cluster, err) + continue + } + if err = namespace.Register(ns); err != nil { + log.Errorf("register namespace %s failed: %v", cluster, err) + continue + } + log.Infof("register namespace %s successfully", cluster) + security.DefaultTenantManager().PutCluster(tenant, cluster) + } - for _, tenant := range tenants { - var t *config.Tenant - if t, err = provider.GetTenant(ctx, tenant); err != nil { + var users config.Users + if users, err = provider.ListUsers(ctx, tenant); err != nil { log.Errorf("failed to get tenant %s: %v", tenant, err) continue } - for _, it := range t.Users { - security.DefaultTenantManager().PutUser(tenant, it) + for i := range users { + security.DefaultTenantManager().PutUser(tenant, users[i]) } } return nil } -func buildNamespace(ctx context.Context, provider Discovery, clusterName string) (*namespace.Namespace, error) { +func buildNamespace(ctx context.Context, tenant string, provider Discovery, clusterName string) (*namespace.Namespace, error) { var ( cluster *config.DataSourceCluster groups []string err error ) - cluster, err = provider.GetDataSourceCluster(ctx, clusterName) + cluster, err = provider.GetDataSourceCluster(ctx, tenant, clusterName) if err != nil { return nil, err } @@ -105,19 +104,19 @@ func buildNamespace(ctx context.Context, provider Discovery, clusterName string) parameters = cluster.Parameters } - if groups, err = provider.ListGroups(ctx, clusterName); err != nil { + if groups, err = provider.ListGroups(ctx, tenant, clusterName); err != nil { return nil, err } var initCmds []namespace.Command for _, group := range groups { var nodes []string - if nodes, err = provider.ListNodes(ctx, clusterName, group); err != nil { + if nodes, err = provider.ListNodes(ctx, tenant, clusterName, group); err != nil { return nil, err } for _, it := range nodes { var node *config.Node - if node, err = provider.GetNode(ctx, clusterName, group, it); err != nil { + if node, err = provider.GetNode(ctx, tenant, clusterName, group, it); err != nil { return nil, errors.WithStack(err) } if node.Parameters == nil { @@ -129,14 +128,14 @@ func buildNamespace(ctx context.Context, provider Discovery, clusterName string) } var tables []string - if tables, err = provider.ListTables(ctx, clusterName); err != nil { + if tables, err = provider.ListTables(ctx, tenant, clusterName); err != nil { return nil, errors.WithStack(err) } var ru rule.Rule for _, table := range tables { var vt *rule.VTable - if vt, err = provider.GetTable(ctx, clusterName, table); err != nil { + if vt, err = provider.GetTable(ctx, tenant, clusterName, table); err != nil { return nil, err } if vt == nil { diff --git a/pkg/boot/discovery.go b/pkg/boot/discovery.go index fdf75899..90fb8dea 100644 --- a/pkg/boot/discovery.go +++ b/pkg/boot/discovery.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "path/filepath" "regexp" - "sort" "strconv" "strings" "sync" @@ -57,6 +56,11 @@ var ( _regexpRuleExprSync sync.Once ) +var ( + ErrorNoTenant = errors.New("no tenant") + ErrorNoDataSourceCluster = errors.New("no datasourceCluster") +) + func getTableRegexp() *regexp.Regexp { _regexpTableOnce.Do(func() { _regexpTable = regexp.MustCompile("([a-zA-Z0-9\\-_]+)\\.([a-zA-Z0-9\\\\-_]+)") @@ -75,7 +79,9 @@ type discovery struct { inited uatomic.Bool path string options *BootOptions - c *config.Center + + tenantOp config.TenantOperator + centers map[string]config.Center } func (fp *discovery) UpsertTenant(ctx context.Context, tenant string, body *TenantBody) error { @@ -138,133 +144,167 @@ func (fp *discovery) RemoveTable(ctx context.Context, tenant, cluster, table str panic("implement me") } +func (fp *discovery) Import(ctx context.Context, info *config.Tenant) error { + op, ok := fp.centers[info.Name] + if !ok { + return ErrorNoTenant + } + + return op.Import(ctx, info) +} + func (fp *discovery) Init(ctx context.Context) error { if !fp.inited.CAS(false, true) { return nil } - if err := fp.loadBootOptions(); err != nil { + cfg, err := LoadBootOptions(fp.path) + if err != nil { return err } + fp.options = cfg - if err := fp.initConfigCenter(); err != nil { + if err := config.Init(*fp.options.Config, fp.options.Spec.APIVersion); err != nil { return err } + fp.tenantOp, err = config.NewTenantOperator(config.GetStoreOperate()) + if err != nil { + return err + } + if err := fp.initAllConfigCenter(); err != nil { + return err + } return nil } -func (fp *discovery) loadBootOptions() error { - content, err := ioutil.ReadFile(fp.path) +func LoadBootOptions(path string) (*BootOptions, error) { + content, err := ioutil.ReadFile(path) if err != nil { err = errors.Wrap(err, "failed to load config") - return err + return nil, err } - if !file.IsYaml(fp.path) { - err = errors.Errorf("invalid config file format: %s", filepath.Ext(fp.path)) - return err + if !file.IsYaml(path) { + err = errors.Errorf("invalid config file format: %s", filepath.Ext(path)) + return nil, err } var cfg BootOptions if err = yaml.Unmarshal(content, &cfg); err != nil { err = errors.Wrapf(err, "failed to unmarshal config") - return err + return nil, err } - fp.options = &cfg - return nil + return &cfg, nil } -func (fp *discovery) initConfigCenter() error { - c, err := config.NewCenter(*fp.options.Config) - if err != nil { - return err - } +func (fp *discovery) initAllConfigCenter() error { + + tenants := fp.tenantOp.ListTenants() + for i := range tenants { + tenant := tenants[i] + + options := *fp.options.Config + if len(options.Options) == 0 { + options.Options = map[string]interface{}{} + } + options.Options["tenant"] = tenant - fp.c = c + fp.centers[tenant] = config.NewCenter(tenant, config.GetStoreOperate()) + } return nil } -func (fp *discovery) GetConfigCenter() *config.Center { - return fp.c +func (fp *discovery) GetDataSourceCluster(ctx context.Context, tenant, cluster string) (*config.DataSourceCluster, error) { + dataSourceCluster, err := fp.loadCluster(tenant, cluster) + if err != nil { + return nil, err + } + return dataSourceCluster, nil } -func (fp *discovery) GetDataSourceCluster(ctx context.Context, cluster string) (*config.DataSourceCluster, error) { - dataSourceCluster, ok := fp.loadCluster(cluster) +func (fp *discovery) GetGroup(ctx context.Context, tenant, cluster, group string) (*config.Group, error) { + exist, ok := fp.loadGroup(tenant, cluster, group) if !ok { return nil, nil } - return dataSourceCluster, nil + + return exist, nil } func (fp *discovery) GetCluster(ctx context.Context, tenant, cluster string) (*Cluster, error) { - exist, ok := fp.loadCluster(cluster) - if !ok { - return nil, nil + exist, err := fp.loadCluster(tenant, cluster) + if err != nil { + return nil, err } return &Cluster{ - Tenant: exist.Tenant, - Type: exist.Type, + Type: exist.Type, }, nil } func (fp *discovery) ListTenants(ctx context.Context) ([]string, error) { - cfg, err := fp.c.Load() - if err != nil { - return nil, err - } - var tenants []string - for _, it := range cfg.Data.Tenants { - tenants = append(tenants, it.Name) - } - return tenants, nil + return fp.tenantOp.ListTenants(), nil } func (fp *discovery) GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) { - cfg, err := fp.c.Load() + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + cfg, err := op.Load(context.Background()) if err != nil { return nil, err } - for _, it := range cfg.Data.Tenants { - if it.Name == tenant { - return it, nil - } - } - return nil, nil + return cfg, nil } -func (fp *discovery) ListListeners(ctx context.Context) ([]*config.Listener, error) { - cfg, err := fp.c.Load() +func (fp *discovery) ListUsers(ctx context.Context, tenant string) (config.Users, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + cfg, err := op.Load(context.Background()) if err != nil { return nil, err } - return cfg.Data.Listeners, nil + return cfg.Users, nil +} + +func (fp *discovery) ListListeners(ctx context.Context) []*config.Listener { + return fp.options.Listeners } func (fp *discovery) ListClusters(ctx context.Context, tenant string) ([]string, error) { - cfg, err := fp.c.Load() + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + cfg, err := op.Load(context.Background()) if err != nil { return nil, err } - clusters := make([]string, 0, len(cfg.Data.DataSourceClusters)) - for _, it := range cfg.Data.DataSourceClusters { - clusters = append(clusters, it.Name) - } + ret := make([]string, 0, 4) - return clusters, nil + for _, it := range cfg.DataSourceClusters { + ret = append(ret, it.Name) + } + return ret, nil } -func (fp *discovery) ListGroups(ctx context.Context, cluster string) ([]string, error) { - bingo, ok := fp.loadCluster(cluster) - if !ok { - return nil, nil +func (fp *discovery) ListGroups(ctx context.Context, tenant, cluster string) ([]string, error) { + bingo, err := fp.loadCluster(tenant, cluster) + if err != nil { + return nil, err } groups := make([]string, 0, len(bingo.Groups)) for _, it := range bingo.Groups { @@ -274,54 +314,90 @@ func (fp *discovery) ListGroups(ctx context.Context, cluster string) ([]string, return groups, nil } -func (fp *discovery) ListNodes(ctx context.Context, cluster, group string) ([]string, error) { - bingo, ok := fp.loadGroup(cluster, group) +func (fp *discovery) ListNodes(ctx context.Context, tenant, cluster, group string) ([]string, error) { + + bingo, ok := fp.loadGroup(tenant, cluster, group) if !ok { return nil, nil } var nodes []string - for _, it := range bingo.Nodes { - nodes = append(nodes, it.Name) + for i := range bingo.Nodes { + nodes = append(nodes, bingo.Nodes[i]) } return nodes, nil } -func (fp *discovery) ListTables(ctx context.Context, cluster string) ([]string, error) { - cfg, err := fp.c.Load() +func (fp *discovery) ListTables(ctx context.Context, tenant, cluster string) ([]string, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + cfg, err := op.Load(context.Background()) if err != nil { return nil, err } - var tables []string - for tb := range fp.loadTables(cfg, cluster) { + rule := cfg.ShardingRule + tables := make([]string, 0, 4) + + for i := range rule.Tables { + db, tb, err := parseTable(rule.Tables[i].Name) + if err != nil { + return nil, err + } + if db != cluster { + continue + } + tables = append(tables, tb) } - sort.Strings(tables) + return tables, nil } -func (fp *discovery) GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) { - bingo, ok := fp.loadGroup(cluster, group) +func (fp *discovery) GetNode(ctx context.Context, tenant, cluster, group, node string) (*config.Node, error) { + + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + var nodeId string + + bingo, ok := fp.loadGroup(tenant, cluster, group) if !ok { return nil, nil } - for _, it := range bingo.Nodes { - if it.Name == node { - return it, nil + + for i := range bingo.Nodes { + if bingo.Nodes[i] == node { + nodeId = node + break } } - return nil, nil -} -func (fp *discovery) GetTable(ctx context.Context, cluster, tableName string) (*rule.VTable, error) { - cfg, err := fp.c.Load() + if nodeId == "" { + return nil, nil + } + + nodes, err := fp.loadNodes(op) if err != nil { return nil, err } - table, ok := fp.loadTables(cfg, cluster)[tableName] + return nodes[nodeId], nil +} + +func (fp *discovery) GetTable(ctx context.Context, tenant, cluster, tableName string) (*rule.VTable, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + table, ok := fp.loadTables(cluster, op)[tableName] if !ok { return nil, nil } @@ -332,6 +408,7 @@ func (fp *discovery) GetTable(ctx context.Context, cluster, tableName string) (* dbFormat, tbFormat string dbBegin, tbBegin int dbEnd, tbEnd int + err error ) if table.Topology != nil { @@ -469,23 +546,37 @@ func (fp *discovery) GetTable(ctx context.Context, cluster, tableName string) (* return &vt, nil } -func (fp *discovery) loadCluster(cluster string) (*config.DataSourceCluster, bool) { - cfg, err := fp.c.Load() +func (fp *discovery) loadCluster(tenant, cluster string) (*config.DataSourceCluster, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, ErrorNoTenant + } + + cfg, err := op.Load(context.Background()) if err != nil { - return nil, false + return nil, err } - for _, it := range cfg.Data.DataSourceClusters { + for _, it := range cfg.DataSourceClusters { if it.Name == cluster { - return it, true + return it, nil } } - return nil, false + return nil, ErrorNoDataSourceCluster } -func (fp *discovery) loadGroup(cluster, group string) (*config.Group, bool) { - bingo, ok := fp.loadCluster(cluster) - if !ok { +func (fp *discovery) loadNodes(op config.Center) (config.Nodes, error) { + cfg, err := op.Load(context.Background()) + if err != nil { + return nil, err + } + + return cfg.Nodes, nil +} + +func (fp *discovery) loadGroup(tenant, cluster, group string) (*config.Group, bool) { + bingo, err := fp.loadCluster(tenant, cluster) + if err != nil { return nil, false } for _, it := range bingo.Groups { @@ -496,9 +587,14 @@ func (fp *discovery) loadGroup(cluster, group string) (*config.Group, bool) { return nil, false } -func (fp *discovery) loadTables(cfg *config.Configuration, cluster string) map[string]*config.Table { +func (fp *discovery) loadTables(cluster string, op config.Center) map[string]*config.Table { + cfg, err := op.Load(context.Background()) + if err != nil { + return nil + } + var tables map[string]*config.Table - for _, it := range cfg.Data.ShardingRule.Tables { + for _, it := range cfg.ShardingRule.Tables { db, tb, err := parseTable(it.Name) if err != nil { log.Warnf("skip parsing table rule: %v", err) @@ -617,6 +713,7 @@ func parseTable(input string) (db, tbl string, err error) { func NewDiscovery(path string) Discovery { return &discovery{ - path: path, + path: path, + centers: map[string]config.Center{}, } } diff --git a/pkg/boot/discovery_import.go b/pkg/boot/discovery_import.go new file mode 100644 index 00000000..c0a427ac --- /dev/null +++ b/pkg/boot/discovery_import.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package boot + +import ( + "context" + "github.com/arana-db/arana/pkg/config" + "github.com/arana-db/arana/pkg/util/log" +) + +func RunImport(importConfPath, configPath string) bool { + bootCfg, err := LoadBootOptions(importConfPath) + if err != nil { + log.Fatalf("load bootstrap config failed: %+v", err) + } + + if err := config.Init(*bootCfg.Config, bootCfg.APIVersion); err != nil { + log.Fatal() + } + + cfg, err := config.Load(configPath) + if err != nil { + log.Fatal("load config from %s failed: %+v", configPath, err) + return false + } + + tenantOp, err := config.NewTenantOperator(config.GetStoreOperate()) + if err != nil { + log.Fatal("build tenant operator failed: %+v", configPath, err) + return false + } + + defer tenantOp.Close() + + for i := range cfg.Data.Tenants { + if err := tenantOp.CreateTenant(cfg.Data.Tenants[i].Name); err != nil { + log.Fatal("create tenant failed: %+v", configPath, err) + return false + } + } + + for i := range cfg.Data.Tenants { + + tenant := cfg.Data.Tenants[i] + + tenant.APIVersion = cfg.APIVersion + tenant.Metadata = cfg.Metadata + + ok := func() bool { + op := config.NewCenter(tenant.Name, config.GetStoreOperate()) + defer op.Close() + + if err := op.Import(context.Background(), tenant); err != nil { + log.Fatalf("persist config to config.store failed: %+v", err) + return false + } + + return true + }() + + if !ok { + return false + } + } + + log.Infof("finish import config into config_center") + return true +} diff --git a/pkg/boot/discovery_test.go b/pkg/boot/discovery_test.go index 67d4fb53..fda85294 100644 --- a/pkg/boot/discovery_test.go +++ b/pkg/boot/discovery_test.go @@ -19,6 +19,7 @@ package boot import ( "context" + "os" "testing" ) @@ -27,10 +28,12 @@ import ( ) import ( + "github.com/arana-db/arana/pkg/constants" "github.com/arana-db/arana/testdata" ) func TestFileProvider(t *testing.T) { + os.Setenv(constants.EnvConfigPath, testdata.Path("fake_config.yaml")) provider := NewDiscovery(testdata.Path("fake_bootstrap.yaml")) err := Boot(context.Background(), provider) @@ -41,25 +44,25 @@ func TestFileProvider(t *testing.T) { assert.NotEmpty(t, clusters, "clusters should not be empty") t.Logf("clusters: %v\n", clusters) - groups, err := provider.ListGroups(context.Background(), clusters[0]) + groups, err := provider.ListGroups(context.Background(), "arana", clusters[0]) assert.NoError(t, err) assert.NotEmpty(t, groups, "groups should not be empty") t.Logf("groups: %v\n", groups) - nodes, err := provider.ListNodes(context.Background(), clusters[0], groups[0]) + nodes, err := provider.ListNodes(context.Background(), "arana", clusters[0], groups[0]) assert.NoError(t, err) assert.NotEmpty(t, nodes, "nodes should not be empty") - node, err := provider.GetNode(context.Background(), clusters[0], groups[0], nodes[0]) + node, err := provider.GetNode(context.Background(), "arana", clusters[0], groups[0], nodes[0]) assert.NoError(t, err) t.Logf("node: %s\n", node) - tables, err := provider.ListTables(context.Background(), clusters[0]) + tables, err := provider.ListTables(context.Background(), "arana", clusters[0]) assert.NoError(t, err) assert.NotEmpty(t, tables, "tables should not be empty") t.Logf("tables: %v\n", tables) - table, err := provider.GetTable(context.Background(), clusters[0], tables[0]) + table, err := provider.GetTable(context.Background(), "arana", clusters[0], tables[0]) assert.NoError(t, err) assert.True(t, table.AllowFullScan()) t.Logf("vtable: %v\n", table) diff --git a/pkg/boot/discovery_watch.go b/pkg/boot/discovery_watch.go new file mode 100644 index 00000000..d54a5c50 --- /dev/null +++ b/pkg/boot/discovery_watch.go @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package boot + +import ( + "context" + "time" +) + +import ( + "github.com/arana-db/arana/pkg/config" +) + +func (fp *discovery) WatchTenants(ctx context.Context) (<-chan config.TenantsEvent, context.CancelFunc, error) { + ch := make(chan config.TenantsEvent) + + cancel := fp.tenantOp.Subscribe(ctx, func(e config.Event) { + ch <- e.(config.TenantsEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func (fp *discovery) WatchNodes(ctx context.Context, tenant string) (<-chan config.NodesEvent, context.CancelFunc, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, nil, ErrorNoTenant + } + + ch := make(chan config.NodesEvent) + + cancel := op.Subscribe(ctx, config.EventTypeNodes, func(e config.Event) { + ch <- e.(config.NodesEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func (fp *discovery) WatchUsers(ctx context.Context, tenant string) (<-chan config.UsersEvent, context.CancelFunc, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, nil, ErrorNoTenant + } + + ch := make(chan config.UsersEvent) + + cancel := op.Subscribe(ctx, config.EventTypeUsers, func(e config.Event) { + ch <- e.(config.UsersEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func (fp *discovery) WatchClusters(ctx context.Context, tenant string) (<-chan config.ClustersEvent, context.CancelFunc, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, nil, ErrorNoTenant + } + + ch := make(chan config.ClustersEvent) + + cancel := op.Subscribe(ctx, config.EventTypeClusters, func(e config.Event) { + ch <- e.(config.ClustersEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func (fp *discovery) WatchShardingRule(ctx context.Context, tenant string) (<-chan config.ShardingRuleEvent, context.CancelFunc, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, nil, ErrorNoTenant + } + + ch := make(chan config.ShardingRuleEvent) + + cancel := op.Subscribe(ctx, config.EventTypeShardingRule, func(e config.Event) { + ch <- e.(config.ShardingRuleEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func (fp *discovery) WatchShadowRule(ctx context.Context, tenant string) (<-chan config.ShadowRuleEvent, context.CancelFunc, error) { + op, ok := fp.centers[tenant] + if !ok { + return nil, nil, ErrorNoTenant + } + + ch := make(chan config.ShadowRuleEvent) + + cancel := op.Subscribe(ctx, config.EventTypeShadowRule, func(e config.Event) { + ch <- e.(config.ShadowRuleEvent) + }) + + return ch, wrapWatchCancel(cancel, func() { + close(ch) + }), nil +} + +func wrapWatchCancel(cancel context.CancelFunc, closeChan func()) context.CancelFunc { + return func() { + timer := time.NewTimer(100 * time.Millisecond) + defer timer.Stop() + cancel() + <-timer.C + closeChan() + } +} diff --git a/pkg/boot/options.go b/pkg/boot/options.go index 6bcf8f09..5437b218 100644 --- a/pkg/boot/options.go +++ b/pkg/boot/options.go @@ -21,6 +21,10 @@ import ( "github.com/arana-db/arana/pkg/config" ) -type BootOptions struct { - Config *config.ConfigOptions `yaml:"config"` -} +type ( + BootOptions struct { + config.Spec `yaml:",inline"` + Config *config.Options `yaml:"config"` + Listeners []*config.Listener `validate:"required,dive" yaml:"listeners" json:"listeners"` + } +) diff --git a/pkg/boot/proto.go b/pkg/boot/proto.go index d0a6cc71..38fc2e7d 100644 --- a/pkg/boot/proto.go +++ b/pkg/boot/proto.go @@ -77,29 +77,38 @@ type ConfigProvider interface { // GetTenant returns the tenant info GetTenant(ctx context.Context, tenant string) (*config.Tenant, error) + // ListUsers returns the user list + ListUsers(ctx context.Context, tenant string) (config.Users, error) + // ListClusters lists the cluster names. ListClusters(ctx context.Context, tenant string) ([]string, error) // GetDataSourceCluster returns the dataSourceCluster object - GetDataSourceCluster(ctx context.Context, cluster string) (*config.DataSourceCluster, error) + GetDataSourceCluster(ctx context.Context, tenant, cluster string) (*config.DataSourceCluster, error) + + // GetGroup returns the cluster info + GetGroup(ctx context.Context, tenant, cluster, group string) (*config.Group, error) // GetCluster returns the cluster info GetCluster(ctx context.Context, tenant, cluster string) (*Cluster, error) // ListGroups lists the group names. - ListGroups(ctx context.Context, cluster string) ([]string, error) + ListGroups(ctx context.Context, tenant, cluster string) ([]string, error) // ListNodes lists the node names. - ListNodes(ctx context.Context, cluster, group string) ([]string, error) + ListNodes(ctx context.Context, tenant, cluster, group string) ([]string, error) // GetNode returns the node info. - GetNode(ctx context.Context, cluster, group, node string) (*config.Node, error) + GetNode(ctx context.Context, tenant, cluster, group, node string) (*config.Node, error) // ListTables lists the table names. - ListTables(ctx context.Context, cluster string) ([]string, error) + ListTables(ctx context.Context, tenant, cluster string) ([]string, error) // GetTable returns the table info. - GetTable(ctx context.Context, cluster, table string) (*rule.VTable, error) + GetTable(ctx context.Context, tenant, cluster, table string) (*rule.VTable, error) + + // Import import config into config_center + Import(ctx context.Context, info *config.Tenant) error } // ConfigUpdater represents the mutations of configurations. @@ -166,13 +175,38 @@ type ConfigUpdater interface { RemoveTable(ctx context.Context, tenant, cluster, table string) error } +// ConfigWatcher listens for changes in related configuration +type ConfigWatcher interface { + // WatchTenants watches tenant change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchTenants(ctx context.Context) (<-chan config.TenantsEvent, context.CancelFunc, error) + // WatchNodes watches nodes change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchNodes(ctx context.Context, tenant string) (<-chan config.NodesEvent, context.CancelFunc, error) + // WatchUsers watches users change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchUsers(ctx context.Context, tenant string) (<-chan config.UsersEvent, context.CancelFunc, error) + // WatchClusters watches cluster change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchClusters(ctx context.Context, tenant string) (<-chan config.ClustersEvent, context.CancelFunc, error) + // WatchShardingRule watches sharding rule change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchShardingRule(ctx context.Context, tenant string) (<-chan config.ShardingRuleEvent, context.CancelFunc, error) + // WatchShadowRule watches shadow rule change + // return <-chan config.TenantsEvent: listen to this chan to get related event + // return context.CancelFunc: used to cancel this monitoring, after execution, chan(<-chan config.TenantsEvent) will be closed + WatchShadowRule(ctx context.Context, tenant string) (<-chan config.ShadowRuleEvent, context.CancelFunc, error) +} + type Discovery interface { ConfigProvider // ListListeners lists the listener names - ListListeners(ctx context.Context) ([]*config.Listener, error) - - // GetConfigCenter returns the config center. - GetConfigCenter() *config.Center + ListListeners(ctx context.Context) []*config.Listener // Init initializes discovery with context Init(ctx context.Context) error diff --git a/pkg/config/api.go b/pkg/config/api.go index 5fdfa60b..66d45a65 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -18,9 +18,11 @@ package config import ( - "errors" + "context" "fmt" "io" + "path/filepath" + "sync" ) type ( @@ -32,14 +34,25 @@ type ( ) const ( - DefaultConfigPath PathKey = "/arana-db/config" - DefaultConfigMetadataPath PathKey = "/arana-db/config/metadata" - DefaultConfigDataListenersPath PathKey = "/arana-db/config/data/listeners" - DefaultConfigDataSourceClustersPath PathKey = "/arana-db/config/data/dataSourceClusters" - DefaultConfigDataShardingRulePath PathKey = "/arana-db/config/data/shardingRule" - DefaultConfigDataTenantsPath PathKey = "/arana-db/config/data/tenants" + _rootPathTemp = "/%s/%s/" ) +var ( + DefaultRootPath PathKey + DefaultTenantsPath PathKey +) + +func initPath(root, version string) { + if root == "" { + root = "arana-db" + } + if version == "" { + version = "1.0" + } + DefaultRootPath = PathKey(fmt.Sprintf(_rootPathTemp, root, version)) + DefaultTenantsPath = PathKey(filepath.Join(string(DefaultRootPath), "tenants")) +} + const ( Http ProtocolType = iota MySQL @@ -52,54 +65,80 @@ const ( ) var ( - slots = make(map[string]StoreOperate) - storeOperate StoreOperate -) - -func GetStoreOperate() (StoreOperate, error) { - if storeOperate != nil { - return storeOperate, nil - } - - return nil, errors.New("StoreOperate not init") -} - -func Init(name string, options map[string]interface{}) error { - s, exist := slots[name] - if !exist { - return fmt.Errorf("StoreOperate solt=[%s] not exist", name) - } + slots = make(map[string]StoreOperator) - storeOperate = s + storeOperate StoreOperator - return storeOperate.Init(options) -} + once sync.Once +) // Register register store plugin -func Register(s StoreOperate) { +func Register(s StoreOperator) { if _, ok := slots[s.Name()]; ok { - panic(fmt.Errorf("StoreOperate=[%s] already exist", s.Name())) + panic(fmt.Errorf("StoreOperator=[%s] already exist", s.Name())) } slots[s.Name()] = s } -// StoreOperate config storage related plugins -type StoreOperate interface { - io.Closer +type ( + callback func(e Event) - // Init plugin initialization - Init(options map[string]interface{}) error + SubscribeResult struct { + EventChan <-chan Event + Cancel context.CancelFunc + } - // Save save a configuration data - Save(key PathKey, val []byte) error + subscriber struct { + watch callback + ctx context.Context + } - // Get get a configuration - Get(key PathKey) ([]byte, error) + Options struct { + StoreName string `yaml:"name"` + RootPath string `yaml:"root_path"` + Options map[string]interface{} `yaml:"options"` + } - // Watch Monitor changes of the key - Watch(key PathKey) (<-chan []byte, error) + //TenantOperator actions specific to tenant spaces + TenantOperator interface { + io.Closer + //ListTenants lists all tenants + ListTenants() []string + //CreateTenant creates tenant + CreateTenant(string) error + //RemoveTenant removes tenant + RemoveTenant(string) error + //Subscribe subscribes tenants change + Subscribe(ctx context.Context, c callback) context.CancelFunc + } - // Name plugin name - Name() string -} + // Center Configuration center for each tenant, tenant-level isolation + Center interface { + io.Closer + // Load loads the full Tenant configuration, the first time it will be loaded remotely, + // and then it will be directly assembled from the cache layer + Load(ctx context.Context) (*Tenant, error) + // Import imports the configuration information of a tenant + Import(ctx context.Context, cfg *Tenant) error + // Subscribe subscribes to all changes of an event by EventType + Subscribe(ctx context.Context, et EventType, c callback) context.CancelFunc + // Tenant tenant info + Tenant() string + } + + // StoreOperator config storage related plugins + StoreOperator interface { + io.Closer + // Init plugin initialization + Init(options map[string]interface{}) error + // Save save a configuration data + Save(key PathKey, val []byte) error + // Get get a configuration + Get(key PathKey) ([]byte, error) + // Watch Monitor changes of the key + Watch(key PathKey) (<-chan []byte, error) + // Name plugin name + Name() string + } +) diff --git a/pkg/config/api_test.go b/pkg/config/api_test.go index b193df11..e5fc07f0 100644 --- a/pkg/config/api_test.go +++ b/pkg/config/api_test.go @@ -30,46 +30,27 @@ import ( import ( "github.com/arana-db/arana/pkg/config" + _ "github.com/arana-db/arana/pkg/config/etcd" + _ "github.com/arana-db/arana/pkg/config/file" + _ "github.com/arana-db/arana/pkg/config/nacos" "github.com/arana-db/arana/testdata" ) -func TestGetStoreOperate(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - // mockStore := NewMockStoreOperate(ctrl) - tests := []struct { - name string - want config.StoreOperate - wantErr assert.ErrorAssertionFunc - }{ - {"GetStoreOperate_1", nil, assert.Error}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := config.GetStoreOperate() - if !tt.wantErr(t, err, fmt.Sprintf("GetStoreOperate()")) { - return - } - assert.Equalf(t, tt.want, got, "GetStoreOperate()") - }) - } -} - func TestInit(t *testing.T) { type args struct { - name string - options map[string]interface{} + version string + options config.Options } tests := []struct { name string args args wantErr assert.ErrorAssertionFunc }{ - {"Init_1", args{"file", nil}, assert.Error}, + {"Init_1", args{"file", config.Options{}}, assert.Error}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.wantErr(t, config.Init(tt.args.name, tt.args.options), fmt.Sprintf("Init(%v, %v)", tt.args.name, tt.args.options)) + tt.wantErr(t, config.Init(tt.args.options, tt.args.version), fmt.Sprintf("Init(%v, %v)", tt.args.options, tt.args.version)) }) } } @@ -80,7 +61,7 @@ func TestRegister(t *testing.T) { mockStore := testdata.NewMockStoreOperate(ctrl) mockStore.EXPECT().Name().Times(2).Return("nacos") type args struct { - s config.StoreOperate + s config.StoreOperator } tests := []struct { name string @@ -113,20 +94,23 @@ func Test_api(t *testing.T) { } func Test_Init(t *testing.T) { - options := make(map[string]interface{}, 0) + options := config.Options{ + StoreName: "fake", + RootPath: "", + Options: nil, + } ctrl := gomock.NewController(t) defer ctrl.Finish() mockFileStore := testdata.NewMockStoreOperate(ctrl) mockFileStore.EXPECT().Name().Times(2).Return("fake") mockFileStore.EXPECT().Init(options).Return(nil) - err := config.Init("fake", options) + err := config.Init(options, "fake") assert.Error(t, err) config.Register(mockFileStore) - err = config.Init("fake", options) + err = config.Init(options, "fake") assert.NoError(t, err) - store, err := config.GetStoreOperate() - assert.NoError(t, err) + store := config.GetStoreOperate() assert.NotNil(t, store) } diff --git a/pkg/config/config.go b/pkg/config/config.go index af6aa238..f1ea4f8b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,175 +23,429 @@ package config import ( "context" "encoding/json" - "errors" "fmt" + "path/filepath" "sync" "sync/atomic" ) import ( + "github.com/pkg/errors" + "github.com/tidwall/gjson" "gopkg.in/yaml.v3" ) import ( - "github.com/arana-db/arana/pkg/util/env" "github.com/arana-db/arana/pkg/util/log" ) -var ( - ConfigKeyMapping map[PathKey]string = map[PathKey]string{ - DefaultConfigMetadataPath: "metadata", - DefaultConfigDataTenantsPath: "data.tenants", - DefaultConfigDataListenersPath: "data.listeners", - DefaultConfigDataSourceClustersPath: "data.clusters", - DefaultConfigDataShardingRulePath: "data.sharding_rule", +type PathInfo struct { + DefaultConfigSpecPath PathKey + DefaultTenantBaseConfigPath PathKey + DefaultConfigDataNodesPath PathKey + DefaultConfigDataUsersPath PathKey + DefaultConfigDataSourceClustersPath PathKey + DefaultConfigDataShardingRulePath PathKey + DefaultConfigDataShadowRulePath PathKey + + ConfigKeyMapping map[PathKey]string + ConfigEventMapping map[PathKey]EventType + BuildEventMapping map[EventType]func(pre, cur *Tenant) Event + ConfigValSupplier map[PathKey]func(cfg *Tenant) interface{} +} + +func NewPathInfo(tenant string) *PathInfo { + + p := &PathInfo{} + + p.DefaultTenantBaseConfigPath = PathKey(filepath.Join(string(DefaultRootPath), fmt.Sprintf("tenants/%s", tenant))) + p.DefaultConfigSpecPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "spec")) + p.DefaultConfigDataNodesPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "nodes")) + p.DefaultConfigDataUsersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "users")) + p.DefaultConfigDataSourceClustersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "dataSourceClusters")) + p.DefaultConfigDataShardingRulePath = PathKey(filepath.Join(string(p.DefaultConfigDataSourceClustersPath), "shardingRule")) + p.DefaultConfigDataShadowRulePath = PathKey(filepath.Join(string(p.DefaultConfigDataSourceClustersPath), "shadowRule")) + + p.ConfigEventMapping = map[PathKey]EventType{ + p.DefaultConfigDataUsersPath: EventTypeUsers, + p.DefaultConfigDataNodesPath: EventTypeNodes, + p.DefaultConfigDataSourceClustersPath: EventTypeClusters, + p.DefaultConfigDataShardingRulePath: EventTypeShardingRule, + p.DefaultConfigDataShadowRulePath: EventTypeShadowRule, } - _configValSupplier map[PathKey]func(cfg *Configuration) interface{} = map[PathKey]func(cfg *Configuration) interface{}{ - DefaultConfigMetadataPath: func(cfg *Configuration) interface{} { - return &cfg.Metadata + p.ConfigValSupplier = map[PathKey]func(cfg *Tenant) interface{}{ + p.DefaultConfigSpecPath: func(cfg *Tenant) interface{} { + return &cfg.Spec }, - DefaultConfigDataTenantsPath: func(cfg *Configuration) interface{} { - return &cfg.Data.Tenants + p.DefaultConfigDataUsersPath: func(cfg *Tenant) interface{} { + return &cfg.Users }, - DefaultConfigDataListenersPath: func(cfg *Configuration) interface{} { - return &cfg.Data.Listeners + p.DefaultConfigDataSourceClustersPath: func(cfg *Tenant) interface{} { + return &cfg.DataSourceClusters }, - DefaultConfigDataSourceClustersPath: func(cfg *Configuration) interface{} { - return &cfg.Data.DataSourceClusters + p.DefaultConfigDataNodesPath: func(cfg *Tenant) interface{} { + return &cfg.Nodes }, - DefaultConfigDataShardingRulePath: func(cfg *Configuration) interface{} { - return &cfg.Data.ShardingRule + p.DefaultConfigDataShardingRulePath: func(cfg *Tenant) interface{} { + return cfg.ShardingRule + }, + p.DefaultConfigDataShadowRulePath: func(cfg *Tenant) interface{} { + return cfg.ShadowRule }, } -) -type Changeable interface { - Name() string - Sign() string + p.ConfigKeyMapping = map[PathKey]string{ + p.DefaultConfigSpecPath: "spec", + p.DefaultConfigDataUsersPath: "users", + p.DefaultConfigDataSourceClustersPath: "clusters", + p.DefaultConfigDataShardingRulePath: "sharding_rule", + p.DefaultConfigDataNodesPath: "nodes", + p.DefaultConfigDataShadowRulePath: "shadow_rule", + } + + p.BuildEventMapping = map[EventType]func(pre *Tenant, cur *Tenant) Event{ + EventTypeNodes: func(pre, cur *Tenant) Event { + return Nodes(cur.Nodes).Diff(pre.Nodes) + }, + EventTypeUsers: func(pre, cur *Tenant) Event { + return Users(cur.Users).Diff(pre.Users) + }, + EventTypeClusters: func(pre, cur *Tenant) Event { + return Clusters(cur.DataSourceClusters).Diff(pre.DataSourceClusters) + }, + EventTypeShardingRule: func(pre, cur *Tenant) Event { + return cur.ShardingRule.Diff(pre.ShardingRule) + }, + EventTypeShadowRule: func(pre, cur *Tenant) Event { + return cur.ShadowRule.Diff(pre.ShadowRule) + }, + } + + return p } -type Observer func() +func NewTenantOperator(op StoreOperator) (TenantOperator, error) { + tenantOp := &tenantOperate{ + op: op, + tenants: map[string]struct{}{}, + cancels: []context.CancelFunc{}, + observers: &observerBucket{observers: map[EventType][]*subscriber{}}, + } + + if err := tenantOp.init(); err != nil { + return nil, err + } -type ConfigOptions struct { - StoreName string `yaml:"name"` - Options map[string]interface{} `yaml:"options"` + return tenantOp, nil } -type Center struct { - initialize int32 - storeOperate StoreOperate - confHolder atomic.Value // 里面持有了最新的 *Configuration 对象 - lock sync.RWMutex - observers []Observer - watchCancels []context.CancelFunc +type tenantOperate struct { + op StoreOperator + lock sync.RWMutex + + tenants map[string]struct{} + observers *observerBucket + + cancels []context.CancelFunc } -func NewCenter(options ConfigOptions) (*Center, error) { - if err := Init(options.StoreName, options.Options); err != nil { - return nil, err +func (tp *tenantOperate) Subscribe(ctx context.Context, c callback) context.CancelFunc { + return tp.observers.add(EventTypeTenants, c) +} + +func (tp *tenantOperate) init() error { + tp.lock.Lock() + defer tp.lock.Unlock() + + if len(tp.tenants) == 0 { + val, err := tp.op.Get(DefaultTenantsPath) + if err != nil { + return err + } + + tenants := make([]string, 0, 4) + if err := yaml.Unmarshal(val, &tenants); err != nil { + return err + } + + for i := range tenants { + tp.tenants[tenants[i]] = struct{}{} + } } - operate, err := GetStoreOperate() + ctx, cancel := context.WithCancel(context.Background()) + tp.cancels = append(tp.cancels, cancel) + + return tp.watchTenants(ctx) +} + +func (tp *tenantOperate) watchTenants(ctx context.Context) error { + ch, err := tp.op.Watch(DefaultTenantsPath) if err != nil { - return nil, err + return err } - return &Center{ - storeOperate: operate, - observers: make([]Observer, 0, 2), - }, nil + go func(ctx context.Context) { + consumer := func(ret []byte) { + tenants := make([]string, 0, 4) + if err := yaml.Unmarshal(ret, &tenants); err != nil { + log.Errorf("marshal tenants content : %v", err) + return + } + + event := Tenants(tenants).Diff(tp.ListTenants()) + log.Infof("receive tenants change event : %#v", event) + tp.observers.notify(EventTypeTenants, event) + + tp.lock.Lock() + defer tp.lock.Unlock() + + tp.tenants = map[string]struct{}{} + for i := range tenants { + tp.tenants[tenants[i]] = struct{}{} + } + } + + for { + select { + case ret := <-ch: + consumer(ret) + case <-ctx.Done(): + log.Infof("stop watch : %s", DefaultTenantsPath) + } + } + }(ctx) + + return nil } -func (c *Center) GetStoreOperate() StoreOperate { - return c.storeOperate +func (tp *tenantOperate) ListTenants() []string { + tp.lock.RLock() + defer tp.lock.RUnlock() + + ret := make([]string, 0, len(tp.tenants)) + + for i := range tp.tenants { + ret = append(ret, i) + } + + return ret } -func (c *Center) Close() error { - if err := c.storeOperate.Close(); err != nil { +func (tp *tenantOperate) CreateTenant(name string) error { + tp.lock.Lock() + defer tp.lock.Unlock() + + if _, ok := tp.tenants[name]; ok { + return nil + } + + tp.tenants[name] = struct{}{} + ret := make([]string, 0, len(tp.tenants)) + for i := range tp.tenants { + ret = append(ret, i) + } + + data, err := yaml.Marshal(ret) + if err != nil { return err } - for i := range c.watchCancels { - c.watchCancels[i]() + if err := tp.op.Save(DefaultTenantsPath, data); err != nil { + return errors.Wrap(err, "create tenant name") + } + + //need to insert the relevant configuration data under the relevant tenant + tenantPathInfo := NewPathInfo(name) + for i := range tenantPathInfo.ConfigKeyMapping { + if err := tp.op.Save(i, []byte("")); err != nil { + return errors.Wrap(err, fmt.Sprintf("create tenant resource : %s", i)) + } } return nil } -func (c *Center) Load() (*Configuration, error) { - return c.LoadContext(context.Background()) +func (tp *tenantOperate) RemoveTenant(name string) error { + tp.lock.Lock() + defer tp.lock.Unlock() + + delete(tp.tenants, name) + + ret := make([]string, 0, len(tp.tenants)) + for i := range tp.tenants { + ret = append(ret, i) + } + + data, err := yaml.Marshal(ret) + if err != nil { + return err + } + + return tp.op.Save(DefaultTenantsPath, data) +} + +func (tp *tenantOperate) Close() error { + for i := range tp.cancels { + tp.cancels[i]() + } + return nil } -func (c *Center) LoadContext(ctx context.Context) (*Configuration, error) { - val := c.confHolder.Load() - if val == nil { - cfg, err := c.loadFromStore(ctx) - if err != nil { +type observerBucket struct { + lock sync.RWMutex + observers map[EventType][]*subscriber +} + +func (b *observerBucket) notify(et EventType, val Event) { + b.lock.RLock() + defer b.lock.RUnlock() + + v := b.observers[et] + for i := range v { + item := v[i] + select { + case <-item.ctx.Done(): + default: + item.watch(val) + } + } +} + +func (b *observerBucket) add(et EventType, f callback) context.CancelFunc { + b.lock.Lock() + defer b.lock.Unlock() + + if _, ok := b.observers[et]; !ok { + b.observers[et] = make([]*subscriber, 0, 4) + } + + ctx, cancel := context.WithCancel(context.Background()) + + v := b.observers[et] + v = append(v, &subscriber{ + watch: f, + ctx: ctx, + }) + + b.observers[et] = v + + return cancel +} + +type center struct { + tenant string + initialize int32 + + storeOperate StoreOperator + pathInfo *PathInfo + holders map[PathKey]*atomic.Value + + observers *observerBucket + watchCancels []context.CancelFunc +} + +func NewCenter(tenant string, op StoreOperator) Center { + + p := NewPathInfo(tenant) + + holders := map[PathKey]*atomic.Value{} + for k := range p.ConfigKeyMapping { + holders[k] = &atomic.Value{} + holders[k].Store(NewEmptyTenant()) + } + + return ¢er{ + pathInfo: p, + tenant: tenant, + holders: holders, + storeOperate: op, + observers: &observerBucket{observers: map[EventType][]*subscriber{}}, + } +} + +func (c *center) Close() error { + for i := range c.watchCancels { + c.watchCancels[i]() + } + return nil +} + +func (c *center) Load(ctx context.Context) (*Tenant, error) { + if atomic.CompareAndSwapInt32(&c.initialize, 0, 1) { + if err := c.loadFromStore(ctx); err != nil { return nil, err } - c.confHolder.Store(cfg) - out, _ := yaml.Marshal(cfg) - if env.IsDevelopEnvironment() { - log.Infof("load configuration:\n%s", string(out)) + if err := c.watchFromStore(); err != nil { + return nil, err } } - val = c.confHolder.Load() + return c.compositeConfiguration(), nil +} + +func (c *center) compositeConfiguration() *Tenant { + conf := &Tenant{} + + if val := c.holders[c.pathInfo.DefaultConfigDataUsersPath].Load(); val != nil { + conf.Users = val.(*Tenant).Users + } + if val := c.holders[c.pathInfo.DefaultConfigDataNodesPath].Load(); val != nil { + conf.Nodes = val.(*Tenant).Nodes + } + if val := c.holders[c.pathInfo.DefaultConfigDataSourceClustersPath].Load(); val != nil { + conf.DataSourceClusters = val.(*Tenant).DataSourceClusters + } + if val := c.holders[c.pathInfo.DefaultConfigDataShardingRulePath].Load(); val != nil { + conf.ShardingRule = val.(*Tenant).ShardingRule + } + if val := c.holders[c.pathInfo.DefaultConfigDataShadowRulePath].Load(); val != nil { + conf.ShadowRule = val.(*Tenant).ShadowRule + } - return val.(*Configuration), nil + if conf.Empty() { + return nil + } + return conf } -func (c *Center) ImportConfiguration(cfg *Configuration) error { - c.confHolder.Store(cfg) - return c.Persist() +func (c *center) Import(ctx context.Context, cfg *Tenant) error { + return c.doPersist(ctx, cfg) } -func (c *Center) loadFromStore(ctx context.Context) (*Configuration, error) { +func (c *center) loadFromStore(ctx context.Context) error { operate := c.storeOperate - cfg := &Configuration{ - Metadata: make(map[string]interface{}), - Data: &Data{ - Listeners: make([]*Listener, 0), - Tenants: make([]*Tenant, 0), - DataSourceClusters: make([]*DataSourceCluster, 0), - ShardingRule: &ShardingRule{}, - }, - } - - for k := range ConfigKeyMapping { + for k := range c.pathInfo.ConfigKeyMapping { val, err := operate.Get(k) if err != nil { - return nil, err + return err } - supplier, ok := _configValSupplier[k] - + holder := c.holders[k] + supplier, ok := c.pathInfo.ConfigValSupplier[k] if !ok { - return nil, fmt.Errorf("%s not register val supplier", k) + return fmt.Errorf("%s not register val supplier", k) } if len(val) != 0 { - if err := json.Unmarshal(val, supplier(cfg)); err != nil { - return nil, err + exp := supplier(holder.Load().(*Tenant)) + if err := yaml.Unmarshal(val, exp); err != nil { + return err } } } - return cfg, nil + return nil } -func (c *Center) watchFromStore() error { - if !atomic.CompareAndSwapInt32(&c.initialize, 0, 1) { - return nil - } - - cancels := make([]context.CancelFunc, 0, len(ConfigKeyMapping)) +func (c *center) watchFromStore() error { + cancels := make([]context.CancelFunc, 0, len(c.pathInfo.ConfigKeyMapping)) - for k := range ConfigKeyMapping { + for k := range c.pathInfo.ConfigKeyMapping { ctx, cancel := context.WithCancel(context.Background()) cancels = append(cancels, cancel) ch, err := c.storeOperate.Watch(k) @@ -205,26 +459,31 @@ func (c *Center) watchFromStore() error { return nil } -func (c *Center) watchKey(ctx context.Context, key PathKey, ch <-chan []byte) { +func (c *center) watchKey(ctx context.Context, key PathKey, ch <-chan []byte) { consumer := func(ret []byte) { - c.lock.Lock() - defer c.lock.Unlock() - - supplier, ok := _configValSupplier[key] + supplier, ok := c.pathInfo.ConfigValSupplier[key] if !ok { log.Errorf("%s not register val supplier", key) return } + if len(ret) == 0 { + log.Errorf("%s receive empty content, ignore", key) + return + } - cfg := c.confHolder.Load().(*Configuration) - - if len(ret) != 0 { - if err := json.Unmarshal(ret, supplier(cfg)); err != nil { - log.Errorf("", err) - } + cur := NewEmptyTenant() + if err := yaml.Unmarshal(ret, supplier(cur)); err != nil { + log.Errorf("%s marshal new content : %v", key, err) + return } - c.confHolder.Store(cfg) + pre := c.holders[key].Load().(*Tenant) + et := c.pathInfo.ConfigEventMapping[key] + event := c.pathInfo.BuildEventMapping[et](pre, cur) + log.Infof("%s receive change event : %#v", key, event) + + c.observers.notify(et, event) + c.holders[key].Store(cur) } for { @@ -237,27 +496,54 @@ func (c *Center) watchKey(ctx context.Context, key PathKey, ch <-chan []byte) { } } -func (c *Center) Persist() error { - return c.PersistContext(context.Background()) +func (c *center) PersistContext(ctx context.Context) error { + return c.doPersist(ctx, c.compositeConfiguration()) } -func (c *Center) PersistContext(ctx context.Context) error { - val := c.confHolder.Load() - if val == nil { - return errors.New("ConfHolder.load is nil") - } - - conf := val.(*Configuration) +func (c *center) doPersist(ctx context.Context, conf *Tenant) error { configJson, err := json.Marshal(conf) if err != nil { return fmt.Errorf("config json.marshal failed %v err:", err) } - for k, v := range ConfigKeyMapping { - if err := c.storeOperate.Save(k, []byte(gjson.GetBytes(configJson, v).String())); err != nil { + for k, v := range c.pathInfo.ConfigKeyMapping { + + ret, err := JSONToYAML(gjson.GetBytes(configJson, v).String()) + if err != nil { + return err + } + + if err := c.storeOperate.Save(k, ret); err != nil { return err } } return nil } + +//Subscribe +func (c *center) Subscribe(ctx context.Context, et EventType, f callback) context.CancelFunc { + + return c.observers.add(et, f) +} + +func (c *center) Tenant() string { + return c.tenant +} + +func JSONToYAML(j string) ([]byte, error) { + // Convert the JSON to an object. + var jsonObj interface{} + // We are using yaml.Unmarshal here (instead of json.Unmarshal) because the + // Go JSON library doesn't try to pick the right number type (int, float, + // etc.) when unmarshalling to interface{}, it just picks float64 + // universally. go-yaml does go through the effort of picking the right + // number type, so we can preserve number type throughout this process. + err := yaml.Unmarshal([]byte(j), &jsonObj) + if err != nil { + return nil, err + } + + // Marshal this object into YAML. + return yaml.Marshal(jsonObj) +} diff --git a/pkg/config/default.go b/pkg/config/default.go new file mode 100644 index 00000000..e9ffe1d3 --- /dev/null +++ b/pkg/config/default.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/util/log" +) + +var ( + ErrorNoStoreOperate = errors.New("no store operate") +) + +func GetStoreOperate() StoreOperator { + return storeOperate +} + +func Init(options Options, version string) error { + initPath(options.RootPath, version) + + once.Do(func() { + op, ok := slots[options.StoreName] + if !ok { + log.Panic(ErrorNoStoreOperate) + } + if err := op.Init(options.Options); err != nil { + log.Panic(err) + } + log.Infof("[StoreOperate] use plugin : %s", options.StoreName) + storeOperate = op + }) + return nil +} diff --git a/pkg/config/diff.go b/pkg/config/diff.go new file mode 100644 index 00000000..fb47ad39 --- /dev/null +++ b/pkg/config/diff.go @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "reflect" +) + +func (t Tenants) Diff(old Tenants) *TenantsEvent { + addTenants := make([]string, 0, 4) + deleteTenants := make([]string, 0, 4) + + newTmp := map[string]struct{}{} + oldTmp := map[string]struct{}{} + + for i := range t { + newTmp[t[i]] = struct{}{} + } + + for i := range old { + oldTmp[old[i]] = struct{}{} + } + + for i := range newTmp { + if _, ok := oldTmp[i]; !ok { + addTenants = append(addTenants, i) + } + } + + for i := range oldTmp { + if _, ok := newTmp[i]; !ok { + deleteTenants = append(deleteTenants, i) + } + } + + return &TenantsEvent{ + AddTenants: addTenants, + DeleteTenants: deleteTenants, + } +} + +func (n Nodes) Diff(old Nodes) *NodesEvent { + addNodes := make([]*Node, 0, 4) + updateNodes := make([]*Node, 0, 4) + deleteNodes := make([]*Node, 0, 4) + + for i := range n { + if _, ok := old[i]; !ok { + addNodes = append(addNodes, n[i]) + } + } + + for i := range old { + val, ok := n[old[i].Name] + if !ok { + deleteNodes = append(deleteNodes, old[i]) + continue + } + + if !val.Equals(old[i]) { + updateNodes = append(updateNodes, val) + continue + } + } + + return &NodesEvent{ + AddNodes: addNodes, + UpdateNodes: updateNodes, + DeleteNodes: deleteNodes, + } +} + +func (u Users) Diff(old Users) *UsersEvent { + addUsers := make([]*User, 0, 4) + updateUsers := make([]*User, 0, 4) + deleteUsers := make([]*User, 0, 4) + + newTmp := map[string]*User{} + oldTmp := map[string]*User{} + + for i := range u { + newTmp[u[i].Username] = u[i] + } + + for i := range old { + oldTmp[old[i].Username] = old[i] + } + + for i := range newTmp { + if _, ok := oldTmp[i]; !ok { + addUsers = append(addUsers, newTmp[i]) + } + } + + for i := range oldTmp { + val, ok := newTmp[oldTmp[i].Username] + if !ok { + deleteUsers = append(deleteUsers, oldTmp[i]) + continue + } + + if !val.Equals(oldTmp[i]) { + updateUsers = append(updateUsers, val) + continue + } + } + + return &UsersEvent{ + AddUsers: addUsers, + UpdateUsers: updateUsers, + DeleteUsers: deleteUsers, + } +} + +func (c Clusters) Diff(old Clusters) *ClustersEvent { + addClusters := make([]*DataSourceCluster, 0, 4) + updateClusters := make([]*ClusterEvent, 0, 4) + deleteClusters := make([]*DataSourceCluster, 0, 4) + + newTmp := map[string]*DataSourceCluster{} + oldTmp := map[string]*DataSourceCluster{} + + for i := range c { + newTmp[c[i].Name] = c[i] + } + + for i := range old { + oldTmp[old[i].Name] = old[i] + } + + for i := range c { + if _, ok := oldTmp[c[i].Name]; !ok { + addClusters = append(addClusters, c[i]) + } + } + + for i := range old { + val, ok := newTmp[old[i].Name] + if !ok { + deleteClusters = append(deleteClusters, old[i]) + continue + } + + if !reflect.DeepEqual(val, old[i]) { + updateClusters = append(updateClusters, val.Diff(old[i])) + continue + } + } + + return &ClustersEvent{ + AddCluster: addClusters, + UpdateCluster: updateClusters, + DeleteCluster: deleteClusters, + } +} + +func (d *DataSourceCluster) Diff(old *DataSourceCluster) *ClusterEvent { + + ret := &ClusterEvent{ + Name: d.Name, + Type: d.Type, + SqlMaxLimit: d.SqlMaxLimit, + Parameters: d.Parameters, + GroupsEvent: Groups(d.Groups).Diff(old.Groups), + } + + return ret +} + +func (g Groups) Diff(old Groups) *GroupsEvent { + addGroups := make([]*Group, 0, 4) + updateGroups := make([]*Group, 0, 4) + deleteGroups := make([]*Group, 0, 4) + + newTmp := map[string]*Group{} + oldTmp := map[string]*Group{} + + for i := range g { + newTmp[g[i].Name] = g[i] + } + for i := range old { + oldTmp[old[i].Name] = old[i] + } + + for i := range g { + if _, ok := oldTmp[g[i].Name]; !ok { + addGroups = append(addGroups, g[i]) + } + } + + for i := range old { + val, ok := newTmp[old[i].Name] + if !ok { + deleteGroups = append(deleteGroups, old[i]) + continue + } + + if !reflect.DeepEqual(val, old[i]) { + updateGroups = append(updateGroups, val) + continue + } + } + + return &GroupsEvent{ + AddGroups: addGroups, + DeleteGroups: deleteGroups, + UpdateGroups: updateGroups, + } +} + +func (s *ShardingRule) Diff(old *ShardingRule) *ShardingRuleEvent { + addTables := make([]*Table, 0, 4) + updateTables := make([]*Table, 0, 4) + deleteTables := make([]*Table, 0, 4) + + newTmp := map[string]*Table{} + oldTmp := map[string]*Table{} + + for i := range s.Tables { + newTmp[s.Tables[i].Name] = s.Tables[i] + } + for i := range old.Tables { + oldTmp[old.Tables[i].Name] = old.Tables[i] + } + + for i := range s.Tables { + if _, ok := oldTmp[s.Tables[i].Name]; !ok { + addTables = append(addTables, s.Tables[i]) + } + } + + for i := range old.Tables { + val, ok := newTmp[old.Tables[i].Name] + if !ok { + deleteTables = append(deleteTables, old.Tables[i]) + continue + } + + if !reflect.DeepEqual(val, old.Tables[i]) { + updateTables = append(updateTables, val) + continue + } + } + + return &ShardingRuleEvent{ + AddTables: addTables, + UpdateTables: updateTables, + DeleteTables: deleteTables, + } +} + +func (s *ShadowRule) Diff(old *ShadowRule) *ShadowRuleEvent { + addTables := make([]*ShadowTable, 0, 4) + updateTables := make([]*ShadowTable, 0, 4) + deleteTables := make([]*ShadowTable, 0, 4) + + newTmp := map[string]*ShadowTable{} + oldTmp := map[string]*ShadowTable{} + + for i := range s.ShadowTables { + newTmp[s.ShadowTables[i].Name] = s.ShadowTables[i] + } + for i := range old.ShadowTables { + oldTmp[old.ShadowTables[i].Name] = old.ShadowTables[i] + } + + for i := range s.ShadowTables { + if _, ok := oldTmp[s.ShadowTables[i].Name]; !ok { + addTables = append(addTables, s.ShadowTables[i]) + } + } + + for i := range old.ShadowTables { + val, ok := newTmp[old.ShadowTables[i].Name] + if !ok { + deleteTables = append(deleteTables, old.ShadowTables[i]) + continue + } + + if !reflect.DeepEqual(val, old.ShadowTables[i]) { + updateTables = append(updateTables, val) + continue + } + } + + return &ShadowRuleEvent{ + AddTables: addTables, + UpdateTables: updateTables, + DeleteTables: deleteTables, + } +} diff --git a/pkg/config/diff_test.go b/pkg/config/diff_test.go new file mode 100644 index 00000000..ae1f054d --- /dev/null +++ b/pkg/config/diff_test.go @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestNodes_Diff(t *testing.T) { + type args struct { + old Nodes + } + tests := []struct { + name string + n Nodes + args args + want *NodesEvent + }{ + { + name: "NotChange_Nodes", + n: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + args: struct{ old Nodes }{ + old: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + }, + want: &NodesEvent{ + AddNodes: []*Node{}, + UpdateNodes: []*Node{}, + DeleteNodes: []*Node{}, + }, + }, + { + name: "Change_AddNodes", + n: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + "mock_node_2": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + args: struct{ old Nodes }{ + old: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + }, + want: &NodesEvent{ + AddNodes: []*Node{ + { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + UpdateNodes: []*Node{}, + DeleteNodes: []*Node{}, + }, + }, + { + name: "Change_DeleteNodes", + n: map[string]*Node{}, + args: struct{ old Nodes }{ + old: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + }, + want: &NodesEvent{ + AddNodes: []*Node{}, + UpdateNodes: []*Node{}, + DeleteNodes: []*Node{ + { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + }, + }, + + { + name: "Change_UpdateNodes", + n: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + Parameters: map[string]string{ + "mock_param_key_1": "mock_param_value_1", + }, + }, + }, + args: struct{ old Nodes }{ + old: map[string]*Node{ + "mock_node_1": { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + }, + }, + }, + want: &NodesEvent{ + AddNodes: []*Node{}, + UpdateNodes: []*Node{ + { + Name: "mock_node_1", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db_1", + Parameters: map[string]string{ + "mock_param_key_1": "mock_param_value_1", + }, + }, + }, + DeleteNodes: []*Node{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, tt.n.Diff(tt.args.old), "Diff(%v)", tt.args.old) + }) + } +} diff --git a/pkg/config/equals.go b/pkg/config/equals.go new file mode 100644 index 00000000..eb05db50 --- /dev/null +++ b/pkg/config/equals.go @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +import ( + "reflect" +) + +func (u *User) Equals(o *User) bool { + + return u.Username == o.Username && u.Password == o.Password +} + +func (n *Node) Equals(o *Node) bool { + if n.Name != o.Name { + return false + } + + if n.Host != o.Host || n.Port != o.Port { + return false + } + + if n.Database != o.Database || n.Username != o.Username || n.Password != o.Password { + return false + } + + if n.Weight != o.Weight { + return false + } + + if len(n.Labels) != len(o.Labels) { + return false + } + + if len(n.Labels) != 0 && len(o.Labels) != 0 && !reflect.DeepEqual(n.Labels, o.Labels) { + return false + } + + if len(n.Parameters) != 0 && len(o.Parameters) != 0 && !reflect.DeepEqual(n.Parameters, o.Parameters) { + return false + } + + return true +} + +func (r Rules) Equals(o Rules) bool { + if len(r) == 0 && len(o) == 0 { + return true + } + + if len(r) != len(o) { + return false + } + + newT := make([]*Rule, 0, 4) + updateT := make([]*Rule, 0, 4) + deleteT := make([]*Rule, 0, 4) + + newTmp := map[string]*Rule{} + oldTmp := map[string]*Rule{} + + for i := range r { + newTmp[r[i].Column] = r[i] + } + for i := range o { + oldTmp[o[i].Column] = o[i] + } + + for i := range r { + if _, ok := oldTmp[o[i].Column]; !ok { + newT = append(newT, o[i]) + } + } + + for i := range o { + val, ok := newTmp[o[i].Column] + if !ok { + deleteT = append(deleteT, o[i]) + continue + } + + if !reflect.DeepEqual(val, o[i]) { + updateT = append(updateT, val) + continue + } + } + + return len(newT) == 0 && len(updateT) == 0 && len(deleteT) == 0 +} + +func (t *Table) Equals(o *Table) bool { + if len(t.DbRules) != len(o.DbRules) { + return false + } + + if len(t.TblRules) != len(o.TblRules) { + return false + } + + if !Rules(t.DbRules).Equals(o.DbRules) { + return false + } + if !Rules(t.TblRules).Equals(o.TblRules) { + return false + } + + if !reflect.DeepEqual(t.Topology, o.Topology) || !reflect.DeepEqual(t.ShadowTopology, o.ShadowTopology) { + return false + } + + if t.AllowFullScan == o.AllowFullScan { + return false + } + + if !reflect.DeepEqual(t.Attributes, o.Attributes) { + return false + } + + return true +} diff --git a/pkg/config/etcd/etcd.go b/pkg/config/etcd/etcd.go index 5c8bbace..26fd8810 100644 --- a/pkg/config/etcd/etcd.go +++ b/pkg/config/etcd/etcd.go @@ -26,65 +26,79 @@ import ( ) import ( - etcdv3 "github.com/dubbogo/gost/database/kv/etcd/v3" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + + "google.golang.org/grpc" ) import ( "github.com/arana-db/arana/pkg/config" - "github.com/arana-db/arana/pkg/util/env" "github.com/arana-db/arana/pkg/util/log" ) +var ( + PluginName = "etcd" +) + func init() { - config.Register(&storeOperate{}) + config.Register(&storeOperate{ + cancelList: make([]context.CancelFunc, 0, 4), + }) } type storeOperate struct { - client *etcdv3.Client - lock *sync.RWMutex + client *clientv3.Client + lock sync.RWMutex receivers map[config.PathKey]*etcdWatcher cancelList []context.CancelFunc } func (c *storeOperate) Init(options map[string]interface{}) error { endpoints, _ := options["endpoints"].(string) - tmpClient, err := etcdv3.NewConfigClientWithErr( - etcdv3.WithName(etcdv3.RegistryETCDV3Client), - etcdv3.WithTimeout(10*time.Second), - etcdv3.WithEndpoints(strings.Split(endpoints, ",")...), - ) + + ctx, cancel := context.WithCancel(context.Background()) + c.cancelList = append(c.cancelList, cancel) + + rawClient, err := clientv3.New(clientv3.Config{ + Context: ctx, + Endpoints: strings.Split(endpoints, ","), + DialTimeout: 10 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + }) + if err != nil { log.Errorf("failed to initialize etcd client error: %s", err.Error()) return err } - c.client = tmpClient - c.lock = &sync.RWMutex{} + c.client = rawClient c.receivers = make(map[config.PathKey]*etcdWatcher) - c.cancelList = make([]context.CancelFunc, 0, 2) return nil } func (c *storeOperate) Save(key config.PathKey, val []byte) error { - return c.client.Put(string(key), string(val)) + _, err := c.client.Put(context.Background(), string(key), string(val)) + if err != nil { + return err + } + + return nil } func (c *storeOperate) Get(key config.PathKey) ([]byte, error) { - v, err := c.client.Get(string(key)) + resp, err := c.client.Get(context.Background(), string(key)) if err != nil { return nil, err } - if env.IsDevelopEnvironment() { - log.Infof("[ConfigCenter][etcd] load config content : %#v", v) + if len(resp.Kvs) == 0 { + return nil, err } - return []byte(v), nil + return resp.Kvs[0].Value, nil } type etcdWatcher struct { @@ -128,14 +142,12 @@ func (w *etcdWatcher) run(ctx context.Context) { } func (c *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { + c.lock.Lock() defer c.lock.Unlock() if _, ok := c.receivers[key]; !ok { - watchCh, err := c.client.Watch(string(key)) - if err != nil { - return nil, err - } + watchCh := c.client.Watch(context.Background(), string(key)) w := newWatcher(watchCh) c.receivers[key] = w @@ -156,7 +168,7 @@ func (c *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { } func (c *storeOperate) Name() string { - return "etcd" + return PluginName } func (c *storeOperate) Close() error { diff --git a/pkg/config/etcd/etcd_test.go b/pkg/config/etcd/etcd_test.go index 1d9a6299..a42b53a9 100644 --- a/pkg/config/etcd/etcd_test.go +++ b/pkg/config/etcd/etcd_test.go @@ -18,6 +18,7 @@ package etcd import ( + "context" "encoding/json" "net/url" "testing" @@ -30,6 +31,8 @@ import ( "github.com/tidwall/gjson" "go.etcd.io/etcd/server/v3/embed" + + "gopkg.in/yaml.v3" ) import ( @@ -40,24 +43,23 @@ import ( const _defaultEtcdV3WorkDir = "/tmp/arana/config" var ( - mockConfData = map[config.PathKey]string{ - config.DefaultConfigMetadataPath: "", - config.DefaultConfigDataListenersPath: "", - config.DefaultConfigDataSourceClustersPath: "", - config.DefaultConfigDataShardingRulePath: "", - config.DefaultConfigDataTenantsPath: "", - } - - cfg *config.Configuration + mockConfData = map[config.PathKey]string{} + cfg *config.Configuration + mockPath = map[string]*config.PathInfo{} ) func doDataMock() { cfg, _ = config.Load(testdata.Path("fake_config.yaml")) - data, _ := json.Marshal(cfg) + for i := range cfg.Data.Tenants { + tenant := cfg.Data.Tenants[i] + mockPath[tenant.Name] = config.NewPathInfo(tenant.Name) - for k, v := range config.ConfigKeyMapping { - mockConfData[k] = string(gjson.GetBytes(data, v).String()) + data, _ := json.Marshal(tenant) + + for k, v := range mockPath[tenant.Name].ConfigKeyMapping { + mockConfData[k] = gjson.GetBytes(data, v).String() + } } } @@ -95,9 +97,11 @@ func Test_storeOpertae(t *testing.T) { doDataMock() cfg, _ := config.Load(testdata.Path("fake_config.yaml")) - data, _ := json.Marshal(cfg) - for k, v := range config.ConfigKeyMapping { - err := operate.Save(k, []byte(gjson.GetBytes(data, v).String())) + + tenantName := cfg.Data.Tenants[0].Name + + for k, v := range mockConfData { + err := operate.Save(k, []byte(v)) assert.NoError(t, err, "save must success") } @@ -108,26 +112,25 @@ func Test_storeOpertae(t *testing.T) { t.Logf("%s => %s", k, string(ret)) } - receiver, err := operate.Watch(config.DefaultConfigDataTenantsPath) + receiver, err := operate.Watch(mockPath[tenantName].DefaultConfigDataUsersPath) assert.NoError(t, err, "watch must success") newCfg, _ := config.Load(testdata.Path("fake_config.yaml")) - data, _ = json.Marshal(newCfg) - - expectVal := string(gjson.GetBytes(data, config.ConfigKeyMapping[config.DefaultConfigDataTenantsPath]).String()) - - for k := range config.ConfigKeyMapping { - if k == config.DefaultConfigDataTenantsPath { - err := operate.client.Put(string(k), expectVal) - assert.NoError(t, err, "put to etcd must success") - break - } + newCfg.Data.Tenants[0].Users = []*config.User{ + { + Username: "arana", + Password: "arana", + }, } + data, _ := yaml.Marshal(newCfg.Data.Tenants[0].Users) + + _, err = operate.client.Put(context.TODO(), string(mockPath[tenantName].DefaultConfigDataUsersPath), string(data)) + assert.NoError(t, err, "put to etcd must success") ret := <-receiver - t.Logf("expect val : %s", expectVal) + t.Logf("expect val : %s", string(data)) t.Logf("acutal val : %s", string(ret)) - assert.Equal(t, expectVal, string(ret)) + assert.Equal(t, string(data), string(ret)) } diff --git a/pkg/config/event.go b/pkg/config/event.go new file mode 100644 index 00000000..2fa5594c --- /dev/null +++ b/pkg/config/event.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package config + +type EventType int32 + +const ( + _ EventType = iota + EventTypeTenants + EventTypeUsers + EventTypeNodes + EventTypeClusters + EventTypeShardingRule + EventTypeShadowRule +) + +type ( + Event interface { + Type() EventType + } + + // TenantsEvent tenants event + TenantsEvent struct { + AddTenants Tenants + DeleteTenants Tenants + } + + // UsersEvent users event + UsersEvent struct { + AddUsers Users + UpdateUsers Users + DeleteUsers Users + } + + // ClustersEvent clusters event + ClustersEvent struct { + AddCluster Clusters + DeleteCluster Clusters + UpdateCluster []*ClusterEvent + } + + // ClusterEvent cluster event + ClusterEvent struct { + Name string + Type DataSourceType + SqlMaxLimit int + Parameters ParametersMap + GroupsEvent *GroupsEvent + } + + // GroupsEvent groups event + GroupsEvent struct { + AddGroups Groups + UpdateGroups Groups + DeleteGroups Groups + } + + // GroupEvent group event + GroupEvent struct { + Name string + AddNodes Nodes + UpdateNodes Nodes + DeleteNodes Nodes + } + + // ShardingRuleEvent sharding rule event + ShardingRuleEvent struct { + AddTables []*Table + UpdateTables []*Table + DeleteTables []*Table + } + + // ShadowRuleEvent shadow rule event + ShadowRuleEvent struct { + AddTables []*ShadowTable + UpdateTables []*ShadowTable + DeleteTables []*ShadowTable + } + + // NodesEvent nodes event + NodesEvent struct { + AddNodes []*Node + UpdateNodes []*Node + DeleteNodes []*Node + } +) + +func (e TenantsEvent) Type() EventType { + return EventTypeTenants +} + +func (e NodesEvent) Type() EventType { + return EventTypeNodes +} + +func (e UsersEvent) Type() EventType { + return EventTypeUsers +} + +func (e ClustersEvent) Type() EventType { + return EventTypeClusters +} + +func (e ShardingRuleEvent) Type() EventType { + return EventTypeShardingRule +} + +func (e ShadowRuleEvent) Type() EventType { + return EventTypeShadowRule +} diff --git a/pkg/config/file/file.go b/pkg/config/file/file.go index e3ffdfe9..c644cd75 100644 --- a/pkg/config/file/file.go +++ b/pkg/config/file/file.go @@ -18,11 +18,14 @@ package file import ( + "context" "encoding/json" "os" "path/filepath" "strings" "sync" + "sync/atomic" + "time" ) import ( @@ -40,20 +43,58 @@ import ( "github.com/arana-db/arana/pkg/util/log" ) -var configFilenameList = []string{"config.yaml", "config.yml"} +var ( + PluginName = "file" + configFilenameList = []string{"config.yaml", "config.yml"} +) func init() { config.Register(&storeOperate{}) } -type storeOperate struct { +type receiverBucket struct { lock sync.RWMutex - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers map[config.PathKey][]chan<- []byte +} + +func (b *receiverBucket) add(key config.PathKey, rec chan<- []byte) { + b.lock.Lock() + defer b.lock.Unlock() + + if _, ok := b.receivers[key]; !ok { + b.receivers[key] = make([]chan<- []byte, 0, 2) + } + b.receivers[key] = append(b.receivers[key], rec) +} + +func (b *receiverBucket) notifyWatcher(k config.PathKey, val []byte) { + b.lock.RLock() + defer b.lock.RUnlock() + + for i := range b.receivers[k] { + b.receivers[k][i] <- val + } +} + +type storeOperate struct { + initialize int32 + lock sync.RWMutex + contents map[config.PathKey]string + receivers *receiverBucket + cancels []context.CancelFunc + mapping map[string]*config.PathInfo } func (s *storeOperate) Init(options map[string]interface{}) error { - s.receivers = make(map[config.PathKey][]chan []byte) + if !atomic.CompareAndSwapInt32(&s.initialize, 0, 1) { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + s.cancels = append(s.cancels, cancel) + + s.mapping = make(map[string]*config.PathInfo) + s.receivers = &receiverBucket{receivers: map[config.PathKey][]chan<- []byte{}} var ( content string ok bool @@ -79,80 +120,102 @@ func (s *storeOperate) Init(options map[string]interface{}) error { return errors.New("no config file found") } + path, err := formatPath(path) + if err != nil { + return err + } if err := s.readFromFile(path, &cfg); err != nil { return err } + + go s.watchFileChange(ctx, path) } - configJson, err := json.Marshal(cfg) - if err != nil { - return errors.Wrap(err, "config json.marshal failed") + for i := range cfg.Data.Tenants { + name := cfg.Data.Tenants[i].Name + s.mapping[name] = config.NewPathInfo(name) } - s.initCfgJsonMap(string(configJson)) + + s.updateContents(cfg, false) return nil } -func (s *storeOperate) initCfgJsonMap(val string) { - s.cfgJson = make(map[config.PathKey]string) +func (s *storeOperate) updateContents(cfg config.Configuration, notify bool) { + s.lock.Lock() + defer s.lock.Unlock() + + s.contents = make(map[config.PathKey]string) + + tenants := make([]string, 0, 4) + for i := range cfg.Data.Tenants { + tenants = append(tenants, cfg.Data.Tenants[i].Name) - for k, v := range config.ConfigKeyMapping { - s.cfgJson[k] = gjson.Get(val, v).String() + tmp, _ := json.Marshal(cfg.Data.Tenants[i]) + ret := string(tmp) + mapping := s.mapping[cfg.Data.Tenants[i].Name] + + for k, v := range mapping.ConfigKeyMapping { + val, _ := config.JSONToYAML(gjson.Get(ret, v).String()) + s.contents[k] = string(val) + if notify { + s.receivers.notifyWatcher(k, val) + } + } } + ret, _ := yaml.Marshal(tenants) + s.contents[config.DefaultTenantsPath] = string(ret) + if env.IsDevelopEnvironment() { - log.Infof("[ConfigCenter][File] load config content : %#v", s.cfgJson) + log.Debugf("[ConfigCenter][File] load config content : %#v", s.contents) } } func (s *storeOperate) Save(key config.PathKey, val []byte) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.contents[key] = string(val) + s.receivers.notifyWatcher(key, val) return nil } +//Get func (s *storeOperate) Get(key config.PathKey) ([]byte, error) { - val := []byte(s.cfgJson[key]) + s.lock.RLock() + defer s.lock.RUnlock() + + val := []byte(s.contents[key]) return val, nil } -// Watch TODO change notification through file inotify mechanism +// Watch func (s *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { - s.lock.Lock() - defer s.lock.Unlock() - - if _, ok := s.receivers[key]; !ok { - s.receivers[key] = make([]chan []byte, 0, 2) - } - rec := make(chan []byte) - - s.receivers[key] = append(s.receivers[key], rec) - + s.receivers.add(key, rec) return rec, nil } func (s *storeOperate) Name() string { - return "file" + return PluginName } func (s *storeOperate) Close() error { + + for i := range s.cancels { + s.cancels[i]() + } + return nil } +//readFromFile func (s *storeOperate) readFromFile(path string, cfg *config.Configuration) error { var ( f *os.File err error ) - if strings.HasPrefix(path, "~") { - var home string - if home, err = os.UserHomeDir(); err != nil { - return err - } - path = strings.Replace(path, "~", home, 1) - } - - path = filepath.Clean(path) - if f, err = os.Open(path); err != nil { return errors.Wrapf(err, "failed to open arana config file '%s'", path) } @@ -179,3 +242,55 @@ func (s *storeOperate) searchDefaultConfigFile() (string, bool) { } return "", false } + +func formatPath(path string) (string, error) { + if strings.HasPrefix(path, "~") { + home, err := os.UserHomeDir() + if err != nil { + return "", err + } + path = strings.Replace(path, "~", home, 1) + } + + path = filepath.Clean(path) + + return path, nil +} + +//watchFileChange +func (s *storeOperate) watchFileChange(ctx context.Context, path string) { + + refreshT := time.NewTicker(30 * time.Second) + + oldStat, err := os.Stat(path) + if err != nil { + log.Errorf("[ConfigCenter][File] get file=%s stat fail : %s", path, err.Error()) + } + + for { + select { + case <-refreshT.C: + stat, err := os.Stat(path) + if err != nil { + log.Errorf("[ConfigCenter][File] get file=%s stat fail : %s", path, err.Error()) + continue + } + + if stat.ModTime().Equal(oldStat.ModTime()) { + continue + } + + cfg := &config.Configuration{} + if err := s.readFromFile(path, cfg); err != nil { + log.Errorf("[ConfigCenter][File] read file=%s and marshal to Configuration fail : %s", path, err.Error()) + return + } + + log.Errorf("[ConfigCenter][File] watch file=%s change : %+v", path, stat.ModTime()) + s.updateContents(*cfg, true) + case <-ctx.Done(): + + } + } + +} diff --git a/pkg/config/file/file_test.go b/pkg/config/file/file_test.go index 16a69c4d..8506c200 100644 --- a/pkg/config/file/file_test.go +++ b/pkg/config/file/file_test.go @@ -22,6 +22,10 @@ import ( "testing" ) +import ( + "gopkg.in/yaml.v3" +) + import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/testdata" @@ -30,263 +34,115 @@ import ( var ( FakeConfigPath = testdata.Path("fake_config.yaml") EmptyConfigPath = testdata.Path("fake_empty_config.yaml") -) -var jsonConfig = `{ - "kind":"ConfigMap", - "apiVersion":"1.0", - "metadata":{ - "name":"arana-config" - }, - "data":{ - "listeners":[ - { - "protocol_type":"mysql", - "socket_address":{ - "address":"0.0.0.0", - "port":13306 - }, - "server_version":"5.7.0" - } - ], - "tenants":[ - { - "name":"arana", - "users":[ - { - "username":"arana", - "password":"123456" - }, - { - "username":"dksl", - "password":"123456" - } - ] - } - ], - "clusters":[ - { - "name":"employees", - "type":"mysql", - "sql_max_limit":-1, - "tenant":"arana", - "parameters":{ - "max_allowed_packet":"256M" - }, - "groups":[ - { - "name":"employees_0000", - "nodes":[ - { - "name":"node0", - "host":"arana-mysql", - "port":3306, - "username":"root", - "password":"123456", - "database":"employees_0000", - "parameters":null, - "weight":"r10w10" - }, - { - "name":"node0_r_0", - "host":"arana-mysql", - "port":3306, - "username":"root", - "password":"123456", - "database":"employees_0000_r", - "parameters":null, - "weight":"r0w0" - } - ] - }, - { - "name":"employees_0001", - "nodes":[ - { - "name":"node1", - "host":"arana-mysql", - "port":3306, - "username":"root", - "password":"123456", - "database":"employees_0001", - "parameters":null, - "weight":"r10w10" - } - ] - }, - { - "name":"employees_0002", - "nodes":[ - { - "name":"node2", - "host":"arana-mysql", - "port":3306, - "username":"root", - "password":"123456", - "database":"employees_0002", - "parameters":null, - "weight":"r10w10" - } - ] - }, - { - "name":"employees_0003", - "nodes":[ - { - "name":"node3", - "host":"arana-mysql", - "port":3306, - "username":"root", - "password":"123456", - "database":"employees_0003", - "parameters":null, - "weight":"r10w10" - } - ] - } - ] - } - ], - "sharding_rule":{ - "tables":[ - { - "name":"employees.student", - "sequence":{ - "type":"snowflake", - "option":null - }, - "allow_full_scan":true, - "db_rules":[ - { - "column":"uid", - "type":"scriptExpr", - "expr":"parseInt($value % 32 / 8)", - "step":0 - } - ], - "tbl_rules":[ - { - "column":"uid", - "type":"scriptExpr", - "expr":"$value % 32", - "step":32 - } - ], - "topology":{ - "db_pattern":"employees_${0000..0003}", - "tbl_pattern":"student_${0000..0031}" - }, - "shadow_topology":null, - "attributes":{ - "sqlMaxLimit":"-1" - } - } - ] - } - } -}` - -var yamlConfig = ` + yamlConfig = ` kind: ConfigMap apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: + - username: root + password: "123456" - username: arana password: "123456" - - username: dksl + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + parameters: + max_allowed_packet: 256M + groups: + - name: employees_0000 + nodes: + - node0 + - node0_r_0 + - name: employees_0001 + nodes: + - node1 + - name: employees_0002 + nodes: + - node2 + - name: employees_0003 + nodes: + - node3 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: + - column: uid + type: scriptExpr + expr: parseInt($value % 32 / 8) + tbl_rules: + - column: uid + type: scriptExpr + expr: $value % 32 + step: 32 + topology: + db_pattern: employees_${0000..0003} + tbl_pattern: student_${0000..0031} + attributes: + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root password: "123456" - - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - parameters: - max_allowed_packet: 256M - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - parameters: - - name: node0_r_0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000_r - weight: r0w0 - parameters: - - name: employees_0001 - nodes: - - name: node1 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - parameters: - - name: employees_0002 - nodes: - - name: node2 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0002 - weight: r10w10 - parameters: - - name: employees_0003 - nodes: - - name: node3 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0003 - weight: r10w10 - parameters: - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - db_rules: - - column: uid - type: scriptExpr - expr: parseInt($value % 32 / 8) - tbl_rules: - - column: uid - type: scriptExpr - expr: $value % 32 - step: 32 - topology: - db_pattern: employees_${0000..0003} - tbl_pattern: student_${0000..0031} - attributes: - sqlMaxLimit: -1 + database: employees_0000 + weight: r10w10 + parameters: + node0_r_0: + name: node0_r_0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000_r + weight: r0w0 + parameters: + node1: + name: node1 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0001 + weight: r10w10 + parameters: + node2: + name: node2 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0002 + weight: r10w10 + parameters: + node3: + name: node3 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0003 + weight: r10w10 + parameters: ` +) func Test_storeOperate_Close(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } tests := []struct { name string @@ -299,7 +155,7 @@ func Test_storeOperate_Close(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } if err := s.Close(); (err != nil) != tt.wantErr { t.Errorf("Close() error = %v, wantErr %v", err, tt.wantErr) @@ -310,8 +166,8 @@ func Test_storeOperate_Close(t *testing.T) { func Test_storeOperate_Get(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { key config.PathKey @@ -335,7 +191,7 @@ func Test_storeOperate_Get(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } got, err := s.Get(tt.args.key) if (err != nil) != tt.wantErr { @@ -351,8 +207,8 @@ func Test_storeOperate_Get(t *testing.T) { func Test_storeOperate_Init(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { options map[string]interface{} @@ -394,7 +250,7 @@ func Test_storeOperate_Init(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } if err := s.Init(tt.args.options); (err != nil) != tt.wantErr { t.Errorf("Init() error = %v, wantErr %v", err, tt.wantErr) @@ -405,8 +261,8 @@ func Test_storeOperate_Init(t *testing.T) { func Test_storeOperate_Name(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } tests := []struct { name string @@ -419,7 +275,7 @@ func Test_storeOperate_Name(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } if got := s.Name(); got != tt.want { t.Errorf("Name() = %v, want %v", got, tt.want) @@ -430,8 +286,8 @@ func Test_storeOperate_Name(t *testing.T) { func Test_storeOperate_Save(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { key config.PathKey @@ -449,7 +305,7 @@ func Test_storeOperate_Save(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } if err := s.Save(tt.args.key, tt.args.val); (err != nil) != tt.wantErr { t.Errorf("Save() error = %v, wantErr %v", err, tt.wantErr) @@ -460,8 +316,8 @@ func Test_storeOperate_Save(t *testing.T) { func Test_storeOperate_Watch(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { key config.PathKey @@ -474,7 +330,7 @@ func Test_storeOperate_Watch(t *testing.T) { }{ { "Watch", - fields{make(map[config.PathKey][]chan []byte), make(map[config.PathKey]string)}, + fields{&receiverBucket{receivers: map[config.PathKey][]chan<- []byte{}}, make(map[config.PathKey]string)}, args{"/arana-db/config/data/dataSourceClusters"}, false, }, @@ -483,7 +339,7 @@ func Test_storeOperate_Watch(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } got, err := s.Watch(tt.args.key) if (err != nil) != tt.wantErr { @@ -497,10 +353,10 @@ func Test_storeOperate_Watch(t *testing.T) { } } -func Test_storeOperate_initCfgJsonMap(t *testing.T) { +func Test_storeOperate_initContentsMap(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { val string @@ -510,23 +366,27 @@ func Test_storeOperate_initCfgJsonMap(t *testing.T) { fields fields args args }{ - {"initCfgJsonMap", fields{}, args{jsonConfig}}, + {"initContentsMap", fields{}, args{yamlConfig}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } - s.initCfgJsonMap(tt.args.val) + + cfg := new(config.Configuration) + _ = yaml.Unmarshal([]byte(tt.args.val), cfg) + + s.updateContents(*cfg, false) }) } } func Test_storeOperate_readFromFile(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } type args struct { path string @@ -554,7 +414,7 @@ func Test_storeOperate_readFromFile(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } if err := s.readFromFile(tt.args.path, tt.args.cfg); (err != nil) != tt.wantErr { t.Errorf("readFromFile() error = %v, wantErr %v", err, tt.wantErr) @@ -565,8 +425,8 @@ func Test_storeOperate_readFromFile(t *testing.T) { func Test_storeOperate_searchDefaultConfigFile(t *testing.T) { type fields struct { - receivers map[config.PathKey][]chan []byte - cfgJson map[config.PathKey]string + receivers *receiverBucket + contents map[config.PathKey]string } tests := []struct { name string @@ -580,7 +440,7 @@ func Test_storeOperate_searchDefaultConfigFile(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := &storeOperate{ receivers: tt.fields.receivers, - cfgJson: tt.fields.cfgJson, + contents: tt.fields.contents, } got, got1 := s.searchDefaultConfigFile() if got != tt.want { diff --git a/pkg/config/model.go b/pkg/config/model.go index dcb0cf41..e832af42 100644 --- a/pkg/config/model.go +++ b/pkg/config/model.go @@ -42,17 +42,16 @@ import ( ) type ( - // Configuration represents an Arana configuration. - Configuration struct { + DataRevision interface { + Revision() string + } + + Spec struct { Kind string `yaml:"kind" json:"kind,omitempty"` APIVersion string `yaml:"apiVersion" json:"apiVersion,omitempty"` Metadata map[string]interface{} `yaml:"metadata" json:"metadata"` - Data *Data `validate:"required,structonly" yaml:"data" json:"data"` } - // DataSourceType is the data source type - DataSourceType string - // SocketAddress specify either a logical or physical address and port, which are // used to tell server where to bind/listen, connect to upstream and find // management servers @@ -61,31 +60,46 @@ type ( Port int `default:"13306" yaml:"port" json:"port"` } + Listener struct { + ProtocolType string `yaml:"protocol_type" json:"protocol_type"` + SocketAddress *SocketAddress `yaml:"socket_address" json:"socket_address"` + ServerVersion string `yaml:"server_version" json:"server_version"` + } + + // Configuration represents an Arana configuration. + Configuration struct { + Spec `yaml:",inline"` + Data *Data `validate:"required,structonly" yaml:"data" json:"data"` + } + + // DataSourceType is the data source type + DataSourceType string + Data struct { - Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"` - Tenants []*Tenant `validate:"required,dive" yaml:"tenants" json:"tenants"` - DataSourceClusters []*DataSourceCluster `validate:"required,dive" yaml:"clusters" json:"clusters"` - ShardingRule *ShardingRule `validate:"required,dive" yaml:"sharding_rule,omitempty" json:"sharding_rule,omitempty"` - ShadowRule *ShadowRule `yaml:"shadow_rule,omitempty" json:"shadow_rule,omitempty"` + Tenants []*Tenant `validate:"required,dive" yaml:"tenants" json:"tenants"` } Tenant struct { - Name string `validate:"required" yaml:"name" json:"name"` - Users []*User `validate:"required" yaml:"users" json:"users"` + Spec + Name string `validate:"required" yaml:"name" json:"name"` + Users []*User `validate:"required" yaml:"users" json:"users"` + DataSourceClusters []*DataSourceCluster `validate:"required,dive" yaml:"clusters" json:"clusters"` + ShardingRule *ShardingRule `validate:"required,dive" yaml:"sharding_rule,omitempty" json:"sharding_rule,omitempty"` + ShadowRule *ShadowRule `yaml:"shadow_rule,omitempty" json:"shadow_rule,omitempty"` + Nodes map[string]*Node `validate:"required" yaml:"nodes" json:"nodes"` } DataSourceCluster struct { Name string `yaml:"name" json:"name"` Type DataSourceType `yaml:"type" json:"type"` SqlMaxLimit int `default:"-1" yaml:"sql_max_limit" json:"sql_max_limit,omitempty"` - Tenant string `yaml:"tenant" json:"tenant"` Parameters ParametersMap `yaml:"parameters" json:"parameters"` Groups []*Group `yaml:"groups" json:"groups"` } Group struct { - Name string `yaml:"name" json:"name"` - Nodes []*Node `yaml:"nodes" json:"nodes"` + Name string `yaml:"name" json:"name"` + Nodes []string `yaml:"nodes" json:"nodes"` } Node struct { @@ -135,12 +149,6 @@ type ( Regex string `yaml:"regex" json:"regex"` } - Listener struct { - ProtocolType string `yaml:"protocol_type" json:"protocol_type"` - SocketAddress *SocketAddress `yaml:"socket_address" json:"socket_address"` - ServerVersion string `yaml:"server_version" json:"server_version"` - } - User struct { Username string `yaml:"username" json:"username"` Password string `yaml:"password" json:"password"` @@ -246,6 +254,12 @@ func Load(path string) (*Configuration, error) { return &cfg, nil } +func (t *Tenant) Empty() bool { + return len(t.Users) == 0 && + len(t.Nodes) == 0 && + len(t.DataSourceClusters) == 0 +} + var _weightRegexp = regexp.MustCompile(`^[rR]([0-9]+)[wW]([0-9]+)$`) func (d *Node) GetReadAndWriteWeight() (int, int, error) { @@ -357,3 +371,25 @@ func GetConnPropIdleTime(connProps map[string]interface{}, defaultValue time.Dur return time.Duration(n) * time.Second } + +type ( + Clusters []*DataSourceCluster + Tenants []string + Nodes map[string]*Node + Groups []*Group + Users []*User + Rules []*Rule +) + +func NewEmptyTenant() *Tenant { + return &Tenant{ + Spec: Spec{ + Metadata: map[string]interface{}{}, + }, + Users: make([]*User, 0, 1), + DataSourceClusters: make([]*DataSourceCluster, 0, 1), + ShardingRule: new(ShardingRule), + ShadowRule: new(ShadowRule), + Nodes: map[string]*Node{}, + } +} diff --git a/pkg/config/model_test.go b/pkg/config/model_test.go index 74b74cc7..0a7a6814 100644 --- a/pkg/config/model_test.go +++ b/pkg/config/model_test.go @@ -59,18 +59,18 @@ func TestDataSourceClustersConf(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, nil, conf) - assert.Equal(t, 1, len(conf.Data.DataSourceClusters)) - dataSourceCluster := conf.Data.DataSourceClusters[0] + assert.Equal(t, 1, len(conf.Data.Tenants[0].DataSourceClusters)) + dataSourceCluster := conf.Data.Tenants[0].DataSourceClusters[0] assert.Equal(t, "employees", dataSourceCluster.Name) assert.Equal(t, config.DBMySQL, dataSourceCluster.Type) assert.Equal(t, -1, dataSourceCluster.SqlMaxLimit) - assert.Equal(t, "arana", dataSourceCluster.Tenant) + assert.Equal(t, "arana", conf.Data.Tenants[0].Name) assert.Equal(t, 4, len(dataSourceCluster.Groups)) group := dataSourceCluster.Groups[0] assert.Equal(t, "employees_0000", group.Name) assert.Equal(t, 2, len(group.Nodes)) - node := group.Nodes[0] + node := conf.Data.Tenants[0].Nodes["node0"] assert.Equal(t, "arana-mysql", node.Host) assert.Equal(t, 3306, node.Port) assert.Equal(t, "root", node.Username) @@ -86,9 +86,9 @@ func TestShardingRuleConf(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, nil, conf) - assert.NotNil(t, conf.Data.ShardingRule) - assert.Equal(t, 1, len(conf.Data.ShardingRule.Tables)) - table := conf.Data.ShardingRule.Tables[0] + assert.NotNil(t, conf.Data.Tenants[0].ShardingRule) + assert.Equal(t, 1, len(conf.Data.Tenants[0].ShardingRule.Tables)) + table := conf.Data.Tenants[0].ShardingRule.Tables[0] assert.Equal(t, "employees.student", table.Name) assert.Equal(t, true, table.AllowFullScan) diff --git a/pkg/config/nacos/nacos.go b/pkg/config/nacos/nacos.go index b9a35400..ad2a9edf 100644 --- a/pkg/config/nacos/nacos.go +++ b/pkg/config/nacos/nacos.go @@ -30,12 +30,12 @@ import ( "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" "github.com/nacos-group/nacos-sdk-go/v2/common/constant" "github.com/nacos-group/nacos-sdk-go/v2/vo" + + "github.com/pkg/errors" ) import ( "github.com/arana-db/arana/pkg/config" - "github.com/arana-db/arana/pkg/util/env" - "github.com/arana-db/arana/pkg/util/log" ) const ( @@ -48,6 +48,14 @@ const ( _server string = "endpoints" _contextPath string = "contextPath" _scheme string = "scheme" + + _pathSplit string = "::" +) + +var ( + PluginName = "nacos" + + ErrorPublishConfigFail = errors.New("save config into nacos fail") ) func init() { @@ -58,27 +66,19 @@ func init() { type storeOperate struct { groupName string client config_client.IConfigClient - confMap map[config.PathKey]string - cfgLock *sync.RWMutex - lock *sync.RWMutex + cfgLock sync.RWMutex + lock sync.RWMutex receivers map[config.PathKey]*nacosWatcher cancelList []context.CancelFunc } // Init plugin initialization func (s *storeOperate) Init(options map[string]interface{}) error { - s.lock = &sync.RWMutex{} - s.cfgLock = &sync.RWMutex{} - s.confMap = make(map[config.PathKey]string) s.receivers = make(map[config.PathKey]*nacosWatcher) if err := s.initNacosClient(options); err != nil { return err } - if err := s.loadDataFromServer(); err != nil { - return err - } - return nil } @@ -152,53 +152,42 @@ func parseClientConfig(options map[string]interface{}) constant.ClientConfig { return cc } -func (s *storeOperate) loadDataFromServer() error { - s.cfgLock.Lock() - defer s.cfgLock.Unlock() - - for dataId := range config.ConfigKeyMapping { - data, err := s.client.GetConfig(vo.ConfigParam{ - DataId: string(dataId), - Group: s.groupName, - }) - if err != nil { - return err - } - - s.confMap[dataId] = data - } - - return nil -} - // Save save a configuration data func (s *storeOperate) Save(key config.PathKey, val []byte) error { - _, err := s.client.PublishConfig(vo.ConfigParam{ + ok, err := s.client.PublishConfig(vo.ConfigParam{ Group: s.groupName, - DataId: string(key), + DataId: buildNacosDataId(string(key)), Content: string(val), }) + if err != nil { + return err + } - return err + if !ok { + return ErrorPublishConfigFail + } + return nil } // Get get a configuration func (s *storeOperate) Get(key config.PathKey) ([]byte, error) { - defer s.cfgLock.RUnlock() - s.cfgLock.RLock() - - val := []byte(s.confMap[key]) + ret, err := s.client.GetConfig(vo.ConfigParam{ + DataId: buildNacosDataId(string(key)), + Group: s.groupName, + }) - if env.IsDevelopEnvironment() { - log.Infof("[ConfigCenter][nacos] load config content : %#v", string(val)) + if err != nil { + return nil, err } - return val, nil + + return []byte(ret), nil } // Watch Monitor changes of the key func (s *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { - defer s.lock.Unlock() s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.receivers[key]; !ok { w, err := s.newWatcher(key, s.client) if err != nil { @@ -213,8 +202,8 @@ func (s *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { w := s.receivers[key] - defer w.lock.Unlock() w.lock.Lock() + defer w.lock.Unlock() rec := make(chan []byte) s.receivers[key].receivers = append(s.receivers[key].receivers, rec) @@ -223,11 +212,12 @@ func (s *storeOperate) Watch(key config.PathKey) (<-chan []byte, error) { // Name plugin name func (s *storeOperate) Name() string { - return "nacos" + return PluginName } -// Close do close storeOperate +// Close closes storeOperate func (s *storeOperate) Close() error { + s.client.CloseClient() return nil } @@ -244,13 +234,13 @@ func (s *storeOperate) newWatcher(key config.PathKey, client config_client.IConf } err := client.ListenConfig(vo.ConfigParam{ - DataId: string(key), + DataId: buildNacosDataId(string(key)), Group: s.groupName, OnChange: func(_, _, dataId, content string) { s.cfgLock.Lock() defer s.cfgLock.Unlock() - s.confMap[config.PathKey(dataId)] = content + dataId = revertNacosDataId(dataId) s.receivers[config.PathKey(dataId)].ch <- []byte(content) }, }) @@ -276,3 +266,11 @@ func (w *nacosWatcher) run(ctx context.Context) { } } } + +func buildNacosDataId(v string) string { + return strings.ReplaceAll(v, "/", _pathSplit) +} + +func revertNacosDataId(v string) string { + return strings.ReplaceAll(v, _pathSplit, "/") +} diff --git a/pkg/config/nacos/nacos_test.go b/pkg/config/nacos/nacos_test.go index bccb523c..a77f22e6 100644 --- a/pkg/config/nacos/nacos_test.go +++ b/pkg/config/nacos/nacos_test.go @@ -33,6 +33,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/tidwall/gjson" + + "gopkg.in/yaml.v3" ) import ( @@ -41,25 +43,23 @@ import ( ) var ( - mockConfData = map[config.PathKey]string{ - config.DefaultConfigPath: "", - config.DefaultConfigMetadataPath: "", - config.DefaultConfigDataListenersPath: "", - config.DefaultConfigDataSourceClustersPath: "", - config.DefaultConfigDataShardingRulePath: "", - config.DefaultConfigDataTenantsPath: "", - } - - cfg *config.Configuration + mockConfData = map[config.PathKey]string{} + cfg *config.Configuration + mockPath = map[string]*config.PathInfo{} ) func doDataMock() { cfg, _ = config.Load(testdata.Path("fake_config.yaml")) - data, _ := json.Marshal(cfg) + for i := range cfg.Data.Tenants { + tenant := cfg.Data.Tenants[i] + mockPath[tenant.Name] = config.NewPathInfo(tenant.Name) - for k, v := range config.ConfigKeyMapping { - mockConfData[k] = string(gjson.GetBytes(data, v).String()) + data, _ := json.Marshal(tenant) + + for k, v := range mockPath[tenant.Name].ConfigKeyMapping { + mockConfData[k] = gjson.GetBytes(data, v).String() + } } } @@ -177,9 +177,6 @@ func buildOperate() *storeOperate { operate := &storeOperate{ groupName: "arana", client: newNacosClient(), - confMap: make(map[config.PathKey]string), - cfgLock: &sync.RWMutex{}, - lock: &sync.RWMutex{}, receivers: make(map[config.PathKey]*nacosWatcher), cancelList: []context.CancelFunc{}, } @@ -188,52 +185,42 @@ func buildOperate() *storeOperate { return operate } -func Test_loadDataFromServer(t *testing.T) { - operate := buildOperate() - defer operate.client.CloseClient() - - err := operate.loadDataFromServer() - assert.NoError(t, err, "") - - for k, v := range operate.confMap { - assert.Equalf(t, mockConfData[k], v, "%s should be equal", k) - } -} - func Test_watch(t *testing.T) { operate := buildOperate() defer operate.client.CloseClient() - err := operate.loadDataFromServer() - assert.NoError(t, err, "should be success") - - assert.NoError(t, err, "should be success") - newCfg, _ := config.Load(testdata.Path("fake_config.yaml")) - receiver, err := operate.Watch(config.DefaultConfigDataTenantsPath) + newCfg.Data.Tenants[0].Nodes = map[string]*config.Node{ + "node0": { + Name: "node0", + Host: "127.0.0.1", + Port: 3306, + Username: "arana", + Password: "arana", + Database: "mock_db", + }, + } + + receiver, err := operate.Watch(mockPath[newCfg.Data.Tenants[0].Name].DefaultConfigDataNodesPath) assert.NoError(t, err, "should be success") - data, err := json.Marshal(newCfg) + data, err := yaml.Marshal(newCfg.Data.Tenants[0].Nodes) assert.NoError(t, err, "should be marshal success") - for k, v := range config.ConfigKeyMapping { - if k == config.DefaultConfigDataTenantsPath { - operate.client.PublishConfig(vo.ConfigParam{ - DataId: string(config.DefaultConfigDataTenantsPath), - Content: string(gjson.GetBytes(data, v).String()), - }) - } - } + ok, err := operate.client.PublishConfig(vo.ConfigParam{ + DataId: buildNacosDataId(string(mockPath[newCfg.Data.Tenants[0].Name].DefaultConfigDataNodesPath)), + Content: string(data), + }) + assert.True(t, ok) + assert.NoError(t, err) t.Logf("new config val : %s", string(data)) ret := <-receiver - expectVal := string(gjson.GetBytes(data, config.ConfigKeyMapping[config.DefaultConfigDataTenantsPath]).String()) - - t.Logf("expect val : %s", expectVal) + t.Logf("expect val : %s", string(data)) t.Logf("acutal val : %s", string(ret)) - assert.Equal(t, expectVal, string(ret)) + assert.Equal(t, string(data), string(ret)) } diff --git a/pkg/util/match/slice_match.go b/pkg/util/match/slice_match.go new file mode 100644 index 00000000..062e6d57 --- /dev/null +++ b/pkg/util/match/slice_match.go @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package match + +import ( + "bytes" + "reflect" +) + +// Copy from +// isEmpty gets whether the specified object is considered empty or not. +func isEmpty(object interface{}) bool { + + // get nil case out of the way + if object == nil { + return true + } + + objValue := reflect.ValueOf(object) + + switch objValue.Kind() { + // collection types are empty when they have no element + case reflect.Array, reflect.Chan, reflect.Map, reflect.Slice: + return objValue.Len() == 0 + // pointers are empty if nil or if the value they point to is empty + case reflect.Ptr: + if objValue.IsNil() { + return true + } + deref := objValue.Elem().Interface() + return isEmpty(deref) + // for all other types, compare against the zero value + default: + zero := reflect.Zero(objValue.Type()) + return reflect.DeepEqual(object, zero.Interface()) + } +} + +func ElementsMatch(listA, listB interface{}) (ok bool) { + if isEmpty(listA) && isEmpty(listB) { + return true + } + + if !isList(listA) || !isList(listB) { + return false + } + + extraA, extraB := diffLists(listA, listB) + + if len(extraA) == 0 && len(extraB) == 0 { + return true + } + + return false +} + +// isList checks that the provided value is array or slice. +func isList(list interface{}, msgAndArgs ...interface{}) (ok bool) { + kind := reflect.TypeOf(list).Kind() + if kind != reflect.Array && kind != reflect.Slice { + return false + } + return true +} + +// diffLists diffs two arrays/slices and returns slices of elements that are only in A and only in B. +// If some element is present multiple times, each instance is counted separately (e.g. if something is 2x in A and +// 5x in B, it will be 0x in extraA and 3x in extraB). The order of items in both lists is ignored. +func diffLists(listA, listB interface{}) (extraA, extraB []interface{}) { + aValue := reflect.ValueOf(listA) + bValue := reflect.ValueOf(listB) + + aLen := aValue.Len() + bLen := bValue.Len() + + // Mark indexes in bValue that we already used + visited := make([]bool, bLen) + for i := 0; i < aLen; i++ { + element := aValue.Index(i).Interface() + found := false + for j := 0; j < bLen; j++ { + if visited[j] { + continue + } + if ObjectsAreEqual(bValue.Index(j).Interface(), element) { + visited[j] = true + found = true + break + } + } + if !found { + extraA = append(extraA, element) + } + } + + for j := 0; j < bLen; j++ { + if visited[j] { + continue + } + extraB = append(extraB, bValue.Index(j).Interface()) + } + + return +} + +func ObjectsAreEqual(expected, actual interface{}) bool { + if expected == nil || actual == nil { + return expected == actual + } + + exp, ok := expected.([]byte) + if !ok { + return reflect.DeepEqual(expected, actual) + } + + act, ok := actual.([]byte) + if !ok { + return false + } + if exp == nil || act == nil { + return exp == nil && act == nil + } + return bytes.Equal(exp, act) +} diff --git a/test/suite.go b/test/suite.go index 51e3f762..64c6501f 100644 --- a/test/suite.go +++ b/test/suite.go @@ -130,7 +130,7 @@ type MySuite struct { db *sql.DB dbSync sync.Once - tmpFile, bootstrapConfig, configPath, scriptPath string + tmpBootFile, tmpFile, bootstrapConfig, configPath, scriptPath string cases *Cases actualDataset *Message @@ -193,7 +193,8 @@ func (ms *MySuite) DB() *sql.DB { } func (ms *MySuite) SetupSuite() { - if ms.devMode { + devMode := os.Getenv("ARANA_DEBUG_MODE") + if ms.devMode || devMode == "true" || devMode == "on" || devMode == "1" { return } @@ -212,6 +213,8 @@ func (ms *MySuite) SetupSuite() { require.NoError(ms.T(), err) mysqlAddr := fmt.Sprintf("%s:%d", ms.container.Host, ms.container.Port) + // random port + ms.port = 13306 + int(rand2.Int31n(10000)) ms.T().Logf("====== mysql is listening on %s ======\n", mysqlAddr) ms.T().Logf("====== arana will listen on 127.0.0.1:%d ======\n", ms.port) @@ -220,8 +223,9 @@ func (ms *MySuite) SetupSuite() { ms.configPath = "../conf/config.yaml" } cfgPath := testdata.Path(ms.configPath) + bootPath := testdata.Path("../conf/bootstrap.yaml") - err = ms.createConfigFile(cfgPath, ms.container.Host, ms.container.Port) + err = ms.createConfigFile(cfgPath, bootPath, ms.container.Host, ms.container.Port) require.NoError(ms.T(), err) if ms.bootstrapConfig == "" { @@ -229,7 +233,7 @@ func (ms *MySuite) SetupSuite() { } go func() { _ = os.Setenv(constants.EnvConfigPath, ms.tmpFile) - start.Run(testdata.Path("../conf/bootstrap.yaml"), "") + start.Run(ms.tmpBootFile, "") }() // waiting for arana server started @@ -251,10 +255,12 @@ func (ms *MySuite) TearDownSuite() { if ms.db != nil { _ = ms.db.Close() } - _ = ms.container.Terminate(context.Background()) + if ms.container != nil { + _ = ms.container.Terminate(context.Background()) + } } -func (ms *MySuite) createConfigFile(cfgPath, host string, port int) error { +func (ms *MySuite) createConfigFile(cfgPath, bootCfgPath, host string, port int) error { b, err := ioutil.ReadFile(cfgPath) if err != nil { return err @@ -285,6 +291,34 @@ func (ms *MySuite) createConfigFile(cfgPath, host string, port int) error { return err } + // resolve host and ports + bb, err := ioutil.ReadFile(bootCfgPath) + if err != nil { + return err + } + + bootContent := strings. + NewReplacer( + "arana-mysql", host, + "13306", strconv.Itoa(ms.port), // arana port + "3306", strconv.Itoa(port), // mysql port + ). + Replace(string(bb)) + + // clone temp config file + bf, err := ioutil.TempFile("", "arana-boot.*.yaml") + if err != nil { + return err + } + defer func() { + _ = bf.Close() + }() + if _, err = bf.WriteString(bootContent); err != nil { + return err + } + + ms.tmpBootFile = bf.Name() + ms.T().Logf("====== generate temp arana bootstrap config: %s ======\n", ms.tmpBootFile) ms.tmpFile = f.Name() ms.T().Logf("====== generate temp arana config: %s ======\n", ms.tmpFile) diff --git a/testdata/fake_bootstrap.yaml b/testdata/fake_bootstrap.yaml index 6523bdc6..8b1752ff 100644 --- a/testdata/fake_bootstrap.yaml +++ b/testdata/fake_bootstrap.yaml @@ -15,66 +15,28 @@ # limitations under the License. # +kind: ConfigMap +apiVersion: "1.0" +listeners: + - protocol_type: mysql + server_version: 5.7.0 + socket_address: + address: 0.0.0.0 + port: 13306 config: name: file options: - content: |- - kind: ConfigMap - apiVersion: "1.0" - metadata: - name: arana-config - data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - - tenants: - - name: arana - users: - - username: arana - password: "123456" - - clusters: - - name: employee - type: mysql - sql_max_limit: -1 - tenant: arana - parameters: - max_allowed_packet: 256M - groups: - - name: employee_0000 - nodes: - - name: node_1 - host: 127.0.0.1 - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - parameters: - - sharding_rule: - tables: - - name: employee.student - allow_full_scan: true - db_rules: - - column: student_id - type: modShard - expr: modShard(3) - tbl_rules: - - column: student_id - type: modShard - expr: modShard(8) - topology: - db_pattern: employee_0000 - tbl_pattern: student_${0000...0007} - attributes: - sqlMaxLimit: -1 - foo: bar # name: etcd # options: - # endpoints: "http://localhost:2382" \ No newline at end of file + # endpoints: "http://localhost:2379" + + # name: nacos + # options: + # endpoints: "localhost:8080" + # namespace: arana + # group: arana + # contextPath: /nacos + # scheme: http + # username: nacos + # password: nacos diff --git a/testdata/fake_config.yaml b/testdata/fake_config.yaml index 2bad53c6..eade8b61 100644 --- a/testdata/fake_config.yaml +++ b/testdata/fake_config.yaml @@ -20,96 +20,99 @@ apiVersion: "1.0" metadata: name: arana-config data: - listeners: - - protocol_type: mysql - server_version: 5.7.0 - socket_address: - address: 0.0.0.0 - port: 13306 - tenants: - name: arana users: + - username: root + password: "123456" - username: arana password: "123456" - - username: dksl + clusters: + - name: employees + type: mysql + sql_max_limit: -1 + tenant: arana + parameters: + max_allowed_packet: 256M + groups: + - name: employees_0000 + nodes: + - node0 + - node0_r_0 + - name: employees_0001 + nodes: + - node1 + - name: employees_0002 + nodes: + - node2 + - name: employees_0003 + nodes: + - node3 + sharding_rule: + tables: + - name: employees.student + allow_full_scan: true + sequence: + type: snowflake + option: + db_rules: + - column: uid + type: scriptExpr + expr: parseInt($value % 32 / 8) + tbl_rules: + - column: uid + type: scriptExpr + expr: $value % 32 + step: 32 + topology: + db_pattern: employees_${0000..0003} + tbl_pattern: student_${0000..0031} + attributes: + sqlMaxLimit: -1 + nodes: + node0: + name: node0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000 + weight: r10w10 + parameters: + node0_r_0: + name: node0_r_0 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0000_r + weight: r0w0 + parameters: + node1: + name: node1 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0001 + weight: r10w10 + parameters: + node2: + name: node2 + host: arana-mysql + port: 3306 + username: root + password: "123456" + database: employees_0002 + weight: r10w10 + parameters: + node3: + name: node3 + host: arana-mysql + port: 3306 + username: root password: "123456" + database: employees_0003 + weight: r10w10 + parameters: - clusters: - - name: employees - type: mysql - sql_max_limit: -1 - tenant: arana - parameters: - max_allowed_packet: 256M - groups: - - name: employees_0000 - nodes: - - name: node0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000 - weight: r10w10 - parameters: - - name: node0_r_0 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0000_r - weight: r0w0 - parameters: - - name: employees_0001 - nodes: - - name: node1 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0001 - weight: r10w10 - parameters: - - name: employees_0002 - nodes: - - name: node2 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0002 - weight: r10w10 - parameters: - - name: employees_0003 - nodes: - - name: node3 - host: arana-mysql - port: 3306 - username: root - password: "123456" - database: employees_0003 - weight: r10w10 - parameters: - sharding_rule: - tables: - - name: employees.student - allow_full_scan: true - sequence: - type: snowflake - option: - db_rules: - - column: uid - type: scriptExpr - expr: parseInt($value % 32 / 8) - tbl_rules: - - column: uid - type: scriptExpr - expr: $value % 32 - step: 32 - topology: - db_pattern: employees_${0000..0003} - tbl_pattern: student_${0000..0031} - attributes: - sqlMaxLimit: -1 - foo: bar