Skip to content

Commit

Permalink
Merge pull request #461 from OdyseeTeam/improve-publish-concurrency
Browse files Browse the repository at this point in the history
Add publish processing time metrics, increase concurrency
  • Loading branch information
anbsky committed Oct 28, 2022
2 parents b1d58e0 + e89a593 commit 79615f2
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 22 deletions.
21 changes: 18 additions & 3 deletions app/geopublish/forklift/carriage.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,17 @@ func (c *Carriage) ProcessTask(ctx context.Context, t *asynq.Task) error {
}

func (c *Carriage) Process(p UploadProcessPayload) (*UploadProcessResult, error) {
var t time.Time
r := &UploadProcessResult{UploadID: p.UploadID, UserID: p.UserID}
log := c.logger.With("upload_id", p.UploadID, "user_id", p.UserID)

uploader := c.store.Uploader()

t = time.Now()
info, err := c.analyzer.Analyze(context.Background(), p.Path)
metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingAnalyze).Observe(float64(time.Since(t)))
if info == nil {
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingAnalyze).Inc()
return r, err
}
log.Debug("stream analyzed", "info", info, "err", err)
Expand All @@ -137,20 +141,26 @@ func (c *Carriage) Process(p UploadProcessPayload) (*UploadProcessResult, error)
return r, err
}

t = time.Now()
stream, err := src.Split()
metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingBlobSplit).Observe(float64(time.Since(t)))
if err != nil {
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingBlobSplit).Inc()
return r, err
}
streamSource := stream.GetSource()
r.SDHash = hex.EncodeToString(streamSource.GetSdHash())
defer os.RemoveAll(path.Join(c.blobsPath, r.SDHash))

t = time.Now()
summary, err := uploader.Upload(src)
metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingReflection).Observe(float64(time.Since(t)))
if err != nil {
// The errors current uploader returns usually do not make sense to retry.
metrics.BlobUploadErrors.WithLabelValues(metrics.LabelFatal).Inc()
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingReflection).Inc()
return r, err
} else if summary.Err > 0 {
metrics.BlobUploadErrors.WithLabelValues(metrics.LabelCommon).Inc()
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingReflection).Inc()
r.Retry = true
return r, fmt.Errorf("%w (%v)", ErrUpload, summary.Err)
}
Expand Down Expand Up @@ -190,20 +200,25 @@ func (c *Carriage) Process(p UploadProcessPayload) (*UploadProcessResult, error)
delete(pp, "file_path")

log.Debug("sending request", "method", p.Request.Method, "params", p.Request)
t = time.Now()
res, err := caller.Call(context.Background(), p.Request)
metrics.ProcessingTime.WithLabelValues(metrics.LabelProcessingQuery).Observe(float64(time.Since(t)))

metrics.QueriesSent.Inc()
if err != nil {
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingQuery).Inc()
metrics.QueriesFailed.Inc()
r.Retry = true
return r, fmt.Errorf("error calling sdk: %w", err)
}
metrics.QueriesCompleted.Inc()

r.Response = res
if res.Error != nil {
metrics.ProcessingErrors.WithLabelValues(metrics.LabelProcessingQuery).Inc()
metrics.QueriesErrored.Inc()
return r, fmt.Errorf("sdk returned an error: %s", res.Error.Message)
}
metrics.QueriesCompleted.Inc()
log.Info("stream processed", "method", p.Request.Method, "params", p.Request)
return r, nil
}
Expand Down
21 changes: 16 additions & 5 deletions app/geopublish/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
// Metric naming constants for the geopublish package.
const (
	// ns is the Prometheus namespace shared by every collector in this package.
	ns = "geopublish"

	// Values for the "type" label of BlobUploadErrors.
	LabelFatal  = "fatal"
	LabelCommon = "common"

	// Values for the "stage" label of ProcessingTime and ProcessingErrors.
	LabelProcessingTotal      = "total"
	LabelProcessingAnalyze    = "analyze"
	LabelProcessingBlobSplit  = "blob_split"
	LabelProcessingReflection = "reflection"
	LabelProcessingQuery      = "query"
)

var (
UploadsCreated = prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -45,21 +50,27 @@ var (
Namespace: ns,
Name: "queries_errored",
})
// BlobUploadErrors counts blob upload failures, partitioned by the "type" label.
// Carriage.Process increments it with LabelFatal when the uploader returns an
// error and with LabelCommon when the upload summary reports errors.
BlobUploadErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_upload_errors",
}, []string{"type"})

// QueueTasks is a gauge vector named "queue_tasks", partitioned by the "status" label.
QueueTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "queue_tasks",
}, []string{"status"})

// ProcessingTime is a histogram of per-stage upload processing durations,
// partitioned by the "stage" label (see the LabelProcessing* constants).
//
// NOTE(review): the bucket bounds (1 … 600) look like seconds, but
// Carriage.Process observes float64(time.Since(t)), which is the Duration's
// nanosecond count, so practically every sample would fall into the +Inf bucket.
// Presumably the call sites should pass time.Since(t).Seconds() — confirm and fix there.
ProcessingTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Name: "processing_time",
Buckets: []float64{1, 5, 30, 60, 120, 300, 600},
}, []string{"stage"})
// ProcessingErrors counts failed upload processing stages, partitioned by the
// "stage" label (see the LabelProcessing* constants).
ProcessingErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "processing_errors",
}, []string{"stage"})
)

