Skip to content

Commit

Permalink
Merge pull request #180 from viru-tech/env-credentials
Browse files Browse the repository at this point in the history
Read credentials from envs or command line
  • Loading branch information
yuzhichang committed Aug 4, 2023
2 parents 7660cc3 + d69680d commit ca3f6c7
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 28 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ jobs:
golangci-lint:
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: '1.20'
- uses: actions/checkout@v3
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.48.0
version: v1.51.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ jobs:
steps:

- name: Set up Go 1.x
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: '1.20'
id: go

- name: Check out code into the Go module directory
Expand All @@ -31,7 +31,7 @@ jobs:
run: make build

- name: unittest
run: make unittest
run: make gotest

- name: benchtest
run: make benchtest
Expand Down
7 changes: 7 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ COPY --from=builder /app/kafka_gen_metric /usr/local/bin/kafka_gen_metric
# - LOG_LEVEL
# - LOG_PATHS
# - HTTP_PORT
# - HTTP_HOST
# - METRIC_PUSH_GATEWAY_ADDRS
# - PUSH_INTERVAL
# - LOCAL_CFG_FILE
Expand All @@ -29,6 +30,12 @@ COPY --from=builder /app/kafka_gen_metric /usr/local/bin/kafka_gen_metric
# - NACOS_GROUP
# - NACOS_DATAID
# - NACOS_SERVICE_NAME
# - CLICKHOUSE_USERNAME
# - CLICKHOUSE_PASSWORD
# - KAFKA_USERNAME
# - KAFKA_PASSWORD
# - KAFKA_GSSAPI_USERNAME
# - KAFKA_GSSAPI_PASSWORD
# See cmd/clickhouse_sinker/main.go for details.

ENTRYPOINT ["/usr/local/bin/clickhouse_sinker"]
7 changes: 7 additions & 0 deletions Dockerfile_goreleaser
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ ADD dist/nacos_publish_config_linux_amd64_v1/nacos_publish_config /usr/local/bin
# - LOG_LEVEL
# - LOG_PATHS
# - HTTP_PORT
# - HTTP_HOST
# - METRIC_PUSH_GATEWAY_ADDRS
# - PUSH_INTERVAL
# - LOCAL_CFG_FILE
Expand All @@ -20,6 +21,12 @@ ADD dist/nacos_publish_config_linux_amd64_v1/nacos_publish_config /usr/local/bin
# - NACOS_GROUP
# - NACOS_DATAID
# - NACOS_SERVICE_NAME
# - CLICKHOUSE_USERNAME
# - CLICKHOUSE_PASSWORD
# - KAFKA_USERNAME
# - KAFKA_PASSWORD
# - KAFKA_GSSAPI_USERNAME
# - KAFKA_GSSAPI_PASSWORD
# See cmd/clickhouse_sinker/main.go for details.

