From 36c983d2b9da7349705d612e98e8105313f612fd Mon Sep 17 00:00:00 2001
From: Vladimir Garvardt
Date: Fri, 31 Mar 2017 16:30:57 +0200
Subject: [PATCH] PLAT-301 Migrated to viper for config files and cobra for app flags

---
 README.md                    |  50 ++++++++------
 ci/assets/config.yml         |  20 +++---
 ci/assets/pipes.yml          |  20 +++---
 cmd/kandalf/app.go           |  82 +++++++++++++++++++++++
 cmd/kandalf/main.go          |  99 ++++++---------------------
 doc.go                       |   2 +-
 glide.lock                   |  46 ++++++++++++-
 glide.yaml                   |   2 +
 pkg/config/config.go         | 126 +++++++++++++++--------------------
 pkg/config/config_test.go    |  74 ++++----------------
 pkg/config/pipes.go          |  35 ++++------
 pkg/config/pipes_test.go     |  49 +++++---------
 pkg/workers/bridge_worker.go |   6 +-
 13 files changed, 293 insertions(+), 318 deletions(-)
 create mode 100644 cmd/kandalf/app.go

diff --git a/README.md b/README.md
index 3da6291..a755b0e 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,9 @@ Service is written in Go language and can be built with the Go compiler of version 1
 ### Application configuration
 
-Application is configured with environment variables or YAML config file.
+Application is configured with environment variables or a config file in one of several formats: JSON, TOML, YAML, HCL, or Java properties.
+
+By default it tries to read the config file from `/etc/kandalf/conf/config.` and `./config.`. You can change the path using the `-c ` or `--config ` application parameters. If no config file is found, the config loader falls back to reading config values from environment variables.
 
 #### Environment variables
 
@@ -35,41 +37,47 @@ Application is configured with environment variables or YAML config file.
 * `WORKER_CACHE_SIZE` - Max messages number that we store in memory before trying to publish to Kafka (_default_: `10`)
 * `WORKER_CACHE_FLUSH_TIMEOUT` - Max amount of time we store messages in memory before trying to publish to Kafka, must be valid [duration string](https://golang.org/pkg/time/#ParseDuration) (_default_: `5s`)
 * `WORKER_STORAGE_READ_TIMEOUT` - Timeout between attempts of reading persisted messages from storage, to publish them to Kafka, must be at least 2x greater than `WORKER_CYCLE_TIMEOUT`, must be valid [duration string](https://golang.org/pkg/time/#ParseDuration) (_default_: `10s`)
+* `WORKER_STORAGE_MAX_ERRORS` - Max number of storage read errors in a row before the worker stops reading in the current read cycle. The next read cycle starts after the `WORKER_STORAGE_READ_TIMEOUT` interval. (_default_: `10`)
 
-#### Config file
+#### Config file (YAML example)
 
-You can use `-c ` parameter to load application settings from YAML file.
 Config should have the following structure:
 
 ```yaml
-log_level: "info" # same as env LOG_LEVEL
-rabbit_dsn: "amqp://user:password@rmq" # same as env RABBIT_DSN
-storage_dsn: "redis://redis.local/?key=storage:key" # same as env STORAGE_DSN
+logLevel: "info" # same as env LOG_LEVEL
+rabbitDSN: "amqp://user:password@rmq" # same as env RABBIT_DSN
+storageDSN: "redis://redis.local/?key=storage:key" # same as env STORAGE_DSN
 kafka:
-  brokers: # same as env KAFKA_BROKERS
+  brokers: # same as env KAFKA_BROKERS
     - "192.0.0.1:9092"
     - "192.0.0.2:9092"
-  max_retry: 5 # same as env KAFKA_MAX_RETRY
-  pipes_config: "/etc/kandalf/conf/pipes.yml" # same as env KAFKA_PIPES_CONFIG
+  maxRetry: 5 # same as env KAFKA_MAX_RETRY
+  pipesConfig: "/etc/kandalf/conf/pipes.yml" # same as env KAFKA_PIPES_CONFIG
 stats:
-  dsn: "statsd.local:8125" # same as env STATS_DSN
-  prefix: "kandalf" # same as env STATS_PREFIX
+  dsn: "statsd.local:8125" # same as env STATS_DSN
+  prefix: "kandalf" # same as env STATS_PREFIX
 worker:
-  cycle_timeout: "2s" # same as env WORKER_CYCLE_TIMEOUT
-  cache_size: 10 # same as env WORKER_CACHE_SIZE
-  cache_flush_timeout: "5s" # same as env WORKER_CACHE_FLUSH_TIMEOUT
-  storage_read_timeout: "10s" # same as env WORKER_STORAGE_READ_TIMEOUT
+  cycleTimeout: "2s" # same as env WORKER_CYCLE_TIMEOUT
+  cacheSize: 10 # same as env WORKER_CACHE_SIZE
+  cacheFlushTimeout: "5s" # same as env WORKER_CACHE_FLUSH_TIMEOUT
+  storageReadTimeout: "10s" # same as env WORKER_STORAGE_READ_TIMEOUT
+  storageMaxErrors: 10 # same as env WORKER_STORAGE_MAX_ERRORS
 ```
 
+You can find a sample config file in [ci/assets/config.yml](./ci/assets/config.yml).
+
 ### Pipes configuration
 
-The rules, defining which messages should be send to which Kafka topics, are defined in "[pipes.yml](./ci/assets/pipes.yml)" and is called "pipes".
+The rules defining which messages should be sent to which Kafka topics are defined in the Kafka pipes config file and are called "pipes". Each pipe has the following structure:
 
-Each pipe should contain following keys:
+```yaml
+- kafkaTopic: "loyalty" # name of the Kafka topic the message will be sent to
+  rabbitExchangeName: "customers" # name of the exchange in RabbitMQ
+  rabbitRoutingKey: "badge.received" # routing key for the exchange
+  rabbitQueueName: "kandalf-customers-badge.received" # name of the RabbitMQ queue to read messages from
+```
 
-* **kafka_topic** _str_ — name of the topic in Kafka where message will be sent;
-* **rabbitmq_exchange_name** _str_ — name of the exchange in RabbitMQ;
-* **rabbitmq_routing_key** _str_ — routing key for exchange;
-* **rabbitmq_queue_name** _str_ — the name of RabbitMQ queue to read messages from.
+You can find a sample pipes config file in [ci/assets/pipes.yml](./ci/assets/pipes.yml).
 
 ## How to build a binary on a local machine
 
diff --git a/ci/assets/config.yml b/ci/assets/config.yml
index 0598539..97d3561 100644
--- a/ci/assets/config.yml
+++ b/ci/assets/config.yml
@@ -1,21 +1,21 @@
 # Available options are: "debug", "info", "warning", "error", "fatal" and "panic"
-log_level: "info"
-rabbit_dsn: "amqp://user:password@rmq"
-storage_dsn: "redis://redis.local/?key=storage:key"
+logLevel: "info"
+rabbitDSN: "amqp://user:password@rmq"
+storageDSN: "redis://redis.local/?key=storage:key"
 kafka:
   brokers:
     - "192.0.0.1:9092"
     - "192.0.0.2:9092"
   # The total number of times to retry sending a message.
   # Should be similar to the `message.send.max.retries` setting of the JVM producer.
- max_retry: 5 - pipes_config: "/etc/kandalf/conf/pipes.yml" + maxRetry: 5 + pipesConfig: "/etc/kandalf/conf/pipes.yml" stats: dsn: "statsd.local:8125" prefix: "kandalf" worker: - cycle_timeout: "2s" - cache_size: 10 - cache_flush_timeout: "5s" - storage_read_timeout: "10s" - storage_max_errors: 10 + cycleTimeout: "2s" + cacheSize: 10 + cacheFlushTimeout: "5s" + storageReadTimeout: "10s" + storageMaxErrors: 10 diff --git a/ci/assets/pipes.yml b/ci/assets/pipes.yml index 2f8b82d..acb370d 100644 --- a/ci/assets/pipes.yml +++ b/ci/assets/pipes.yml @@ -1,16 +1,14 @@ -- +pipes: # Message from that RabbitMQ exchange - rabbitmq_exchange_name: "customers" +- rabbitExchangeName: "customers" # With that routing key - rabbitmq_routing_key: "order.created" + rabbitRoutingKey: "order.created" # Will be placed to that kafka topic - kafka_topic: "new-orders" + kafkaTopic: "new-orders" # The queue name can be whatever you want, just keep it unique within pipes. - # If you launch multiple kandalf instances they all will consume messages from that queue. - rabbitmq_queue_name: "kandalf-customers-order.created" + rabbitQueueName: "kandalf-customers-order.created" -- - kafka_topic: "loyalty" - rabbitmq_exchange_name: "customers" - rabbitmq_routing_key: "badge.received" - rabbitmq_queue_name: "kandalf-customers-badge.received" +- kafkaTopic: "loyalty" + rabbitExchangeName: "customers" + rabbitRoutingKey: "badge.received" + rabbitQueueName: "kandalf-customers-badge.received" diff --git a/cmd/kandalf/app.go b/cmd/kandalf/app.go new file mode 100644 index 0000000..15ac6ad --- /dev/null +++ b/cmd/kandalf/app.go @@ -0,0 +1,82 @@ +package main + +import ( + "net/url" + "strings" + + log "github.com/Sirupsen/logrus" + "github.com/hellofresh/kandalf/pkg/amqp" + "github.com/hellofresh/kandalf/pkg/config" + "github.com/hellofresh/kandalf/pkg/kafka" + "github.com/hellofresh/kandalf/pkg/storage" + "github.com/hellofresh/kandalf/pkg/workers" + "github.com/hellofresh/stats-go" + "github.com/spf13/cobra" +) + +// RunApp is main application bootstrap and runner +func RunApp(cmd *cobra.Command, args []string) { + printVersion, err := cmd.Flags().GetBool(flagVersion) + failOnError(err, "Failed to read version flag") + if printVersion { + cmd.Println(cmd.Short) + return + } + + log.WithField("version", version).Info("Kandalf starting...") + + globalConfig, err := config.Load(configPath) + failOnError(err, "Failed to load application configuration") + + level, err := log.ParseLevel(strings.ToLower(globalConfig.LogLevel)) + failOnError(err, "Failed to get log level") + log.SetLevel(level) + + pipesList, err := config.LoadPipesFromFile(globalConfig.Kafka.PipesConfig) + failOnError(err, "Failed to load pipes config") + + statsClient := stats.NewStatsdStatsClient(globalConfig.Stats.DSN, globalConfig.Stats.Prefix) + defer func() { + if err := statsClient.Close(); err != nil { + log.WithError(err).Error("Got error on closing stats client") + } + }() + + storageURL, err := url.Parse(globalConfig.StorageDSN) + failOnError(err, "Failed to parse Storage DSN") + + persistentStorage, err := storage.NewPersistentStorage(storageURL) + failOnError(err, "Failed to establish Redis connection") + // Do not close storage here as it is required in Worker close to store unhandled messages + + producer, err := kafka.NewProducer(globalConfig.Kafka, statsClient) + failOnError(err, "Failed to establish Kafka connection") + defer func() { + if err := producer.Close(); err != nil { + log.WithError(err).Error("Got error on closing kafka producer") + } + }() 
+ + worker, err := workers.NewBridgeWorker(globalConfig.Worker, persistentStorage, producer, statsClient) + defer func() { + if err := worker.Close(); err != nil { + log.WithError(err).Error("Got error on closing persistent storage") + } + }() + + queuesHandler := amqp.NewQueuesHandler(pipesList, worker.MessageHandler, statsClient) + amqpConnection, err := amqp.NewConnection(globalConfig.RabbitDSN, queuesHandler) + failOnError(err, "Failed to establish initial connection to AMQP") + defer func() { + if err := amqpConnection.Close(); err != nil { + log.WithError(err).Error("Got error on closing AMQP connection") + } + }() + + forever := make(chan bool) + + worker.Go(forever) + + log.Infof("[*] Waiting for users. To exit press CTRL+C") + <-forever +} diff --git a/cmd/kandalf/main.go b/cmd/kandalf/main.go index 8963ed8..63855e7 100644 --- a/cmd/kandalf/main.go +++ b/cmd/kandalf/main.go @@ -1,18 +1,15 @@ package main import ( - "flag" - "net/url" - "os" - "strings" - log "github.com/Sirupsen/logrus" - "github.com/hellofresh/kandalf/pkg/amqp" - "github.com/hellofresh/kandalf/pkg/config" - "github.com/hellofresh/kandalf/pkg/kafka" - "github.com/hellofresh/kandalf/pkg/storage" - "github.com/hellofresh/kandalf/pkg/workers" - "github.com/hellofresh/stats-go" + "github.com/spf13/cobra" +) + +const flagVersion = "version" + +var ( + version string + configPath string ) func failOnError(err error, msg string) { @@ -21,75 +18,19 @@ func failOnError(err error, msg string) { } } -var version string - func main() { - var ( - globalConfig config.GlobalConfig - err error - ) - - flagSet := flag.NewFlagSet("Kandalf v"+version, flag.ExitOnError) - configPath := flagSet.String("c", "", "Path to config file, set if you want to load settings from YAML file, otherwise settings are loaded from environment variables") - flagSet.Parse(os.Args[1:]) - - if *configPath != "" { - globalConfig, err = config.LoadConfigFromFile(*configPath) - failOnError(err, "Failed to load config from file") - } else { - globalConfig, err = config.LoadConfigFromEnv() - failOnError(err, "Failed to load config from environment") + versionString := "Kandalf v" + version + var RootCmd = &cobra.Command{ + Use: "kandalf", + Short: versionString, + Long: versionString + `. RabbitMQ to Kafka bridge. 
+ +Complete documentation is available at https://github.com/hellofresh/kandalf`, + Run: RunApp, } + RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Source of a configuration file") + RootCmd.Flags().BoolP(flagVersion, "v", false, "Print application version") - level, err := log.ParseLevel(strings.ToLower(globalConfig.LogLevel)) - failOnError(err, "Failed to get log level") - log.SetLevel(level) - - pipesList, err := config.LoadPipesFromFile(globalConfig.Kafka.PipesConfig) - failOnError(err, "Failed to load pipes config") - - statsClient := stats.NewStatsdStatsClient(globalConfig.Stats.DSN, globalConfig.Stats.Prefix) - defer func() { - if err := statsClient.Close(); err != nil { - log.WithError(err).Error("Got error on closing stats client") - } - }() - - storageURL, err := url.Parse(globalConfig.StorageDSN) - failOnError(err, "Failed to parse Storage DSN") - - persistentStorage, err := storage.NewPersistentStorage(storageURL) - failOnError(err, "Failed to establish Redis connection") - // Do not close storage here as it is required in Worker close to store unhandled messages - - producer, err := kafka.NewProducer(globalConfig.Kafka, statsClient) - failOnError(err, "Failed to establish Kafka connection") - defer func() { - if err := producer.Close(); err != nil { - log.WithError(err).Error("Got error on closing kafka producer") - } - }() - - worker, err := workers.NewBridgeWorker(globalConfig.Worker, persistentStorage, producer, statsClient) - defer func() { - if err := worker.Close(); err != nil { - log.WithError(err).Error("Got error on closing persistent storage") - } - }() - - queuesHandler := amqp.NewQueuesHandler(pipesList, worker.MessageHandler, statsClient) - amqpConnection, err := amqp.NewConnection(globalConfig.RabbitDSN, queuesHandler) - failOnError(err, "Failed to establish initial connection to AMQP") - defer func() { - if err := amqpConnection.Close(); err != nil { - log.WithError(err).Error("Got error on closing AMQP connection") - } - }() - - forever := make(chan bool) - - worker.Go(forever) - - log.Infof("[*] Waiting for users. To exit press CTRL+C") - <-forever + err := RootCmd.Execute() + failOnError(err, "Failed to execute root command") } diff --git a/doc.go b/doc.go index 374499d..84af63c 100644 --- a/doc.go +++ b/doc.go @@ -1,5 +1,5 @@ /* -RabbitMQ to Kafka bridge. +Package kandalf is RabbitMQ to Kafka bridge. 
For a full guide visit https://github.com/hellofresh/kandalf */ diff --git a/glide.lock b/glide.lock index 57f5ab3..b719dd6 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 9a4e8aca86e50ca59c27b5417118e56025efc6d0ca7e71e7f34c9c3ce5bda41c -updated: 2017-03-28T17:14:15.850534644+02:00 +hash: d06a33d1fec6c1074376b95c3cfcff763915c637b801f0211880ba8f6526babe +updated: 2017-03-31T15:01:01.067051439+02:00 imports: - name: github.com/davecgh/go-spew version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 @@ -13,6 +13,8 @@ imports: version: bb955e01b9346ac19dc29eb16586c90ded99a98c - name: github.com/eapache/queue version: 44cc805cf13205b55f69e14bcb69867d1ae92f98 +- name: github.com/fsnotify/fsnotify + version: 629574ca2a5df945712d3079857300b5e4da0236 - name: github.com/go-redis/redis version: 2cf5af9928978bbef8d1076607022403439e177c subpackages: @@ -23,12 +25,33 @@ imports: - internal/proto - name: github.com/golang/snappy version: d9eb7a3d35ec988b8585d4a0068e462c27d28380 +- name: github.com/hashicorp/hcl + version: 630949a3c5fa3c613328e1b8256052cbc2327c9b + subpackages: + - hcl/ast + - hcl/parser + - hcl/scanner + - hcl/strconv + - hcl/token + - json/parser + - json/scanner + - json/token - name: github.com/hellofresh/stats-go version: 47f7950a30e6f2c5f5769dd06856647d58ecf9f6 +- name: github.com/inconshreveable/mousetrap + version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/kelseyhightower/envconfig version: f611eb38b3875cc3bd991ca91c51d06446afa14c - name: github.com/klauspost/crc32 version: 19b0b332c9e4516a6370a0456e6182c3b5036720 +- name: github.com/magiconair/properties + version: 51463bfca2576e06c62a8504b5c0f06d61312647 +- name: github.com/mitchellh/mapstructure + version: 53818660ed4955e899c0bcafa97299a388bd7c8e +- name: github.com/pelletier/go-buffruneio + version: c37440a7cf42ac63b919c752ca73a85067e05992 +- name: github.com/pelletier/go-toml + version: f6e7596e8daafd44dc9e5c208dc73035020c8481 - name: github.com/pierrec/lz4 version: 90290f74b1b4d9c097f0a3b3c7eba2ef3875c699 - name: github.com/pierrec/xxHash @@ -43,12 +66,31 @@ imports: version: 0fb560e5f7fbcaee2f75e3c34174320709f69944 - name: github.com/Sirupsen/logrus version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f +- name: github.com/spf13/afero + version: 9be650865eab0c12963d8753212f4f9c66cdcf12 + subpackages: + - mem +- name: github.com/spf13/cast + version: ce135a4ebeee6cfe9a26c93ee0d37825f26113c7 +- name: github.com/spf13/cobra + version: 7be4beda01ec05d0b93d80b3facd2b6f44080d94 +- name: github.com/spf13/jwalterweatherman + version: fa7ca7e836cf3a8bb4ebf799f472c12d7e903d66 +- name: github.com/spf13/pflag + version: d16db1e50e33dff1b6cdf37596cef36742128670 +- name: github.com/spf13/viper + version: 84f94806c67f59dd7ae87bc5351f7a9c94a4558d - name: github.com/streadway/amqp version: 2e25825abdbd7752ff08b270d313b93519a0a232 - name: golang.org/x/sys version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 subpackages: - unix +- name: golang.org/x/text + version: fc7fa097411d30e6708badff276c4c164425590c + subpackages: + - transform + - unicode/norm - name: gopkg.in/alexcesaro/statsd.v2 version: 7fea3f0d2fab1ad973e641e51dba45443a311a90 - name: gopkg.in/yaml.v2 diff --git a/glide.yaml b/glide.yaml index b4482a1..b9a79b7 100644 --- a/glide.yaml +++ b/glide.yaml @@ -12,6 +12,8 @@ import: version: ^1.3.0 - package: github.com/satori/go.uuid version: ^1.1.0 +- package: github.com/spf13/cobra +- package: github.com/spf13/viper - package: github.com/streadway/amqp - package: gopkg.in/yaml.v2 testImport: diff --git 
a/pkg/config/config.go b/pkg/config/config.go index e408279..af1dd04 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,60 +1,22 @@ package config import ( - "io/ioutil" "time" + log "github.com/Sirupsen/logrus" "github.com/kelseyhightower/envconfig" - "gopkg.in/yaml.v2" + "github.com/spf13/viper" ) -// Duration is "time.Duration" wrapper for parsing it from string for environment variables -type Duration struct { - time.Duration -} - -// Decode decodes string value from environment variable to "time.Duration" -func (d *Duration) Decode(value string) error { - val, err := time.ParseDuration(value) - if err != nil { - return err - } - d.Duration = val - return nil -} - -// UnmarshalYAML decodes string value from yaml config variable to "time.Duration" -func (d *Duration) UnmarshalYAML(unmarshal func(v interface{}) error) error { - var i interface{} - if err := unmarshal(&i); err != nil { - return err - } - - if value, ok := i.(string); ok { - if value == "" { - return &yaml.TypeError{Errors: []string{"Empty string is invalid value for duration field"}} - } - val, err := time.ParseDuration(value) - if err != nil { - return &yaml.TypeError{Errors: []string{err.Error()}} - } - d.Duration = val - - return nil - } - - return &yaml.TypeError{Errors: []string{"Failed to cast duration field to string"}} -} - // GlobalConfig contains application configuration values type GlobalConfig struct { // LogLevel defines logging level for application, default is "info" - LogLevel string `yaml:"log_level" envconfig:"LOG_LEVEL" default:"info"` + LogLevel string `envconfig:"LOG_LEVEL" default:"info"` // RabbitDSN is DSN for RabbitMQ instance to consume messages from - RabbitDSN string `yaml:"rabbit_dsn" envconfig:"RABBIT_DSN" required:"true"` + RabbitDSN string `envconfig:"RABBIT_DSN"` // StorageDSN is DSN for persistent storage used in case of Kafka unavailability. Example: // redis://redis.local/?key=storage:key - StorageDSN string `yaml:"storage_dsn" envconfig:"STORAGE_DSN" required:"true"` + StorageDSN string `envconfig:"STORAGE_DSN"` // Kafka contains configuration values for Kafka Kafka KafkaConfig @@ -67,9 +29,9 @@ type GlobalConfig struct { // KafkaConfig contains application configuration values for Kafka type KafkaConfig struct { // Brokers is Kafka brokers comma-separated list, e.g. "192.168.0.1:9092,192.168.0.2:9092" - Brokers []string `envconfig:"KAFKA_BROKERS" required:"true"` + Brokers []string `envconfig:"KAFKA_BROKERS"` // MaxRetry is total number of times to retry sending a message to Kafka, default is 5 - MaxRetry int `yaml:"max_retry" envconfig:"KAFKA_MAX_RETRY" default:"5"` + MaxRetry int `envconfig:"KAFKA_MAX_RETRY"` // PipesConfig is a path to rabbit-kafka bridge mappings config. // This must be YAML file with the following structure: // @@ -86,7 +48,7 @@ type KafkaConfig struct { // rabbitmq_queue_name: "kandalf-customers-badge.received" // // Default path is "/etc/kandalf/conf/pipes.yml". - PipesConfig string `yaml:"pipes_config" envconfig:"KAFKA_PIPES_CONFIG" default:"/etc/kandalf/conf/pipes.yml"` + PipesConfig string `envconfig:"KAFKA_PIPES_CONFIG"` } // StatsConfig contains application configuration values for stats. 
@@ -101,51 +63,67 @@ type StatsConfig struct { // WorkerConfig contains application configuration values for actual bridge worker type WorkerConfig struct { // CycleTimeout is worker cycle sleep time to avoid CPU overload - CycleTimeout Duration `yaml:"cycle_timeout" envconfig:"WORKER_CYCLE_TIMEOUT" default:"2s"` + CycleTimeout time.Duration `envconfig:"WORKER_CYCLE_TIMEOUT"` // CacheSize is max messages number that we store in memory before trying to publish to Kafka - CacheSize int `yaml:"cache_size" envconfig:"WORKER_CACHE_SIZE" default:"10"` + CacheSize int `envconfig:"WORKER_CACHE_SIZE"` // CacheFlushTimeout is max amount of time we store messages in memory before trying to publish to Kafka - CacheFlushTimeout Duration `yaml:"cache_flush_timeout" envconfig:"WORKER_CACHE_FLUSH_TIMEOUT" default:"5s"` + CacheFlushTimeout time.Duration `envconfig:"WORKER_CACHE_FLUSH_TIMEOUT"` // ReadTimeout is timeout between attempts of reading persisted messages from storage // to publish them to Kafka, must be at least 2x greater than CycleTimeout - StorageReadTimeout Duration `yaml:"storage_read_timeout" envconfig:"WORKER_STORAGE_READ_TIMEOUT" default:"10s"` + StorageReadTimeout time.Duration `envconfig:"WORKER_STORAGE_READ_TIMEOUT"` // StorageMaxErrors is max storage read errors in a row before worker stops trying reading in current // read cycle. Next read cycle will be in "StorageReadTimeout" interval. - StorageMaxErrors int `yaml:"storage_max_errors" envconfig:"WORKER_STORAGE_MAX_ERRORS" default:"10"` + StorageMaxErrors int `envconfig:"WORKER_STORAGE_MAX_ERRORS"` } -var instance GlobalConfig +func init() { + viper.SetDefault("logLevel", "info") + viper.SetDefault("kafka.maxRetry", 5) + viper.SetDefault("kafka.pipesConfig", "/etc/kandalf/conf/pipes.yml") + viper.SetDefault("worker.cycleTimeout", time.Second*time.Duration(2)) + viper.SetDefault("worker.cacheSize", 10) + viper.SetDefault("worker.cacheFlushTimeout", time.Second*time.Duration(5)) + viper.SetDefault("worker.storageReadTimeout", time.Second*time.Duration(10)) + viper.SetDefault("worker.storageMaxErrors", 10) +} -// LoadConfigFromEnv populates config values from environment variables -func LoadConfigFromEnv() (GlobalConfig, error) { - err := envconfig.Process("", &instance) - if err != nil { - return instance, err +// Load loads config values from file, +// fallback to load from environment variables if file is not found or failed to read +func Load(configPath string) (*GlobalConfig, error) { + if configPath != "" { + viper.SetConfigFile(configPath) + } else { + viper.SetConfigName("config") + viper.AddConfigPath("/etc/kandalf/conf") + viper.AddConfigPath(".") } - return instance, nil -} - -// LoadConfigFromFile populates config values from file -func LoadConfigFromFile(configPath string) (GlobalConfig, error) { - data, err := ioutil.ReadFile(configPath) - if err != nil { - return instance, err + if err := viper.ReadInConfig(); err != nil { + log.WithError(err).Warn("No config file found") + return LoadConfigFromEnv() } + log.WithField("path", viper.ConfigFileUsed()).Info("Config loaded from file") - instance, err := LoadConfigFromData(data) - if err != nil { - return instance, err + var instance GlobalConfig + if err := viper.Unmarshal(&instance); err != nil { + return nil, err } - return instance, nil + return &instance, nil } -// LoadConfigFromData populates config values from data -func LoadConfigFromData(data []byte) (GlobalConfig, error) { - if err := yaml.Unmarshal(data, &instance); err != nil { - return instance, err +// 
LoadConfigFromEnv loads config values from environment variables +func LoadConfigFromEnv() (*GlobalConfig, error) { + var instance GlobalConfig + + if err := viper.Unmarshal(&instance); err != nil { + return nil, err + } + + err := envconfig.Process("", &instance) + if err != nil { + return &instance, err } - return instance, nil + return &instance, nil } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 313a8ab..dd50f85 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -6,10 +6,9 @@ import ( "testing" "github.com/stretchr/testify/assert" - "gopkg.in/yaml.v2" ) -func assertConfig(t *testing.T, globalConfig GlobalConfig) { +func assertConfig(t *testing.T, globalConfig *GlobalConfig) { assert.Equal(t, "info", globalConfig.LogLevel) assert.Equal(t, "amqp://user:password@rmq", globalConfig.RabbitDSN) assert.Equal(t, "redis://redis.local/?key=storage:key", globalConfig.StorageDSN) @@ -30,63 +29,7 @@ func assertConfig(t *testing.T, globalConfig GlobalConfig) { assert.Equal(t, 10, globalConfig.Worker.StorageMaxErrors) } -func TestLoadConfigFromData(t *testing.T) { - data := []byte(` ---- -log_level: "info" -rabbit_dsn: "amqp://user:password@rmq" -storage_dsn: "redis://redis.local/?key=storage:key" -kafka: - brokers: - - "192.0.0.1:9092" - - "192.0.0.2:9092" - max_retry: 5 - pipes_config: "/etc/kandalf/conf/pipes.yml" -stats: - dsn: "statsd.local:8125" - prefix: "kandalf" -worker: - cycle_timeout: "2s" - cache_size: 10 - cache_flush_timeout: "5s" - storage_read_timeout: "10s" - storage_max_errors: 10 -`) - globalConfig, err := LoadConfigFromData(data) - assert.Nil(t, err) - - assertConfig(t, globalConfig) -} - -func TestDuration_UnmarshalYAML(t *testing.T) { - data := []byte(` ---- -log_level: "info" -rabbit_dsn: "amqp://user:password@rmq" -storage_dsn: "redis://redis.local/?key=storage:key" -kafka: - brokers: - - "192.0.0.1:9092" - - "192.0.0.2:9092" - max_retry: 5 - pipes_config: "/etc/kandalf/conf/pipes.yml" -stats: - dsn: "statsd.local:8125" - prefix: "kandalf" -worker: - cycle_timeout: "hello" - cache_size: 10 - cache_flush_timeout: "5s" - storage_read_timeout: "10s" - storage_max_errors: 10 -`) - _, err := LoadConfigFromData(data) - assert.NotEmpty(t, err) - - assert.IsType(t, &yaml.TypeError{}, err) -} - -func TestLoadConfigFromFile(t *testing.T) { +func TestLoad(t *testing.T) { wd, err := os.Getwd() assert.Nil(t, err) assert.Contains(t, wd, "github.com/hellofresh/kandalf") @@ -96,7 +39,7 @@ func TestLoadConfigFromFile(t *testing.T) { _, err = os.Stat(globalConfigPath) assert.Nil(t, err) - globalConfig, err := LoadConfigFromFile(globalConfigPath) + globalConfig, err := Load(globalConfigPath) assert.Nil(t, err) assertConfig(t, globalConfig) @@ -118,6 +61,15 @@ func setGlobalConfigEnv() { os.Setenv("WORKER_STORAGE_MAX_ERRORS", "10") } +func TestLoad_fallbackToEnv(t *testing.T) { + setGlobalConfigEnv() + + globalConfig, err := Load("") + assert.Nil(t, err) + + assertConfig(t, globalConfig) +} + func TestLoadConfigFromEnv(t *testing.T) { setGlobalConfigEnv() @@ -127,7 +79,7 @@ func TestLoadConfigFromEnv(t *testing.T) { assertConfig(t, globalConfig) } -func TestDuration_Decode(t *testing.T) { +func TestLoadConfigFromEnv_Duration(t *testing.T) { setGlobalConfigEnv() os.Setenv("WORKER_CYCLE_TIMEOUT", "22s") diff --git a/pkg/config/pipes.go b/pkg/config/pipes.go index 2524021..a603877 100644 --- a/pkg/config/pipes.go +++ b/pkg/config/pipes.go @@ -2,17 +2,15 @@ package config import ( "encoding/json" - "io/ioutil" - - "gopkg.in/yaml.v2" + 
"github.com/spf13/viper" ) // Pipe contains settings for single bridge pipe between Kafka and RabbitMQ type Pipe struct { - KafkaTopic string `yaml:"kafka_topic" json:"kafka_topic"` - RabbitExchangeName string `yaml:"rabbitmq_exchange_name" json:"rabbitmq_exchange_name"` - RabbitRoutingKey string `yaml:"rabbitmq_routing_key" json:"rabbitmq_routing_key"` - RabbitQueueName string `yaml:"rabbitmq_queue_name" json:"rabbitmq_queue_name"` + KafkaTopic string + RabbitExchangeName string + RabbitRoutingKey string + RabbitQueueName string } func (p Pipe) String() string { @@ -22,26 +20,19 @@ func (p Pipe) String() string { // LoadPipesFromFile loads pipes config from file func LoadPipesFromFile(pipesConfigPath string) ([]Pipe, error) { - data, err := ioutil.ReadFile(pipesConfigPath) - if err != nil { + pipesConfigReader := viper.New() + pipesConfigReader.SetConfigFile(pipesConfigPath) + if err := pipesConfigReader.ReadInConfig(); err != nil { return nil, err } - pipes, err := LoadPipesFromData(data) - if err != nil { - return nil, err + var pipes struct { + Pipes []Pipe } - return pipes, nil -} - -// LoadPipesFromData loads pipes config from data -func LoadPipesFromData(data []byte) ([]Pipe, error) { - var pipes []Pipe - - if err := yaml.Unmarshal(data, &pipes); err != nil { - return pipes, err + if err := pipesConfigReader.Unmarshal(&pipes); err != nil { + return nil, err } - return pipes, nil + return pipes.Pipes, nil } diff --git a/pkg/config/pipes_test.go b/pkg/config/pipes_test.go index 4313418..de065d0 100644 --- a/pkg/config/pipes_test.go +++ b/pkg/config/pipes_test.go @@ -1,6 +1,7 @@ package config import ( + "fmt" "os" "path/filepath" "testing" @@ -20,39 +21,6 @@ func assertPipes(t *testing.T, pipes []Pipe) { assert.Equal(t, "kandalf-customers-badge.received", pipes[1].RabbitQueueName) } -func TestLoadPipesFromData(t *testing.T) { - data := []byte(` ---- -- - # Message from that RabbitMQ exchange - rabbitmq_exchange_name: "customers" - # With that routing key - rabbitmq_routing_key: "order.created" - # Will be placed to that kafka topic - kafka_topic: "new-orders" - # The queue name can be whatever you want, just keep it unique within pipes. - # If you launch multiple kandalf instances they all will consume messages from that queue. 
- rabbitmq_queue_name: "kandalf-customers-order.created" - -- - kafka_topic: "loyalty" - rabbitmq_exchange_name: "customers" - rabbitmq_routing_key: "badge.received" - rabbitmq_queue_name: "kandalf-customers-badge.received" - `) - pipes, err := LoadPipesFromData(data) - assert.Nil(t, err) - assert.Len(t, pipes, 2) - - assertPipes(t, pipes) -} - -func TestLoadPipesFromData_Error(t *testing.T) { - data := []byte("this is not yaml") - _, err := LoadPipesFromData(data) - assert.NotEmpty(t, err) -} - func TestLoadPipesFromFile(t *testing.T) { wd, err := os.Getwd() assert.Nil(t, err) @@ -75,7 +43,7 @@ func TestLoadPipesFromFile_Error(t *testing.T) { assert.Nil(t, err) assert.Contains(t, wd, "github.com/hellofresh/kandalf") - // non-yaml file + // non-supported file type // .../github.com/hellofresh/kandalf/pkg/config/pipes.go pipesPath := filepath.Join(wd, "pipes.go") _, err = os.Stat(pipesPath) @@ -93,3 +61,16 @@ func TestLoadPipesFromFile_Error(t *testing.T) { _, err = LoadPipesFromFile(pipesPath) assert.NotEmpty(t, err) } + +func TestPipe_String(t *testing.T) { + pipe := Pipe{ + KafkaTopic: "topic", + RabbitExchangeName: "rqExchange", + RabbitRoutingKey: "rqKey", + RabbitQueueName: "rqQueue", + } + pipeJSON := `{"KafkaTopic":"topic","RabbitExchangeName":"rqExchange","RabbitRoutingKey":"rqKey","RabbitQueueName":"rqQueue"}` + + assert.Equal(t, pipeJSON, pipe.String()) + assert.Equal(t, pipeJSON, fmt.Sprintf("%s", pipe)) +} diff --git a/pkg/workers/bridge_worker.go b/pkg/workers/bridge_worker.go index ebe55fc..5789414 100644 --- a/pkg/workers/bridge_worker.go +++ b/pkg/workers/bridge_worker.go @@ -48,7 +48,7 @@ func (w *BridgeWorker) Execute() { w.Lock() defer w.Unlock() - if len(w.cache) >= w.config.CacheSize || time.Now().Sub(w.lastFlush) >= w.config.CacheFlushTimeout.Duration { + if len(w.cache) >= w.config.CacheSize || time.Now().Sub(w.lastFlush) >= w.config.CacheFlushTimeout { log.WithFields(log.Fields{"len": len(w.cache), "last_flush": w.lastFlush}). Debug("Flushing worker cache to Kafka") @@ -65,7 +65,7 @@ func (w *BridgeWorker) Execute() { // Go runs the service forever in async way in go-routine func (w *BridgeWorker) Go(interrupt chan bool) { - w.readStorageTicker = time.NewTicker(w.config.StorageReadTimeout.Duration) + w.readStorageTicker = time.NewTicker(w.config.StorageReadTimeout) go func() { for { @@ -80,7 +80,7 @@ func (w *BridgeWorker) Go(interrupt chan bool) { // Prevent CPU overload log.WithField("timeout", w.config.CycleTimeout).Debug("Bridge worker is going to sleep for a while") - time.Sleep(w.config.CycleTimeout.Duration) + time.Sleep(w.config.CycleTimeout) } }() }
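
A note on the new config loading flow in `pkg/config/config.go`: `Load` tries a config file first (an explicit path, then the default search paths) and falls back to environment variables when no file can be read. Below is a minimal, self-contained sketch of that flow using the same viper/envconfig APIs the patch uses; the `appConfig` struct and its fields are illustrative stand-ins, not the real `GlobalConfig`. Decoding strings such as `"2s"` into `time.Duration` relies on viper's default mapstructure decode hook, the same mechanism the patch's `worker.cycleTimeout` default depends on.

```go
package main

import (
	"fmt"
	"time"

	"github.com/kelseyhightower/envconfig"
	"github.com/spf13/viper"
)

// appConfig is a hypothetical, trimmed-down stand-in for GlobalConfig.
type appConfig struct {
	LogLevel     string        `envconfig:"LOG_LEVEL"`
	RabbitDSN    string        `envconfig:"RABBIT_DSN"`
	CycleTimeout time.Duration `envconfig:"WORKER_CYCLE_TIMEOUT"`
}

// loadConfig mirrors the patch's config.Load: try a file first,
// fall back to environment variables when no file is found.
func loadConfig(path string) (*appConfig, error) {
	// Defaults apply whether values come from a file or from env fallback.
	viper.SetDefault("logLevel", "info")
	viper.SetDefault("cycleTimeout", 2*time.Second)

	if path != "" {
		viper.SetConfigFile(path) // explicit file from the -c/--config flag
	} else {
		viper.SetConfigName("config") // config.<ext> in the search paths
		viper.AddConfigPath("/etc/kandalf/conf")
		viper.AddConfigPath(".")
	}

	var cfg appConfig
	if err := viper.ReadInConfig(); err != nil {
		// No readable file: populate the struct from defaults, then env vars.
		if err := viper.Unmarshal(&cfg); err != nil {
			return nil, err
		}
		if err := envconfig.Process("", &cfg); err != nil {
			return nil, err
		}
		return &cfg, nil
	}

	if err := viper.Unmarshal(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig("")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```

One behavioral consequence visible in the diff: the `required:"true"` envconfig tags on `RabbitDSN` and `StorageDSN` are gone, so a missing `RABBIT_DSN` no longer fails fast at config load time and only surfaces when the AMQP connection is attempted.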
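The cobra wiring in `cmd/kandalf/main.go` replaces the hand-rolled `flag.FlagSet`. A trimmed, runnable sketch of the same pattern follows; the version string and output are illustrative:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

var configPath string

func main() {
	rootCmd := &cobra.Command{
		Use:   "kandalf",
		Short: "Kandalf v0.0.0-example", // illustrative version string
		Run: func(cmd *cobra.Command, args []string) {
			// --version short-circuits the normal run, as in cmd/kandalf/app.go.
			printVersion, err := cmd.Flags().GetBool("version")
			if err != nil {
				cmd.Println(err)
				return
			}
			if printVersion {
				cmd.Println(cmd.Short)
				return
			}
			fmt.Println("config path:", configPath)
		},
	}
	rootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Source of a configuration file")
	rootCmd.Flags().BoolP("version", "v", false, "Print application version")

	if err := rootCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```

With this wiring, `kandalf -v` prints the `Short` string, and `kandalf -c /path/to/config.yml` feeds the path into `config.Load`, mirroring `RunApp` above.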
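The pipes file now nests its entries under a top-level `pipes:` key, and `LoadPipesFromFile` reads it with a dedicated `viper.New()` instance rather than the global singleton, so pipe keys cannot collide with the application config the global instance already holds. A minimal sketch of that loader; the `pipe` struct mirrors `config.Pipe`, and its exported field names double as config keys thanks to viper's case-insensitive matching (the `pipes.yml` path in `main` is illustrative):

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// pipe mirrors config.Pipe; without struct tags, field names act as keys.
type pipe struct {
	KafkaTopic         string
	RabbitExchangeName string
	RabbitRoutingKey   string
	RabbitQueueName    string
}

// loadPipes mirrors config.LoadPipesFromFile: a dedicated viper instance
// reads the file and unmarshals the top-level "pipes" list.
func loadPipes(path string) ([]pipe, error) {
	v := viper.New()
	v.SetConfigFile(path)
	if err := v.ReadInConfig(); err != nil {
		return nil, err
	}

	var wrapper struct {
		Pipes []pipe
	}
	if err := v.Unmarshal(&wrapper); err != nil {
		return nil, err
	}
	return wrapper.Pipes, nil
}

func main() {
	pipes, err := loadPipes("pipes.yml") // hypothetical path
	if err != nil {
		panic(err)
	}
	for _, p := range pipes {
		fmt.Printf("%s -> %s\n", p.RabbitQueueName, p.KafkaTopic)
	}
}
```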
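Finally, dropping the `yaml`/`json` struct tags from `Pipe` changes what `Pipe.String()` prints: `encoding/json` falls back to the exported Go field names, which is exactly what the new `TestPipe_String` asserts. A small sketch of the effect:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type pipe struct {
	KafkaTopic         string
	RabbitExchangeName string
	RabbitRoutingKey   string
	RabbitQueueName    string
}

// String reproduces config.Pipe.String(): with the json struct tags removed
// in this patch, encoding/json uses the exported field names as keys.
func (p pipe) String() string {
	data, _ := json.Marshal(p)
	return string(data)
}

func main() {
	p := pipe{KafkaTopic: "loyalty", RabbitExchangeName: "customers"}
	fmt.Println(p)
	// {"KafkaTopic":"loyalty","RabbitExchangeName":"customers","RabbitRoutingKey":"","RabbitQueueName":""}
}
```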