Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sql conn #448

Merged
merged 5 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/test-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ jobs:
release:
name: release
needs: test
# if: github.event.workflow_run.head_branch == 'master' || github.event.workflow_run.head_branch == 'dev'
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/dev'
runs-on: ubuntu-latest
# env:
# GOROOT: /usr/local/go

steps:
- name: Checkout
Expand Down Expand Up @@ -111,7 +108,7 @@ jobs:

- name: Build odysee-api
if: steps.release.outputs.app == 'api'
run: make api
run: make oapi

- name: Build and push odysee-api docker image
if: steps.release.outputs.app == 'api'
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
.idea/
/lbrytv
/assets/bindata.go
/assets/static/app
/dist/
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ RUN apk update && \
COPY --from=gather /build/ffprobe /usr/local/bin/

WORKDIR /app
COPY dist/linux_amd64/lbrytv /app
RUN chmod a+x /app/lbrytv
COPY ./docker/lbrytv.yml ./config/lbrytv.yml
COPY dist/linux_amd64/oapi /app
RUN chmod a+x /app/oapi
COPY ./docker/oapi.yml ./config/oapi.yml
COPY ./docker/launcher.sh ./

CMD ["./launcher.sh"]
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ get_sqlboiler:
models: get_sqlboiler
sqlboiler --add-global-variants --wipe psql --no-context --no-tests

.PHONY: api
api:
.PHONY: oapi
oapi:
CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build \
-o dist/linux_amd64/lbrytv \
-o dist/linux_amd64/oapi \
-ldflags "-s -w -X github.com/OdyseeTeam/odysee-api/version.version=$(api_version) -X github.com/OdyseeTeam/odysee-api/version.commit=$(git_hash) -X github.com/OdyseeTeam/odysee-api/apps/version.buildDate=$(date)" \
.

Expand Down
63 changes: 49 additions & 14 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package api

import (
"net/http"
"net/http/pprof"
"strings"
"sync"
"time"

"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/geopublish"
gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/publish"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
Expand Down Expand Up @@ -39,11 +41,19 @@ var logger = monitor.NewModuleLogger("api")

var onceMetrics sync.Once

type RoutesOptions struct {
EnableV3Publish bool
EnableProfiling bool
}

// emptyHandler can be used when you just need to let middlewares do their job and no actual response is needed.
func emptyHandler(_ http.ResponseWriter, _ *http.Request) {}

// InstallRoutes sets up global API handlers
func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptions) {
if opts == nil {
opts = &RoutesOptions{}
}
uploadPath := config.GetPublishSourceDir()

upHandler := &publish.Handler{UploadPath: uploadPath}
Expand Down Expand Up @@ -77,6 +87,22 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
internalRouter := r.PathPrefix("/internal").Subrouter()
internalRouter.Handle("/metrics", promhttp.Handler())

if opts.EnableProfiling {
pr := internalRouter.PathPrefix("/pprof").Subrouter()
pr.HandleFunc("/symbol", pprof.Symbol).Methods(http.MethodPost)
pr.HandleFunc("/", pprof.Index)
pr.HandleFunc("/cmdline", pprof.Cmdline)
pr.HandleFunc("/profile", pprof.Profile)
pr.HandleFunc("/symbol", pprof.Symbol)
pr.HandleFunc("/trace", pprof.Trace)
pr.Handle("/allocs", pprof.Handler("allocs"))
pr.Handle("/block", pprof.Handler("block"))
pr.Handle("/goroutine", pprof.Handler("goroutine"))
pr.Handle("/heap", pprof.Handler("heap"))
pr.Handle("/mutex", pprof.Handler("mutex"))
pr.Handle("/threadcreate", pprof.Handler("threadcreate"))
}

v2Router := r.PathPrefix("/api/v2").Subrouter()
v2Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
status.InstallRoutes(v2Router)
Expand Down Expand Up @@ -110,12 +136,6 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
logger.Log().WithError(err).Fatal(err)
}

onceMetrics.Do(func() {
redislocker.RegisterMetrics()
collector := prometheuscollector.New(tusHandler.Metrics)
prometheus.MustRegister(collector)
})

tusRouter := v2Router.PathPrefix("/publish").Subrouter()
tusRouter.Use(tusHandler.Middleware)
tusRouter.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost).Name("tus_publish")
Expand All @@ -125,14 +145,29 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
tusRouter.HandleFunc("/{id}/notify", tusHandler.Notify).Methods(http.MethodPost)
tusRouter.PathPrefix("/").HandlerFunc(emptyHandler).Methods(http.MethodOptions)