// RegisterMetrics registers every geopublish collector with the default
// Prometheus registry.
//
// Each collector must be listed exactly once: prometheus.MustRegister panics
// when the same collector is registered twice, which is what happened when
// QueueTasks appeared on two argument lines.
func RegisterMetrics() {
	prometheus.MustRegister(
		UploadsCreated, UploadsProcessed, UploadsCanceled, UploadsFailed,
		QueriesSent, QueriesCompleted, QueriesFailed, QueriesErrored,
		BlobUploadErrors, QueueTasks, ProcessingTime, ProcessingErrors,
	)
}
2 changes: 1 addition & 1 deletion app/geopublish/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
path.Join(uploadPath, "blobs"),
config.GetReflectorUpstream(),
asynqRedisOpts,
forklift.WithConcurrency(3),
forklift.WithConcurrency(config.GetGeoPublishConcurrency()),
// forklift.WithLogger(logger),
)
if err != nil {
Expand Down
32 changes: 19 additions & 13 deletions apps/lbrytv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,27 @@ func ProjectRoot() string {
return filepath.Dir(ex)
}

// IsProduction reports whether we are running in a production environment,
// as determined by Config.IsProduction.
func IsProduction() bool {
return Config.IsProduction()
}

// GetInternalAPIHost returns the address of the internal-api server, read from
// the "InternalAPIHost" setting.
func GetInternalAPIHost() string {
return Config.Viper.GetString("InternalAPIHost")
}

// GetOauthProviderURL returns the address of the OAuth provider, read from the
// "providerurl" key of the "oauth" settings map.
func GetOauthProviderURL() string {
return Config.Viper.GetStringMapString("oauth")["providerurl"]
}

// GetOauthClientID returns the OAuth client ID, read from the "clientid" key of
// the "oauth" settings map.
func GetOauthClientID() string {
return Config.Viper.GetStringMapString("oauth")["clientid"]
}

// GetOauthTokenURL returns the address of OAuth token retrieval endpoint
// GetOauthTokenURL returns the address of OAuth token retrieval endpoint.
func GetOauthTokenURL() string {
cfg := Config.Viper.GetStringMapString("oauth")
return cfg["providerurl"] + cfg["tokenpath"]
Expand All @@ -92,12 +92,12 @@ func GetAsynqRedisOpts() (asynq.RedisConnOpt, error) {
return redisOpts, nil
}

// GetDatabase returns the PostgreSQL database server connection config, as
// provided by Config.GetDatabase.
func GetDatabase() cfg.DBConfig {
return Config.GetDatabase()
}

// GetSentryDSN returns the sentry.io service DSN, read from the "SentryDSN" setting.
func GetSentryDSN() string {
return Config.Viper.GetString("SentryDSN")
}
Expand All @@ -114,32 +114,38 @@ func GetGeoPublishSourceDir() string {
return Config.Viper.GetString("GeoPublishSourceDir")
}

// GetGeoPublishConcurrency returns the number of simultaneously processed
// uploads per each API instance, read from the "GeoPublishConcurrency" setting.
// A default of 3 is (re-)registered on every call before the value is read.
func GetGeoPublishConcurrency() int {
Config.Viper.SetDefault("GeoPublishConcurrency", 3)
return Config.Viper.GetInt("GeoPublishConcurrency")
}

// ShouldLogResponses reports whether full SDK responses logging is enabled
// (the "ShouldLogResponses" setting). Produces a lot of logging, use for
// debugging only.
func ShouldLogResponses() bool {
return Config.Viper.GetBool("ShouldLogResponses")
}

// GetPaidTokenPrivKey returns the configured path to the private RSA key for
// generating paid tokens, read from the "PaidTokenPrivKey" setting.
// NOTE(review): the value is returned as configured; whether it is absolute
// depends on the config file — confirm before relying on it.
func GetPaidTokenPrivKey() string {
return Config.Viper.GetString("PaidTokenPrivKey")
}

// GetStreamsV5 returns the config map for the v5 streams endpoint, read from
// the "StreamsV5" setting.
func GetStreamsV5() map[string]string {
return Config.Viper.GetStringMapString("StreamsV5")
}

// GetReflectorUpstream returns the config map for the publish reflector server,
// read from the "ReflectorUpstream" setting.
func GetReflectorUpstream() map[string]string {
return Config.Viper.GetStringMapString("ReflectorUpstream")
}

// GetAddress returns the address to bind the HTTP API server to, read from the
// "Address" setting.
func GetAddress() string {
return Config.Viper.GetString("Address")
}

// GetLbrynetServers returns the names/addresses of every SDK server
// GetLbrynetServers returns the names/addresses of every SDK server.
func GetLbrynetServers() map[string]string {
if Config.Viper.GetString(deprecatedLbrynetSetting) != "" &&
len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
Expand Down
2 changes: 2 additions & 0 deletions oapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Database:
PublishSourceDir: ./rundata/storage/publish
GeoPublishSourceDir: ./rundata/storage/geopublish

GeoPublishConcurrency: 3

PaidTokenPrivKey: token_privkey.rsa

LbrynetXServer: http://sdk.lbry.tech:5279/api
Expand Down

0 comments on commit 79615f2

Please sign in to comment.