PLAT-301 Migrated to viper for config files and cobra for app flags
vgarvardt committed Mar 31, 2017
1 parent 0e46d58 commit 36c983d
Showing 13 changed files with 293 additions and 318 deletions.
50 changes: 29 additions & 21 deletions README.md
@@ -18,7 +18,9 @@ Service is written in Go language and can be built with the Go compiler of version 1

### Application configuration

Application is configured with environment variables or a YAML config file.
Application is configured with environment variables or config files in several formats: JSON, TOML, YAML, HCL, and Java properties.

By default it tries to read the config file from `/etc/kandalf/conf/config.<ext>` and `./config.<ext>`. You can change the path with the `-c <file_path>` or `--config <file_path>` application parameter. If no config file is found, the loader falls back to reading values from environment variables.
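
For illustration, here is a minimal sketch of how such a viper-based loader could resolve the file and fall back to the environment. The `loadConfig` helper and the trimmed-down config struct are assumptions for this example, not the exact code from `pkg/config` in this commit:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

// Trimmed-down stand-in for the real config struct (illustrative only).
type globalConfig struct {
	LogLevel  string `mapstructure:"logLevel"`
	RabbitDSN string `mapstructure:"rabbitDSN"`
}

func loadConfig(configPath string) (*globalConfig, error) {
	if configPath != "" {
		viper.SetConfigFile(configPath) // explicit -c/--config path wins
	} else {
		viper.SetConfigName("config") // matches config.<ext> for any supported format
		viper.AddConfigPath("/etc/kandalf/conf")
		viper.AddConfigPath(".")
	}

	// Let env vars supply values; note that viper's Unmarshal usually needs
	// defaults or explicit BindEnv calls to see env-only keys.
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	if err := viper.ReadInConfig(); err != nil {
		// A missing file is fine: fall back to environment variables.
		if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
			return nil, err
		}
	}

	var cfg globalConfig
	if err := viper.Unmarshal(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}

func main() {
	cfg, err := loadConfig("")
	if err != nil {
		panic(err)
	}
	fmt.Printf("log level: %q, rabbit DSN: %q\n", cfg.LogLevel, cfg.RabbitDSN)
}
```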

#### Environment variables

@@ -35,41 +37,47 @@ Application is configured with environment variables or YAML config file.
* `WORKER_CACHE_SIZE` - Maximum number of messages we store in memory before trying to publish to Kafka (_default_: `10`)
* `WORKER_CACHE_FLUSH_TIMEOUT` - Maximum amount of time we store messages in memory before trying to publish to Kafka, must be a valid [duration string](https://golang.org/pkg/time/#ParseDuration) (_default_: `5s`)
* `WORKER_STORAGE_READ_TIMEOUT` - Timeout between attempts to read persisted messages from storage for publishing to Kafka; must be at least 2x greater than `WORKER_CYCLE_TIMEOUT` and a valid [duration string](https://golang.org/pkg/time/#ParseDuration) (_default_: `10s`; see the validation sketch after this list)
* `WORKER_STORAGE_MAX_ERRORS` - Maximum number of storage read errors in a row before the worker stops reading in the current read cycle; the next read cycle starts after the `WORKER_STORAGE_READ_TIMEOUT` interval (_default_: `10`)
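
Both duration settings above must parse with Go's [`time.ParseDuration`](https://golang.org/pkg/time/#ParseDuration). Here is a hedged sketch of the "at least 2x greater" rule; the validation helper is hypothetical, not part of this commit:

```go
package main

import (
	"fmt"
	"time"
)

// validateWorkerTimeouts illustrates the documented constraint between the
// two worker timeouts; the function itself is hypothetical.
func validateWorkerTimeouts(cycleTimeout, storageReadTimeout string) error {
	cycle, err := time.ParseDuration(cycleTimeout)
	if err != nil {
		return fmt.Errorf("invalid WORKER_CYCLE_TIMEOUT: %v", err)
	}
	read, err := time.ParseDuration(storageReadTimeout)
	if err != nil {
		return fmt.Errorf("invalid WORKER_STORAGE_READ_TIMEOUT: %v", err)
	}
	if read < 2*cycle {
		return fmt.Errorf("WORKER_STORAGE_READ_TIMEOUT (%s) must be at least 2x WORKER_CYCLE_TIMEOUT (%s)", read, cycle)
	}
	return nil
}

func main() {
	fmt.Println(validateWorkerTimeouts("2s", "10s")) // <nil>
	fmt.Println(validateWorkerTimeouts("5s", "6s"))  // constraint violated
}
```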

#### Config file
#### Config file (YAML example)

You can use the `-c <file_path>` parameter to load application settings from a YAML file. Config should have the following structure:
Config should have the following structure:

```yaml
log_level: "info" # same as env LOG_LEVEL
rabbit_dsn: "amqp://user:password@rmq" # same as env RABBIT_DSN
storage_dsn: "redis://redis.local/?key=storage:key" # same as env STORAGE_DSN
logLevel: "info" # same as env LOG_LEVEL
rabbitDSN: "amqp://user:password@rmq" # same as env RABBIT_DSN
storageDSN: "redis://redis.local/?key=storage:key" # same as env STORAGE_DSN
kafka:
brokers: # same as env KAFKA_BROKERS
brokers: # same as env KAFKA_BROKERS
- "192.0.0.1:9092"
- "192.0.0.2:9092"
max_retry: 5 # same as env KAFKA_MAX_RETRY
pipes_config: "/etc/kandalf/conf/pipes.yml" # same as env KAFKA_PIPES_CONFIG
maxRetry: 5 # same as env KAFKA_MAX_RETRY
pipesConfig: "/etc/kandalf/conf/pipes.yml" # same as env KAFKA_PIPES_CONFIG
stats:
dsn: "statsd.local:8125" # same as env STATS_DSN
prefix: "kandalf" # same as env STATS_PREFIX
dsn: "statsd.local:8125" # same as env STATS_DSN
prefix: "kandalf" # same as env STATS_PREFIX
worker:
cycle_timeout: "2s" # same as env WORKER_CYCLE_TIMEOUT
cache_size: 10 # same as env WORKER_CACHE_SIZE
cache_flush_timeout: "5s" # same as env WORKER_CACHE_FLUSH_TIMEOUT
storage_read_timeout: "10s" # same as env WORKER_STORAGE_READ_TIMEOUT
cycleTimeout: "2s" # same as env WORKER_CYCLE_TIMEOUT
cacheSize: 10 # same as env WORKER_CACHE_SIZE
cacheFlushTimeout: "5s" # same as env WORKER_CACHE_FLUSH_TIMEOUT
storageReadTimeout: "10s" # same as env WORKER_STORAGE_READ_TIMEOUT
storageMaxErrors: 10 # same as env WORKER_STORAGE_MAX_ERRORS
```

You can find a sample config file in [ci/assets/config.yml](./ci/assets/config.yml).
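
For reference, the camelCase keys above could map onto Go structs through viper's `mapstructure` tags roughly as follows; the field and type names here are a sketch, not necessarily the exact declarations in `pkg/config`:

```go
package config

import "time"

// GlobalConfig mirrors the config file structure shown above
// (field and tag names are illustrative assumptions).
type GlobalConfig struct {
	LogLevel   string       `mapstructure:"logLevel"`
	RabbitDSN  string       `mapstructure:"rabbitDSN"`
	StorageDSN string       `mapstructure:"storageDSN"`
	Kafka      KafkaConfig  `mapstructure:"kafka"`
	Stats      StatsConfig  `mapstructure:"stats"`
	Worker     WorkerConfig `mapstructure:"worker"`
}

// KafkaConfig holds the broker list and producer settings.
type KafkaConfig struct {
	Brokers     []string `mapstructure:"brokers"`
	MaxRetry    int      `mapstructure:"maxRetry"`
	PipesConfig string   `mapstructure:"pipesConfig"`
}

// StatsConfig holds statsd connection settings.
type StatsConfig struct {
	DSN    string `mapstructure:"dsn"`
	Prefix string `mapstructure:"prefix"`
}

// WorkerConfig holds message buffering and retry settings; viper's
// Unmarshal decodes duration strings like "2s" into time.Duration.
type WorkerConfig struct {
	CycleTimeout       time.Duration `mapstructure:"cycleTimeout"`
	CacheSize          int           `mapstructure:"cacheSize"`
	CacheFlushTimeout  time.Duration `mapstructure:"cacheFlushTimeout"`
	StorageReadTimeout time.Duration `mapstructure:"storageReadTimeout"`
	StorageMaxErrors   int           `mapstructure:"storageMaxErrors"`
}
```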

### Pipes configuration

The rules, defining which messages should be sent to which Kafka topics, are defined in "[pipes.yml](./ci/assets/pipes.yml)" and are called "pipes".
The rules defining which messages should be sent to which Kafka topics are defined in the Kafka Pipes Config file and are called "pipes". Each pipe has the following structure:

Each pipe should contain following keys:
```yaml
- kafkaTopic: "loyalty" # name of the Kafka topic the message will be sent to
  rabbitExchangeName: "customers" # name of the exchange in RabbitMQ
  rabbitRoutingKey: "badge.received" # routing key for the exchange
  rabbitQueueName: "kandalf-customers-badge.received" # name of the RabbitMQ queue to read messages from
```

* **kafka_topic** _str_ — name of the topic in Kafka where the message will be sent;
* **rabbitmq_exchange_name** _str_ — name of the exchange in RabbitMQ;
* **rabbitmq_routing_key** _str_ — routing key for the exchange;
* **rabbitmq_queue_name** _str_ — name of the RabbitMQ queue to read messages from.
You can find a sample Kafka Pipes Config file in [ci/assets/pipes.yml](./ci/assets/pipes.yml).
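
As an illustration, a pipes file like the sample could be decoded with a struct along these lines; `LoadPipesFromFile` does exist in this commit, but its exact signature and the YAML library used here are assumptions:

```go
package config

import (
	"io/ioutil"

	"gopkg.in/yaml.v2"
)

// Pipe maps one RabbitMQ binding onto a Kafka topic.
type Pipe struct {
	KafkaTopic         string `yaml:"kafkaTopic"`
	RabbitExchangeName string `yaml:"rabbitExchangeName"`
	RabbitRoutingKey   string `yaml:"rabbitRoutingKey"`
	RabbitQueueName    string `yaml:"rabbitQueueName"`
}

// LoadPipesFromFile reads the "pipes" list from a YAML file.
func LoadPipesFromFile(path string) ([]Pipe, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var doc struct {
		Pipes []Pipe `yaml:"pipes"`
	}
	if err := yaml.Unmarshal(data, &doc); err != nil {
		return nil, err
	}
	return doc.Pipes, nil
}
```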

## How to build a binary on a local machine

20 changes: 10 additions & 10 deletions ci/assets/config.yml
@@ -1,21 +1,21 @@
# Available options are: "debug", "info", "warning", "error", "fatal" and "panic"
log_level: "info"
rabbit_dsn: "amqp://user:password@rmq"
storage_dsn: "redis://redis.local/?key=storage:key"
logLevel: "info"
rabbitDSN: "amqp://user:password@rmq"
storageDSN: "redis://redis.local/?key=storage:key"
kafka:
brokers:
- "192.0.0.1:9092"
- "192.0.0.2:9092"
# The total number of times to retry sending a message.
# Should be similar to the `message.send.max.retries` setting of the JVM producer.
max_retry: 5
pipes_config: "/etc/kandalf/conf/pipes.yml"
maxRetry: 5
pipesConfig: "/etc/kandalf/conf/pipes.yml"
stats:
dsn: "statsd.local:8125"
prefix: "kandalf"
worker:
cycle_timeout: "2s"
cache_size: 10
cache_flush_timeout: "5s"
storage_read_timeout: "10s"
storage_max_errors: 10
cycleTimeout: "2s"
cacheSize: 10
cacheFlushTimeout: "5s"
storageReadTimeout: "10s"
storageMaxErrors: 10
20 changes: 9 additions & 11 deletions ci/assets/pipes.yml
@@ -1,16 +1,14 @@
-
pipes:
# Message from that RabbitMQ exchange
rabbitmq_exchange_name: "customers"
- rabbitExchangeName: "customers"
# With that routing key
rabbitmq_routing_key: "order.created"
rabbitRoutingKey: "order.created"
# Will be placed to that kafka topic
kafka_topic: "new-orders"
kafkaTopic: "new-orders"
# The queue name can be whatever you want, just keep it unique within pipes.
# If you launch multiple kandalf instances they all will consume messages from that queue.
rabbitmq_queue_name: "kandalf-customers-order.created"
rabbitQueueName: "kandalf-customers-order.created"

-
kafka_topic: "loyalty"
rabbitmq_exchange_name: "customers"
rabbitmq_routing_key: "badge.received"
rabbitmq_queue_name: "kandalf-customers-badge.received"
- kafkaTopic: "loyalty"
rabbitExchangeName: "customers"
rabbitRoutingKey: "badge.received"
rabbitQueueName: "kandalf-customers-badge.received"
82 changes: 82 additions & 0 deletions cmd/kandalf/app.go
@@ -0,0 +1,82 @@
package main

import (
"net/url"
"strings"

log "github.com/Sirupsen/logrus"
"github.com/hellofresh/kandalf/pkg/amqp"
"github.com/hellofresh/kandalf/pkg/config"
"github.com/hellofresh/kandalf/pkg/kafka"
"github.com/hellofresh/kandalf/pkg/storage"
"github.com/hellofresh/kandalf/pkg/workers"
"github.com/hellofresh/stats-go"
"github.com/spf13/cobra"
)

// RunApp is the main application bootstrap and runner
func RunApp(cmd *cobra.Command, args []string) {
printVersion, err := cmd.Flags().GetBool(flagVersion)
failOnError(err, "Failed to read version flag")
if printVersion {
cmd.Println(cmd.Short)
return
}

log.WithField("version", version).Info("Kandalf starting...")

globalConfig, err := config.Load(configPath)
failOnError(err, "Failed to load application configuration")

level, err := log.ParseLevel(strings.ToLower(globalConfig.LogLevel))
failOnError(err, "Failed to get log level")
log.SetLevel(level)

pipesList, err := config.LoadPipesFromFile(globalConfig.Kafka.PipesConfig)
failOnError(err, "Failed to load pipes config")

statsClient := stats.NewStatsdStatsClient(globalConfig.Stats.DSN, globalConfig.Stats.Prefix)
defer func() {
if err := statsClient.Close(); err != nil {
log.WithError(err).Error("Got error on closing stats client")
}
}()

storageURL, err := url.Parse(globalConfig.StorageDSN)
failOnError(err, "Failed to parse Storage DSN")

persistentStorage, err := storage.NewPersistentStorage(storageURL)
failOnError(err, "Failed to establish Redis connection")
// Do not close storage here as it is required in Worker close to store unhandled messages

producer, err := kafka.NewProducer(globalConfig.Kafka, statsClient)
failOnError(err, "Failed to establish Kafka connection")
defer func() {
if err := producer.Close(); err != nil {
log.WithError(err).Error("Got error on closing kafka producer")
}
}()

    worker, err := workers.NewBridgeWorker(globalConfig.Worker, persistentStorage, producer, statsClient)
    failOnError(err, "Failed to create bridge worker")
    defer func() {
        if err := worker.Close(); err != nil {
            log.WithError(err).Error("Got error on closing persistent storage")
        }
    }()

queuesHandler := amqp.NewQueuesHandler(pipesList, worker.MessageHandler, statsClient)
amqpConnection, err := amqp.NewConnection(globalConfig.RabbitDSN, queuesHandler)
failOnError(err, "Failed to establish initial connection to AMQP")
defer func() {
if err := amqpConnection.Close(); err != nil {
log.WithError(err).Error("Got error on closing AMQP connection")
}
}()

forever := make(chan bool)

worker.Go(forever)

    log.Info("[*] Waiting for messages. To exit press CTRL+C")
<-forever
}
99 changes: 20 additions & 79 deletions cmd/kandalf/main.go
@@ -1,18 +1,15 @@
package main

import (
"flag"
"net/url"
"os"
"strings"

log "github.com/Sirupsen/logrus"
"github.com/hellofresh/kandalf/pkg/amqp"
"github.com/hellofresh/kandalf/pkg/config"
"github.com/hellofresh/kandalf/pkg/kafka"
"github.com/hellofresh/kandalf/pkg/storage"
"github.com/hellofresh/kandalf/pkg/workers"
"github.com/hellofresh/stats-go"
"github.com/spf13/cobra"
)

const flagVersion = "version"

var (
version string
configPath string
)

func failOnError(err error, msg string) {
@@ -21,75 +18,19 @@ func failOnError(err error, msg string) {
}
}

var version string

func main() {
var (
globalConfig config.GlobalConfig
err error
)

flagSet := flag.NewFlagSet("Kandalf v"+version, flag.ExitOnError)
configPath := flagSet.String("c", "", "Path to config file, set if you want to load settings from YAML file, otherwise settings are loaded from environment variables")
flagSet.Parse(os.Args[1:])

if *configPath != "" {
globalConfig, err = config.LoadConfigFromFile(*configPath)
failOnError(err, "Failed to load config from file")
} else {
globalConfig, err = config.LoadConfigFromEnv()
failOnError(err, "Failed to load config from environment")
versionString := "Kandalf v" + version
var RootCmd = &cobra.Command{
Use: "kandalf",
Short: versionString,
Long: versionString + `. RabbitMQ to Kafka bridge.
Complete documentation is available at https://github.com/hellofresh/kandalf`,
Run: RunApp,
}
RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Source of a configuration file")
RootCmd.Flags().BoolP(flagVersion, "v", false, "Print application version")

level, err := log.ParseLevel(strings.ToLower(globalConfig.LogLevel))
failOnError(err, "Failed to get log level")
log.SetLevel(level)

pipesList, err := config.LoadPipesFromFile(globalConfig.Kafka.PipesConfig)
failOnError(err, "Failed to load pipes config")

statsClient := stats.NewStatsdStatsClient(globalConfig.Stats.DSN, globalConfig.Stats.Prefix)
defer func() {
if err := statsClient.Close(); err != nil {
log.WithError(err).Error("Got error on closing stats client")
}
}()

storageURL, err := url.Parse(globalConfig.StorageDSN)
failOnError(err, "Failed to parse Storage DSN")

persistentStorage, err := storage.NewPersistentStorage(storageURL)
failOnError(err, "Failed to establish Redis connection")
// Do not close storage here as it is required in Worker close to store unhandled messages

producer, err := kafka.NewProducer(globalConfig.Kafka, statsClient)
failOnError(err, "Failed to establish Kafka connection")
defer func() {
if err := producer.Close(); err != nil {
log.WithError(err).Error("Got error on closing kafka producer")
}
}()

worker, err := workers.NewBridgeWorker(globalConfig.Worker, persistentStorage, producer, statsClient)
defer func() {
if err := worker.Close(); err != nil {
log.WithError(err).Error("Got error on closing persistent storage")
}
}()

queuesHandler := amqp.NewQueuesHandler(pipesList, worker.MessageHandler, statsClient)
amqpConnection, err := amqp.NewConnection(globalConfig.RabbitDSN, queuesHandler)
failOnError(err, "Failed to establish initial connection to AMQP")
defer func() {
if err := amqpConnection.Close(); err != nil {
log.WithError(err).Error("Got error on closing AMQP connection")
}
}()

forever := make(chan bool)

worker.Go(forever)

log.Infof("[*] Waiting for users. To exit press CTRL+C")
<-forever
err := RootCmd.Execute()
failOnError(err, "Failed to execute root command")
}
2 changes: 1 addition & 1 deletion doc.go
@@ -1,5 +1,5 @@
/*
RabbitMQ to Kafka bridge.
Package kandalf is a RabbitMQ to Kafka bridge.
For a full guide visit https://github.com/hellofresh/kandalf
*/