v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, zapadapter.NewKV(nil))
gPath := config.GetGeoPublishSourceDir()
err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/")
if err != nil {
panic(err)
var v3Handler *geopublish.Handler
if opts.EnableV3Publish {
v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, zapadapter.NewKV(nil))
gPath := config.GetGeoPublishSourceDir()
v3Handler, err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/")
if err != nil {
panic(err)
}
}

onceMetrics.Do(func() {
gpmetrics.RegisterMetrics()
redislocker.RegisterMetrics()
if !opts.EnableV3Publish {
tus2metrics := prometheuscollector.New(tusHandler.Metrics)
prometheus.MustRegister(tus2metrics)
} else {
tus3metrics := prometheuscollector.New(v3Handler.Metrics)
prometheus.MustRegister(tus3metrics)
}
})
}

func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc {
Expand Down
4 changes: 2 additions & 2 deletions api/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestRoutesPublish(t *testing.T) {
req := publish.CreatePublishRequest(t, []byte("test file"))
rr := httptest.NewRecorder()

InstallRoutes(r, rt)
InstallRoutes(r, rt, nil)
r.ServeHTTP(rr, req)

assert.Equal(t, http.StatusOK, rr.Code)
Expand All @@ -45,7 +45,7 @@ func TestCORS(t *testing.T) {
config.Override("CORSDomains", allowedDomains)
defer config.RestoreOverridden()

InstallRoutes(r, rt)
InstallRoutes(r, rt, nil)

cases := map[string]string{
"https://odysee.com": "https://odysee.com",
Expand Down
20 changes: 20 additions & 0 deletions app/geopublish/forklift/forklift.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/lbryio/transcoder/pkg/logging"
"github.com/lbryio/transcoder/pkg/logging/zapadapter"

Expand Down Expand Up @@ -120,6 +121,25 @@ func (f *Forklift) Start() error {
// if err := f.asynqServer.Run(mux); err != nil {
// return fmt.Errorf("could not run server: %w", err)
// }

go func() {
t := time.NewTicker(1 * time.Second)
for {
select {
case <-t.C:
q, err := f.asynqInspector.GetQueueInfo("default")
if err != nil {
continue
}
metrics.QueueTasks.WithLabelValues("active").Set(float64(q.Active))
metrics.QueueTasks.WithLabelValues("completed").Set(float64(q.Completed))
metrics.QueueTasks.WithLabelValues("pending").Set(float64(q.Pending))
metrics.QueueTasks.WithLabelValues("failed").Set(float64(q.Failed))
case <-f.stopChan:
return
}
}
}()
if err := f.asynqServer.Start(mux); err != nil {
return fmt.Errorf("could not start asynq server: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/geopublish/geopublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"github.com/OdyseeTeam/odysee-api/internal/metrics"
"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/volatiletech/sqlboiler/boil"

"github.com/gorilla/mux"
werrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/volatiletech/sqlboiler/boil"
"github.com/ybbus/jsonrpc"
)

Expand Down
7 changes: 6 additions & 1 deletion app/geopublish/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@ var (
Namespace: ns,
Name: "blob_upload_errors",
}, []string{"type"})

QueueTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "queue_tasks",
}, []string{"status"})
)

func RegisterMetrics() {
prometheus.MustRegister(
UploadsCreated, UploadsProcessed, UploadsCanceled, UploadsFailed,
QueriesSent, QueriesCompleted, QueriesFailed, QueriesErrored,
BlobUploadErrors,
BlobUploadErrors, QueueTasks,
)
}
16 changes: 8 additions & 8 deletions app/geopublish/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
tushandler "github.com/tus/tusd/pkg/handler"
)

func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPrefix string) error {
func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPrefix string) (*Handler, error) {
redisOpts, err := config.GetRedisOpts()
if err != nil {
return fmt.Errorf("cannot get redis config: %w", err)
return nil, fmt.Errorf("cannot get redis config: %w", err)
}
asynqRedisOpts, err := config.GetAsynqRedisOpts()
if err != nil {
return fmt.Errorf("cannot get redis config: %w", err)
return nil, fmt.Errorf("cannot get redis config: %w", err)
}

composer := tushandler.NewStoreComposer()
Expand All @@ -37,16 +37,16 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
// forklift.WithLogger(logger),
)
if err != nil {
return fmt.Errorf("cannot initialize forklift: %w", err)
return nil, fmt.Errorf("cannot initialize forklift: %w", err)
}
err = fl.Start()
if err != nil {
return fmt.Errorf("cannot start forklift: %w", err)
return nil, fmt.Errorf("cannot start forklift: %w", err)
}

