Skip to content

Commit

Permalink
Make configurable registry service name
Browse files Browse the repository at this point in the history
  • Loading branch information
airenas committed Jan 27, 2023
1 parent d6d8072 commit d6a9de2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 27 deletions.
27 changes: 17 additions & 10 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {

data.DB = db

transcribersProvider, err := consul.NewProvider(api.DefaultConfig())
transcribersProvider, err := consul.NewProvider(api.DefaultConfig(), defaultStr(cfg.GetString("worker.registryName"), "asr"))
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't init transcriber's provider")
}
Expand All @@ -73,21 +73,14 @@ func main() {
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't init usage restorer")
}
data.RetryDelay = cfg.GetDuration("worker.retryDelay")
if data.RetryDelay <= 0 {
data.RetryDelay = time.Minute
}
data.RetryDelay = defaultDur(cfg.GetDuration("worker.retryDelay"), time.Minute)

printBanner()

go utils.RunPerfEndpoint()

consulCheckInterval := cfg.GetDuration("worker.checkRegistry")
if consulCheckInterval <= 0 {
consulCheckInterval = time.Minute
}
ctx, cancelFunc := context.WithCancel(context.Background())
doneProviderCh, err := transcribersProvider.StartCheckLoop(ctx, consulCheckInterval)
doneProviderCh, err := transcribersProvider.StartRegistryLoop(ctx, defaultDur(cfg.GetDuration("worker.checkRegistry"), time.Minute))
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't start consul checker")
}
Expand Down Expand Up @@ -115,6 +108,20 @@ func main() {
}
}

func defaultDur(dur, d time.Duration) time.Duration {
if dur > 0 {
return dur
}
return d
}

func defaultStr(s1, d string) string {
if s1 != "" {
return s1
}
return d
}

var (
version = "DEV"
)
Expand Down
19 changes: 12 additions & 7 deletions internal/pkg/consul/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
)

type Provider struct {
consul *api.Client
consul *api.Client
srvName string

lock *sync.RWMutex
trans []*trWrap
Expand All @@ -36,16 +37,20 @@ type trWrap struct {
}

// NewProvider creates consul service registrator
func NewProvider(cfg *api.Config) (*Provider, error) {
func NewProvider(cfg *api.Config, srvNameInConsul string) (*Provider, error) {
c, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
return newProvider(c), nil
if srvNameInConsul == "" {
return nil, fmt.Errorf("no srv name")
}
return newProvider(c, srvNameInConsul), nil
}

func newProvider(c *api.Client) *Provider {
return &Provider{consul: c, lock: &sync.RWMutex{}, trans: make([]*trWrap, 0)}
func newProvider(c *api.Client, srvNameInConsul string) *Provider {
goapp.Log.Info().Str("service", srvNameInConsul).Msg("cfg: srv name in consul")
return &Provider{consul: c, srvName: srvNameInConsul, lock: &sync.RWMutex{}, trans: make([]*trWrap, 0)}
}

func (c *Provider) Get(srv string, allowNew bool) (tapi.Transcriber, string, error) {
Expand Down Expand Up @@ -77,7 +82,7 @@ func (c *Provider) Get(srv string, allowNew bool) (tapi.Transcriber, string, err
return nil, "", nil
}

func (c *Provider) StartCheckLoop(ctx context.Context, checkInterval time.Duration) (<-chan struct{}, error) {
func (c *Provider) StartRegistryLoop(ctx context.Context, checkInterval time.Duration) (<-chan struct{}, error) {
goapp.Log.Info().Msgf("Starting consul service check every %v", checkInterval)
res := make(chan struct{}, 2)
go func() {
Expand Down Expand Up @@ -110,7 +115,7 @@ func (c *Provider) serviceLoop(ctx context.Context, interval time.Duration) {
func (c *Provider) check(ctx context.Context) error {
ctxInt, cf := context.WithTimeout(ctx, time.Second*5)
defer cf()
srvs, _, err := c.consul.Health().Service("asr", "", true, (&api.QueryOptions{}).WithContext(ctxInt))
srvs, _, err := c.consul.Health().Service(c.srvName, "", true, (&api.QueryOptions{}).WithContext(ctxInt))
if err != nil {
return fmt.Errorf("can't invoke consul: %v", err)
}
Expand Down
20 changes: 10 additions & 10 deletions internal/pkg/consul/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func Test_Get_empty(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
tr, name, err := p.Get("olia", true)
assert.Nil(t, tr)
assert.Equal(t, "", name)
Expand All @@ -23,7 +23,7 @@ func Test_Get_empty(t *testing.T) {
}

func Test_Get_existing(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
tr := &mocks.Transcriber{}
p.trans = append(p.trans, &trWrap{real: tr, srv: "olia"})
rtr, name, err := p.Get("olia", true)
Expand All @@ -45,7 +45,7 @@ func Test_Get_existing(t *testing.T) {
}

func Test_Get_by_name(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
tr := &mocks.Transcriber{}
tr1 := &mocks.Transcriber{}
p.trans = append(p.trans, &trWrap{real: tr, srv: "olia"})
Expand All @@ -64,7 +64,7 @@ func Test_Get_by_name(t *testing.T) {
}

func Test_Get_round_robin(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
tr := &mocks.Transcriber{}
tr1 := &mocks.Transcriber{}
p.trans = append(p.trans, &trWrap{real: tr, srv: "olia"})
Expand All @@ -83,21 +83,21 @@ func testAssertEqPtr(t *testing.T, tr, exp tapi.Transcriber) {
}

func TestProvider_updateSrv_no_meta(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv", Meta: map[string]string{}}}})
assert.NotNil(t, err)
}

func TestProvider_updateSrv_adds(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv",
Meta: map[string]string{uploadKey: "up", statusKey: "st", resultKey: "res", cleanKey: "cl"}}}})
assert.Nil(t, err)
assert.Equal(t, 1, len(p.trans))
}

func TestProvider_updateSrv_addsSame(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv",
Meta: map[string]string{uploadKey: "up", statusKey: "st", resultKey: "res", cleanKey: "cl"}}}})
assert.Nil(t, err)
Expand All @@ -111,7 +111,7 @@ func TestProvider_updateSrv_addsSame(t *testing.T) {
}

func TestProvider_updateSrv_updates(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv",
Meta: map[string]string{uploadKey: "up", statusKey: "st", resultKey: "res", cleanKey: "cl"}}}})
assert.Nil(t, err)
Expand All @@ -125,7 +125,7 @@ func TestProvider_updateSrv_updates(t *testing.T) {
}

func TestProvider_updateSrv_addsTwo(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv",
Meta: map[string]string{uploadKey: "up", statusKey: "st", resultKey: "res", cleanKey: "cl"}}}})
assert.Nil(t, err)
Expand All @@ -139,7 +139,7 @@ func TestProvider_updateSrv_addsTwo(t *testing.T) {
}

func TestProvider_updateSrv_drops(t *testing.T) {
p := newProvider(nil)
p := newProvider(nil, "")
err := p.updateSrv([]*api.ServiceEntry{{Service: &api.AgentService{Service: "olia", Port: 80, Address: "srv",
Meta: map[string]string{uploadKey: "up", statusKey: "st", resultKey: "res", cleanKey: "cl"}}},
{Service: &api.AgentService{Service: "olia", Port: 81, Address: "srv",
Expand Down

0 comments on commit d6a9de2

Please sign in to comment.