diff --git a/config.go b/config.go index 615e18b..565579e 100644 --- a/config.go +++ b/config.go @@ -18,6 +18,7 @@ type config struct { LogLevel string `yaml:"log_level"` Timeout time.Duration TimeoutShutdown time.Duration `yaml:"timeout_shutdown"` + Concurrency int Tenant struct { Label string @@ -41,6 +42,10 @@ func configParse(b []byte) (*config, error) { cfg.Timeout = 10 * time.Second } + if cfg.Concurrency == 0 { + cfg.Concurrency = 512 + } + if cfg.Tenant.Header == "" { cfg.Tenant.Header = "X-Scope-OrgID" } diff --git a/config.yml b/config.yml index f7aec5f..a6bb765 100644 --- a/config.yml +++ b/config.yml @@ -5,6 +5,7 @@ target: http://127.0.0.1:9091/receive log_level: debug timeout: 10s timeout_shutdown: 0s +concurrency: 10 tenant: label: tenant diff --git a/processor.go b/processor.go index b81abc1..8adf937 100644 --- a/processor.go +++ b/processor.go @@ -50,6 +50,8 @@ func newProcessor(c config) *processor { ReadTimeout: c.Timeout, WriteTimeout: c.Timeout, IdleTimeout: 60 * time.Second, + + Concurrency: c.Concurrency, } p.cli = &fh.Client{ diff --git a/processor_test.go b/processor_test.go index 0268619..bd5fedc 100644 --- a/processor_test.go +++ b/processor_test.go @@ -148,8 +148,9 @@ func sinkHandler(ctx *fh.RequestCtx) { } func Test_config(t *testing.T) { - _, err := configLoad("config.yml") + cfg, err := configLoad("config.yml") assert.Nil(t, err) + assert.Equal(t, 10, cfg.Concurrency) } func Test_handle(t *testing.T) {