locker, err := redislocker.New(redisOpts)
if err != nil {
return fmt.Errorf("cannot start redislocker: %w", err)
return nil, fmt.Errorf("cannot start redislocker: %w", err)
}
locker.UseIn(composer)

Expand All @@ -63,7 +63,7 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
WithQueue(fl),
)
if err != nil {
return fmt.Errorf("cannot initialize tus handler: %w", err)
return nil, fmt.Errorf("cannot initialize tus handler: %w", err)
}

r := router
Expand All @@ -76,5 +76,5 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
r.HandleFunc("/{id}/status", tusHandler.Status).Methods(http.MethodGet)
r.PathPrefix("/").HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}).Methods(http.MethodOptions)

return nil
return tusHandler, nil
}
32 changes: 13 additions & 19 deletions apps/lbrytv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

cfg "github.com/OdyseeTeam/odysee-api/config"
Expand All @@ -17,18 +16,12 @@ import (
)

const (
lbrynetServers = "LbrynetServers"
deprecatedLbrynet = "Lbrynet"
configName = "lbrytv"
lbrynetServers = "LbrynetServers"
deprecatedLbrynetSetting = "Lbrynet"
configName = "oapi"
)

// overriddenValues stores overridden v values
// and is initialized as an empty map in the read method
var (
overriddenValues map[string]interface{}
once sync.Once
Config *cfg.ConfigWrapper
)
var Config *cfg.ConfigWrapper

func init() {
Config = cfg.ReadConfig(configName)
Expand All @@ -45,9 +38,6 @@ func init() {

c.Viper.SetDefault("Address", ":8080")
c.Viper.SetDefault("Host", "http://localhost:8080")
c.Viper.SetDefault("FreeContentURL", "http://localhost:8080/content/")
c.Viper.SetDefault("ReflectorTimeout", int64(10))
c.Viper.SetDefault("RefractorTimeout", int64(10))
}

func ProjectRoot() string {
Expand Down Expand Up @@ -149,17 +139,17 @@ func GetAddress() string {
return Config.Viper.GetString("Address")
}

//GetLbrynetServers returns the names/addresses of every SDK server
// GetLbrynetServers returns the names/addresses of every SDK server
func GetLbrynetServers() map[string]string {
if Config.Viper.GetString(deprecatedLbrynet) != "" &&
if Config.Viper.GetString(deprecatedLbrynetSetting) != "" &&
len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
logrus.Panicf("Both %s and %s are set. This is a highlander situation...there can be only 1.", deprecatedLbrynet, lbrynetServers)
logrus.Panicf("Both %s and %s are set. This is a highlander situation...there can be only 1.", deprecatedLbrynetSetting, lbrynetServers)
}

if len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
return Config.Viper.GetStringMapString(lbrynetServers)
} else if Config.Viper.GetString(deprecatedLbrynet) != "" {
return map[string]string{"sdk": Config.Viper.GetString(deprecatedLbrynet)}
} else if Config.Viper.GetString(deprecatedLbrynetSetting) != "" {
return map[string]string{"sdk": Config.Viper.GetString(deprecatedLbrynetSetting)}
} else {
servers, err := models.LbrynetServers().AllG()
if err != nil {
Expand Down Expand Up @@ -199,6 +189,10 @@ func GetRPCTimeout(method string) *time.Duration {
return nil
}

func GetProfiling() bool {
return Config.Viper.GetBool("Profiling")
}

func Override(key string, value interface{}) {
Config.Override(key, value)
}
Expand Down
4 changes: 2 additions & 2 deletions apps/lbrytv/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestGetLbrynetServers(t *testing.T) {
}

func TestGetLbrynetServersNoDB(t *testing.T) {
if Config.Viper.GetString(deprecatedLbrynet) != "" &&
if Config.Viper.GetString(deprecatedLbrynetSetting) != "" &&
len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
t.Fatalf("Both %s and %s are set. This is a highlander situation...there can be only one.", deprecatedLbrynet, lbrynetServers)
t.Fatalf("Both %s and %s are set. This is a highlander situation...there can be only one.", deprecatedLbrynetSetting, lbrynetServers)
}
}

Expand Down
Loading