Skip to content

Commit

Permalink
refactor: refact config to support deploy as gateway (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Aug 8, 2022
1 parent 146f75d commit f5f49bd
Show file tree
Hide file tree
Showing 58 changed files with 756 additions and 604 deletions.
141 changes: 73 additions & 68 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,82 +75,87 @@ var (
Run: func(cmd *cobra.Command, args []string) {
//h := initHolmes()
//h.Start()
conf := config.Load(configPath)

for _, filterConf := range conf.Filters {
factory := filter.GetFilterFactory(filterConf.Kind)
if factory == nil {
log.Fatalf("there is no filter factory for filter: %s", filterConf.Kind)
}
f, err := factory.NewFilter(filterConf.Config)
if err != nil {
log.Fatal(errors.WithMessagef(err, "failed to create filter: %s", filterConf.Name))
}
filter.RegisterFilter(filterConf.Name, f)
conf, err := config.Load(configPath)
if err != nil {
log.Fatal(err)
}

resource.InitDBManager(conf.DataSources, func(dbName, dsn string) pools.Factory {
collector, err := driver.NewConnector(dbName, dsn)
if err != nil {
log.Fatal(err)
}
return collector.NewBackendConnection
})

executors := make(map[string]proto.Executor)
for _, executorConf := range conf.Executors {
if executorConf.Mode == config.SDB {
executor, err := executor.NewSingleDBExecutor(executorConf)
if err != nil {
log.Fatal(err)
dbpack := server.NewServer()
for appid, dbpackConf := range conf.AppConfig {
for _, filterConf := range dbpackConf.Filters {
factory := filter.GetFilterFactory(filterConf.Kind)
if factory == nil {
log.Fatalf("there is no filter factory for filter: %s", filterConf.Kind)
}
executors[executorConf.Name] = executor
}
if executorConf.Mode == config.RWS {
executor, err := executor.NewReadWriteSplittingExecutor(executorConf)
f, err := factory.NewFilter(appid, filterConf.Config)
if err != nil {
log.Fatal(err)
log.Fatal(errors.Wrapf(err, "failed to create filter: %s", filterConf.Name))
}
executors[executorConf.Name] = executor
filter.RegisterFilter(filterConf.Name, f)
}
if executorConf.Mode == config.SHD {
executor, err := executor.NewShardingExecutor(executorConf)

resource.RegisterDBManager(appid, dbpackConf.DataSources, func(dbName, dsn string) pools.Factory {
collector, err := driver.NewConnector(dbName, dsn)
if err != nil {
log.Fatal(err)
}
executors[executorConf.Name] = executor
}
}

if conf.DistributedTransaction != nil {
dbpackHttp.DistributedTransactionEnabled = true
dt.InitDistributedTransactionManager(conf.DistributedTransaction)
}

dbpackHttp.Listeners = conf.Listeners
dbpack := server.NewServer()
for _, listenerConf := range conf.Listeners {
switch listenerConf.ProtocolType {
case config.Mysql:
listener, err := listener.NewMysqlListener(listenerConf)
if err != nil {
log.Fatalf("create mysql listener failed %v", err)
return collector.NewBackendConnection
})

executors := make(map[string]proto.Executor)
for _, executorConf := range dbpackConf.Executors {
if executorConf.Mode == config.SDB {
executor, err := executor.NewSingleDBExecutor(executorConf)
if err != nil {
log.Fatal(err)
}
executors[executorConf.Name] = executor
}
dbListener := listener.(proto.DBListener)
executor := executors[listenerConf.Executor]
if executor == nil {
log.Fatalf("executor: %s is not exists for mysql listener", listenerConf.Executor)
if executorConf.Mode == config.RWS {
executor, err := executor.NewReadWriteSplittingExecutor(executorConf)
if err != nil {
log.Fatal(err)
}
executors[executorConf.Name] = executor
}
dbListener.SetExecutor(executor)
dbpack.AddListener(dbListener)
case config.Http:
listener, err := listener.NewHttpListener(listenerConf)
if err != nil {
log.Fatalf("create http listener failed %v", err)
if executorConf.Mode == config.SHD {
executor, err := executor.NewShardingExecutor(executorConf)
if err != nil {
log.Fatal(err)
}
executors[executorConf.Name] = executor
}
}

dbpackHttp.Listeners = dbpackConf.Listeners
for _, listenerConf := range dbpackConf.Listeners {
switch listenerConf.ProtocolType {
case config.Mysql:
listener, err := listener.NewMysqlListener(listenerConf)
if err != nil {
log.Fatalf("create mysql listener failed %v", err)
}
dbListener := listener.(proto.DBListener)
executor := executors[listenerConf.Executor]
if executor == nil {
log.Fatalf("executor: %s is not exists for mysql listener", listenerConf.Executor)
}
dbListener.SetExecutor(executor)
dbpack.AddListener(dbListener)
case config.Http:
listener, err := listener.NewHttpListener(listenerConf)
if err != nil {
log.Fatalf("create http listener failed %v", err)
}
dbpack.AddListener(listener)
default:
log.Fatalf("unsupported %v listener protocol type", listenerConf.ProtocolType)
}
dbpack.AddListener(listener)
default:
log.Fatalf("unsupported %v listener protocol type", listenerConf.ProtocolType)
}

if dbpackConf.DistributedTransaction != nil {
dbpackHttp.DistributedTransactionEnabled = true
dt.RegisterTransactionManager(dbpackConf.DistributedTransaction)
}
}

Expand All @@ -173,8 +178,8 @@ var (
// default listen at 18888
var lis net.Listener
var lisErr error
if conf.HTTPListenPort != nil {
lis, lisErr = net.Listen("tcp4", fmt.Sprintf(":%d", *conf.HTTPListenPort))
if conf.ProbePort > 0 {
lis, lisErr = net.Listen("tcp4", fmt.Sprintf(":%d", conf.ProbePort))
} else {
lis, lisErr = net.Listen("tcp4", fmt.Sprintf(":%d", defaultHTTPListenPort))
}
Expand All @@ -185,8 +190,8 @@ var (

go initServer(ctx, lis)

if conf.Trace != nil {
go initTracing(ctx, conf.Trace.JaegerEndpoint)
if conf.Tracer != nil {
go initTracing(ctx, conf.Tracer.JaegerEndpoint)
}

dbpack.Start(ctx)
Expand Down
125 changes: 63 additions & 62 deletions docker/conf/config_rws.yaml
Original file line number Diff line number Diff line change
@@ -1,67 +1,68 @@
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13306
config:
users:
dksl: "123456"
server_version: "8.0.27"
executor: redirect
probe_port: 9999
termination_drain_duration: 3s
app_config:
svc:
distributed_transaction:
retry_dead_threshold: 130000
rollback_retry_timeout_unlock_enable: true
etcd_config:
endpoints:
- etcd:2379

executors:
- name: redirect
mode: rws
config:
load_balance_algorithm: RandomWeight
data_sources:
- name: employees-master
weight: r0w10
- name: employees-slave
weight: r10w0
filters:
- cryptoFilter

data_source_cluster:
- name: employees-master
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql1:3306)/employees?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
filters:
- mysqlDTFilter
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13306
config:
users:
dksl: "123456"
server_version: "8.0.27"
executor: redirect

- name: employees-slave
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql2:3306)/employees?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
executors:
- name: redirect
mode: rws
config:
load_balance_algorithm: RandomWeight
data_sources:
- name: employees-master
weight: r0w10
- name: employees-slave
weight: r10w0
filters:
- cryptoFilter

filters:
- name: mysqlDTFilter
kind: MysqlDistributedTransaction
conf:
appid: svc
lock_retry_interval: 50ms
lock_retry_times: 30
- name: cryptoFilter
kind: CryptoFilter
conf:
column_crypto_list:
- table: departments
columns: ["dept_name"]
aeskey: 123456789abcdefg
data_source_cluster:
- name: employees-master
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql1:3306)/employees?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
filters:
- mysqlDTFilter

distributed_transaction:
appid: svc
retry_dead_threshold: 130000
rollback_retry_timeout_unlock_enable: true
etcd_config:
endpoints:
- etcd:2379
- name: employees-slave
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql2:3306)/employees?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3

http_listen_port: 9999
filters:
- name: mysqlDTFilter
kind: MysqlDistributedTransaction
conf:
appid: svc
lock_retry_interval: 50ms
lock_retry_times: 30
- name: cryptoFilter
kind: CryptoFilter
conf:
column_crypto_list:
- table: departments
columns: [ "dept_name" ]
aeskey: 123456789abcdefg

0 comments on commit f5f49bd

Please sign in to comment.