ENTRYPOINT ["/usr/local/bin/clickhouse_sinker"]
15 changes: 15 additions & 0 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ func initCmdOptions() {
util.EnvStringVar(&cmdOps.NacosDataID, "nacos-dataid")
util.EnvStringVar(&cmdOps.NacosServiceName, "nacos-service-name")

util.EnvStringVar(&cmdOps.ClickhouseUsername, "clickhouse-username")
util.EnvStringVar(&cmdOps.ClickhousePassword, "clickhouse-password")
util.EnvStringVar(&cmdOps.KafkaUsername, "kafka-username")
util.EnvStringVar(&cmdOps.KafkaPassword, "kafka-password")
util.EnvStringVar(&cmdOps.KafkaGSSAPIUsername, "kafka-gssapi-username")
util.EnvStringVar(&cmdOps.KafkaGSSAPIPassword, "kafka-gssapi-password")

// 3. Replace options with the corresponding CLI parameter if present.
flag.BoolVar(&cmdOps.ShowVer, "v", cmdOps.ShowVer, "show build version and quit")
flag.StringVar(&cmdOps.LogLevel, "log-level", cmdOps.LogLevel, "one of debug, info, warn, error, dpanic, panic, fatal")
Expand All @@ -98,6 +105,14 @@ func initCmdOptions() {
flag.StringVar(&cmdOps.NacosGroup, "nacos-group", cmdOps.NacosGroup, `nacos group name. Empty string doesn't work!`)
flag.StringVar(&cmdOps.NacosDataID, "nacos-dataid", cmdOps.NacosDataID, "nacos dataid")
flag.StringVar(&cmdOps.NacosServiceName, "nacos-service-name", cmdOps.NacosServiceName, "nacos service name")

flag.StringVar(&cmdOps.ClickhouseUsername, "clickhouse-username", cmdOps.ClickhouseUsername, "clickhouse username")
flag.StringVar(&cmdOps.ClickhousePassword, "clickhouse-password", cmdOps.ClickhousePassword, "clickhouse password")
flag.StringVar(&cmdOps.KafkaUsername, "kafka-username", cmdOps.KafkaUsername, "kafka username")
flag.StringVar(&cmdOps.KafkaPassword, "kafka-password", cmdOps.KafkaPassword, "kafka password")
flag.StringVar(&cmdOps.KafkaGSSAPIUsername, "kafka-gssapi-username", cmdOps.KafkaGSSAPIUsername, "kafka GSSAPI username")
flag.StringVar(&cmdOps.KafkaGSSAPIPassword, "kafka-gssapi-password", cmdOps.KafkaGSSAPIPassword, "kafka GSSAPI password")

flag.Parse()
}

Expand Down
12 changes: 11 additions & 1 deletion cmd/nacos_publish_config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ var (
localCfgFile = flag.String("local-cfg-file", "/etc/clickhouse_sinker.hjson", "local config file")
replicas = flag.Int("replicas", 1, "replicate each task to multiple ones with the same config except task name, consumer group and table name")
maxOpenConns = flag.Int("max-open-conns", 0, "max open connections per shard")

clickhouseUsername = flag.String("clickhouse-username", "", "clickhouse username")
clickhousePassword = flag.String("clickhouse-password", "", "clickhouse password")
kafkaUsername = flag.String("kafka-username", "", "kafka username")
kafkaPassword = flag.String("kafka-password", "", "kafka password")
)

// Empty is not valid namespaceID
Expand Down Expand Up @@ -74,7 +79,12 @@ func PublishSinkerConfig() {
return
}

if err = cfg.Normallize(false, ""); err != nil {
if err = cfg.Normallize(false, "", util.Credentials{
ClickhouseUsername: *clickhouseUsername,
ClickhousePassword: *clickhousePassword,
KafkaUsername: *kafkaUsername,
KafkaPassword: *kafkaPassword,
}); err != nil {
util.Logger.Fatal("cfg.Normallize failed", zap.Error(err))
return
}
Expand Down
39 changes: 29 additions & 10 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type KafkaConfig struct {
ClientCertFile string // Required for client authentication. It's client cert.pem.
ClientKeyFile string // Required if and only if ClientCertFile is present. It's client key.pem.

TrustStoreLocation string //JKS format of CA certificate, used to extract CA cert.pem.
TrustStoreLocation string // JKS format of CA certificate, used to extract CA cert.pem.
TrustStorePassword string
KeystoreLocation string //JKS format of client certificate and key, used to extrace client cert.pem and key.pem.
KeystoreLocation string // JKS format of client certificate and key, used to extrace client cert.pem and key.pem.
KeystorePassword string
EndpIdentAlgo string
}
//simplified sarama.Config.Net.SASL to only support SASL/PLAIN and SASL/GSSAPI(Kerberos)
// simplified sarama.Config.Net.SASL to only support SASL/PLAIN and SASL/GSSAPI(Kerberos)
Sasl struct {
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Expand All @@ -72,7 +72,7 @@ type KafkaConfig struct {
// Password for SASL/PLAIN or SASL/SCRAM authentication
Password string
GSSAPI struct {
AuthType int //1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH
AuthType int // 1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH
KeyTabPath string
KerberosConfigPath string
ServiceName string
Expand All @@ -98,7 +98,7 @@ type ClickHouseConfig struct {
// Whether skip verify clickhouse-server cert
InsecureSkipVerify bool

RetryTimes int //<=0 means retry infinitely
RetryTimes int // <=0 means retry infinitely
MaxOpenConns int
}

Expand Down Expand Up @@ -172,8 +172,8 @@ type Assignment struct {
}

const (
MaxBufferSize = 1 << 20 //1048576
defaultBufferSize = 1 << 18 //262144
MaxBufferSize = 1 << 20 // 1048576
defaultBufferSize = 1 << 18 // 262144
maxFlushInterval = 600
defaultFlushInterval = 10
defaultTimeZone = "Local"
Expand Down Expand Up @@ -202,7 +202,26 @@ func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
}

// Normalize and validate configuration
func (cfg *Config) Normallize(constructGroup bool, httpAddr string) (err error) {
func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Credentials) (err error) {
if cred.ClickhouseUsername != "" {
cfg.Clickhouse.Username = cred.ClickhouseUsername
}
if cred.ClickhousePassword != "" {
cfg.Clickhouse.Password = cred.ClickhousePassword
}
if cred.KafkaUsername != "" {
cfg.Kafka.Sasl.Username = cred.KafkaUsername
}
if cred.KafkaPassword != "" {
cfg.Kafka.Sasl.Password = cred.KafkaPassword
}
if cred.KafkaGSSAPIUsername != "" {
cfg.Kafka.Sasl.GSSAPI.Username = cred.KafkaGSSAPIUsername
}
if cred.KafkaGSSAPIPassword != "" {
cfg.Kafka.Sasl.GSSAPI.Password = cred.KafkaGSSAPIPassword
}

if len(cfg.Clickhouse.Hosts) == 0 || cfg.Kafka.Brokers == "" {
err = errors.Newf("invalid configuration, Clickhouse or Kafka section is missing!")
return
Expand Down Expand Up @@ -388,12 +407,12 @@ func (cfg *Config) convertKfkSecurity() {
if strings.Contains(cfg.Kafka.Sasl.Mechanism, "GSSAPI") {
// GSSAPI
if configMap["useKeyTab"] != "true" {
//Username and password
// Username and password
cfg.Kafka.Sasl.GSSAPI.AuthType = 1
cfg.Kafka.Sasl.GSSAPI.Username = configMap["username"]
cfg.Kafka.Sasl.GSSAPI.Password = configMap["password"]
} else {
//Keytab
// Keytab
cfg.Kafka.Sasl.GSSAPI.AuthType = 2
cfg.Kafka.Sasl.GSSAPI.KeyTabPath = configMap["keyTab"]
if principal, ok := configMap["principal"]; ok {
Expand Down
23 changes: 12 additions & 11 deletions task/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
)

var (
createTableSQL = `CREATE TABLE IF NOT EXISTS %s as %s.%s ENGINE=Merge('%s', '%s')`
createTableSQL = `CREATE TABLE IF NOT EXISTS %s AS %s.%s ENGINE=Merge('%s', '%s')`
dropTableSQL = `DROP TABLE IF EXISTS %s `
countSeriesSQL = `WITH (SELECT max(timestamp) FROM %s) AS m
SELECT count() FROM %s FINAL WHERE __series_id GLOBAL IN (
Expand Down Expand Up @@ -111,20 +111,21 @@ func (s *Sinker) Run() {
}

if s.rcm == nil {
if _, err = os.Stat(s.cmdOps.LocalCfgFile); err == nil {
if newCfg, err = config.ParseLocalCfgFile(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("config.ParseLocalCfgFile failed", zap.Error(err))
return
}
} else {
if _, err := os.Stat(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("expect --local-cfg-file or --nacos-dataid")
return
}

if newCfg, err = config.ParseLocalCfgFile(s.cmdOps.LocalCfgFile); err != nil {
util.Logger.Fatal("config.ParseLocalCfgFile failed", zap.Error(err))
return
}

ha := ""
if s.cmdOps.NacosServiceName != "" {
ha = s.httpAddr
}
if err = newCfg.Normallize(true, ha); err != nil {
if err = newCfg.Normallize(true, ha, s.cmdOps.Credentials); err != nil {
util.Logger.Fatal("newCfg.Normallize failed", zap.Error(err))
return
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (s *Sinker) Run() {
if s.cmdOps.NacosServiceName != "" {
ha = s.httpAddr
}
if err = newCfg.Normallize(true, ha); err != nil {
if err = newCfg.Normallize(true, ha, s.cmdOps.Credentials); err != nil {
util.Logger.Error("newCfg.Normallize failed", zap.Error(err))
continue
}
Expand Down Expand Up @@ -302,7 +303,7 @@ func (s *Sinker) applyConfig(newCfg *config.Config) (err error) {
}

func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply the first config", zap.Reflect("config", newCfg))
util.Logger.Info("going to apply the first config", zap.Any("config", newCfg))
// 1. Initialize clickhouse connections
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
Expand Down Expand Up @@ -338,7 +339,7 @@ func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
}

func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
util.Logger.Info("going to apply another config", zap.Int("number", s.numCfg), zap.Reflect("config", newCfg))
util.Logger.Info("going to apply another config", zap.Int("number", s.numCfg), zap.Any("config", newCfg))
if !reflect.DeepEqual(newCfg.Kafka, s.curCfg.Kafka) || !reflect.DeepEqual(newCfg.Clickhouse, s.curCfg.Clickhouse) {
// 1. Stop tasks gracefully. Wait until all flying data be processed (write to CH and commit to Kafka).
s.stopAllTasks()
Expand Down
11 changes: 11 additions & 0 deletions util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ type CmdOptions struct {
NacosPassword string
NacosDataID string
NacosServiceName string // participate in assignment management if not empty

Credentials
}

type Credentials struct {
ClickhouseUsername string
ClickhousePassword string
KafkaUsername string
KafkaPassword string
KafkaGSSAPIUsername string
KafkaGSSAPIPassword string
}

// StringContains check if contains string in array
Expand Down

0 comments on commit ca3f6c7

Please sign in to comment.