Skip to content

Commit

Permalink
Merge pull request #1 from airenas/several-tr-workers
Browse files Browse the repository at this point in the history
Implement several backend support using consul registry
  • Loading branch information
airenas committed Jan 27, 2023
2 parents 70d62dc + d6a9de2 commit 96d0356
Show file tree
Hide file tree
Showing 25 changed files with 1,169 additions and 159 deletions.
31 changes: 26 additions & 5 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"github.com/airenas/async-api/pkg/miniofs"
"github.com/airenas/async-api/pkg/usage"
"github.com/airenas/go-app/pkg/goapp"
"github.com/airenas/roxy/internal/pkg/consul"
"github.com/airenas/roxy/internal/pkg/postgres"
"github.com/airenas/roxy/internal/pkg/transcriber"
"github.com/airenas/roxy/internal/pkg/utils"
"github.com/airenas/roxy/internal/pkg/worker"
"github.com/hashicorp/consul/api"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/labstack/gommon/color"
"github.com/vgarvardt/gue/v5"
Expand Down Expand Up @@ -62,23 +63,27 @@ func main() {

data.DB = db

data.Transcriber, err = transcriber.NewClient(cfg.GetString("transcriber.uploadUrl"),
cfg.GetString("transcriber.statusUrl"), cfg.GetString("transcriber.resultUrl"), cfg.GetString("transcriber.cleanUrl"))
transcribersProvider, err := consul.NewProvider(api.DefaultConfig(), defaultStr(cfg.GetString("worker.registryName"), "asr"))
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't init transcriber")
goapp.Log.Fatal().Err(err).Msg("can't init transcriber's provider")
}
data.TranscriberPr = transcribersProvider

data.UsageRestorer, err = usage.NewRestorer(cfg.GetString("doorman.URL"))
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't init usage restorer")
}
data.RetryDelay = defaultDur(cfg.GetDuration("worker.retryDelay"), time.Minute)

printBanner()

go utils.RunPerfEndpoint()

ctx, cancelFunc := context.WithCancel(context.Background())
// data.StopCtx = ctx
doneProviderCh, err := transcribersProvider.StartRegistryLoop(ctx, defaultDur(cfg.GetDuration("worker.checkRegistry"), time.Minute))
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't start consul checker")
}
doneCh, err := worker.StartWorkerService(ctx, data)
if err != nil {
goapp.Log.Fatal().Err(err).Msg("can't start worker service")
Expand All @@ -91,6 +96,8 @@ func main() {
goapp.Log.Info().Msg("Got exit signal")
case <-doneCh:
goapp.Log.Info().Msg("Service exit")
case <-doneProviderCh:
goapp.Log.Info().Msg("Consul checker exit")
}
cancelFunc()
select {
Expand All @@ -101,6 +108,20 @@ func main() {
}
}

func defaultDur(dur, d time.Duration) time.Duration {
if dur > 0 {
return dur
}
return d
}

func defaultStr(s1, d string) string {
if s1 != "" {
return s1
}
return d
}

var (
version = "DEV"
)
Expand Down
12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/facebookgo/grace v0.0.0-20180706040059-75cf19382434
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/consul/api v1.15.3
github.com/jackc/pgx/v5 v5.2.0
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
github.com/labstack/echo-contrib v0.13.0
Expand All @@ -23,6 +24,7 @@ require (
)

require (
github.com/armon/go-metrics v0.4.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -34,10 +36,17 @@ require (
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.2.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/serf v0.9.8 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
Expand All @@ -50,6 +59,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -76,7 +86,7 @@ require (
go.opentelemetry.io/otel/metric v0.34.0 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sync v0.1.0 // indirect
Expand Down
Loading

0 comments on commit 96d0356

Please sign in to comment.