From 8e9278c355e47c81a17b88e71a78dbc51a26b638 Mon Sep 17 00:00:00 2001 From: Eugene Ma Date: Mon, 21 Jun 2021 14:42:46 -0700 Subject: [PATCH 1/3] add Concurrency flag to fasthttp server --- config.go | 1 + config.yml | 1 + processor.go | 2 ++ 3 files changed, 4 insertions(+) diff --git a/config.go b/config.go index 615e18b..9506875 100644 --- a/config.go +++ b/config.go @@ -18,6 +18,7 @@ type config struct { LogLevel string `yaml:"log_level"` Timeout time.Duration TimeoutShutdown time.Duration `yaml:"timeout_shutdown"` + Concurrency int Tenant struct { Label string diff --git a/config.yml b/config.yml index f7aec5f..a6bb765 100644 --- a/config.yml +++ b/config.yml @@ -5,6 +5,7 @@ target: http://127.0.0.1:9091/receive log_level: debug timeout: 10s timeout_shutdown: 0s +concurrency: 10 tenant: label: tenant diff --git a/processor.go b/processor.go index b81abc1..8adf937 100644 --- a/processor.go +++ b/processor.go @@ -50,6 +50,8 @@ func newProcessor(c config) *processor { ReadTimeout: c.Timeout, WriteTimeout: c.Timeout, IdleTimeout: 60 * time.Second, + + Concurrency: c.Concurrency, } p.cli = &fh.Client{ From a4dfc5331a5ac36220e1094d05949b23caf7ac0b Mon Sep 17 00:00:00 2001 From: Eugene Ma Date: Tue, 22 Jun 2021 09:50:40 -0700 Subject: [PATCH 2/3] set default Concurrency to 512 --- config.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config.go b/config.go index 9506875..565579e 100644 --- a/config.go +++ b/config.go @@ -42,6 +42,10 @@ func configParse(b []byte) (*config, error) { cfg.Timeout = 10 * time.Second } + if cfg.Concurrency == 0 { + cfg.Concurrency = 512 + } + if cfg.Tenant.Header == "" { cfg.Tenant.Header = "X-Scope-OrgID" } From c51c4351aca09d1349f62d5f8b725561d6c47d37 Mon Sep 17 00:00:00 2001 From: Eugene Ma Date: Tue, 22 Jun 2021 09:50:45 -0700 Subject: [PATCH 3/3] add test --- processor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor_test.go b/processor_test.go index 0268619..bd5fedc 100644 --- a/processor_test.go +++ b/processor_test.go @@ -148,8 +148,9 @@ func sinkHandler(ctx *fh.RequestCtx) { } func Test_config(t *testing.T) { - _, err := configLoad("config.yml") + cfg, err := configLoad("config.yml") assert.Nil(t, err) + assert.Equal(t, 10, cfg.Concurrency) } func Test_handle(t *testing.T) {