Skip to content

Commit

Permalink
Merge pull request #448 from OdyseeTeam/fix-sql-conn
Browse files Browse the repository at this point in the history
Fix sql conn
  • Loading branch information
anbsky committed Sep 28, 2022
2 parents 07073a1 + fc7b28a commit 901d5b1
Show file tree
Hide file tree
Showing 24 changed files with 132 additions and 77 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/test-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,8 @@ jobs:
release:
name: release
needs: test
# if: github.event.workflow_run.head_branch == 'master' || github.event.workflow_run.head_branch == 'dev'
if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/dev'
runs-on: ubuntu-latest
# env:
# GOROOT: /usr/local/go

steps:
- name: Checkout
Expand Down Expand Up @@ -111,7 +108,7 @@ jobs:

- name: Build odysee-api
if: steps.release.outputs.app == 'api'
run: make api
run: make oapi

- name: Build and push odysee-api docker image
if: steps.release.outputs.app == 'api'
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
.idea/
/lbrytv
/assets/bindata.go
/assets/static/app
/dist/
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ RUN apk update && \
COPY --from=gather /build/ffprobe /usr/local/bin/

WORKDIR /app
COPY dist/linux_amd64/lbrytv /app
RUN chmod a+x /app/lbrytv
COPY ./docker/lbrytv.yml ./config/lbrytv.yml
COPY dist/linux_amd64/oapi /app
RUN chmod a+x /app/oapi
COPY ./docker/oapi.yml ./config/oapi.yml
COPY ./docker/launcher.sh ./

CMD ["./launcher.sh"]
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ get_sqlboiler:
models: get_sqlboiler
sqlboiler --add-global-variants --wipe psql --no-context --no-tests

.PHONY: api
api:
.PHONY: oapi
oapi:
CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build \
-o dist/linux_amd64/lbrytv \
-o dist/linux_amd64/oapi \
-ldflags "-s -w -X github.com/OdyseeTeam/odysee-api/version.version=$(api_version) -X github.com/OdyseeTeam/odysee-api/version.commit=$(git_hash) -X github.com/OdyseeTeam/odysee-api/apps/version.buildDate=$(date)" \
.

Expand Down
63 changes: 49 additions & 14 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package api

import (
"net/http"
"net/http/pprof"
"strings"
"sync"
"time"

"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/geopublish"
gpmetrics "github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/publish"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
Expand Down Expand Up @@ -39,11 +41,19 @@ var logger = monitor.NewModuleLogger("api")

var onceMetrics sync.Once

type RoutesOptions struct {
EnableV3Publish bool
EnableProfiling bool
}

// emptyHandler can be used when you just need to let middlewares do their job and no actual response is needed.
func emptyHandler(_ http.ResponseWriter, _ *http.Request) {}

// InstallRoutes sets up global API handlers
func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router, opts *RoutesOptions) {
if opts == nil {
opts = &RoutesOptions{}
}
uploadPath := config.GetPublishSourceDir()

upHandler := &publish.Handler{UploadPath: uploadPath}
Expand Down Expand Up @@ -77,6 +87,22 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
internalRouter := r.PathPrefix("/internal").Subrouter()
internalRouter.Handle("/metrics", promhttp.Handler())

if opts.EnableProfiling {
pr := internalRouter.PathPrefix("/pprof").Subrouter()
pr.HandleFunc("/symbol", pprof.Symbol).Methods(http.MethodPost)
pr.HandleFunc("/", pprof.Index)
pr.HandleFunc("/cmdline", pprof.Cmdline)
pr.HandleFunc("/profile", pprof.Profile)
pr.HandleFunc("/symbol", pprof.Symbol)
pr.HandleFunc("/trace", pprof.Trace)
pr.Handle("/allocs", pprof.Handler("allocs"))
pr.Handle("/block", pprof.Handler("block"))
pr.Handle("/goroutine", pprof.Handler("goroutine"))
pr.Handle("/heap", pprof.Handler("heap"))
pr.Handle("/mutex", pprof.Handler("mutex"))
pr.Handle("/threadcreate", pprof.Handler("threadcreate"))
}

v2Router := r.PathPrefix("/api/v2").Subrouter()
v2Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
status.InstallRoutes(v2Router)
Expand Down Expand Up @@ -110,12 +136,6 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
logger.Log().WithError(err).Fatal(err)
}

onceMetrics.Do(func() {
redislocker.RegisterMetrics()
collector := prometheuscollector.New(tusHandler.Metrics)
prometheus.MustRegister(collector)
})

tusRouter := v2Router.PathPrefix("/publish").Subrouter()
tusRouter.Use(tusHandler.Middleware)
tusRouter.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost).Name("tus_publish")
Expand All @@ -125,14 +145,29 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
tusRouter.HandleFunc("/{id}/notify", tusHandler.Notify).Methods(http.MethodPost)
tusRouter.PathPrefix("/").HandlerFunc(emptyHandler).Methods(http.MethodOptions)

v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, zapadapter.NewKV(nil))
gPath := config.GetGeoPublishSourceDir()
err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/")
if err != nil {
panic(err)
var v3Handler *geopublish.Handler
if opts.EnableV3Publish {
v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, zapadapter.NewKV(nil))
gPath := config.GetGeoPublishSourceDir()
v3Handler, err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/")
if err != nil {
panic(err)
}
}

onceMetrics.Do(func() {
gpmetrics.RegisterMetrics()
redislocker.RegisterMetrics()
if !opts.EnableV3Publish {
tus2metrics := prometheuscollector.New(tusHandler.Metrics)
prometheus.MustRegister(tus2metrics)
} else {
tus3metrics := prometheuscollector.New(v3Handler.Metrics)
prometheus.MustRegister(tus3metrics)
}
})
}

func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc {
Expand Down
4 changes: 2 additions & 2 deletions api/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestRoutesPublish(t *testing.T) {
req := publish.CreatePublishRequest(t, []byte("test file"))
rr := httptest.NewRecorder()

InstallRoutes(r, rt)
InstallRoutes(r, rt, nil)
r.ServeHTTP(rr, req)

assert.Equal(t, http.StatusOK, rr.Code)
Expand All @@ -45,7 +45,7 @@ func TestCORS(t *testing.T) {
config.Override("CORSDomains", allowedDomains)
defer config.RestoreOverridden()

InstallRoutes(r, rt)
InstallRoutes(r, rt, nil)

cases := map[string]string{
"https://odysee.com": "https://odysee.com",
Expand Down
20 changes: 20 additions & 0 deletions app/geopublish/forklift/forklift.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/OdyseeTeam/odysee-api/app/geopublish/metrics"
"github.com/lbryio/transcoder/pkg/logging"
"github.com/lbryio/transcoder/pkg/logging/zapadapter"

Expand Down Expand Up @@ -120,6 +121,25 @@ func (f *Forklift) Start() error {
// if err := f.asynqServer.Run(mux); err != nil {
// return fmt.Errorf("could not run server: %w", err)
// }

go func() {
t := time.NewTicker(1 * time.Second)
for {
select {
case <-t.C:
q, err := f.asynqInspector.GetQueueInfo("default")
if err != nil {
continue
}
metrics.QueueTasks.WithLabelValues("active").Set(float64(q.Active))
metrics.QueueTasks.WithLabelValues("completed").Set(float64(q.Completed))
metrics.QueueTasks.WithLabelValues("pending").Set(float64(q.Pending))
metrics.QueueTasks.WithLabelValues("failed").Set(float64(q.Failed))
case <-f.stopChan:
return
}
}
}()
if err := f.asynqServer.Start(mux); err != nil {
return fmt.Errorf("could not start asynq server: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/geopublish/geopublish.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"github.com/OdyseeTeam/odysee-api/internal/metrics"
"github.com/OdyseeTeam/odysee-api/internal/monitor"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/volatiletech/sqlboiler/boil"

"github.com/gorilla/mux"
werrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/volatiletech/sqlboiler/boil"
"github.com/ybbus/jsonrpc"
)

Expand Down
7 changes: 6 additions & 1 deletion app/geopublish/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@ var (
Namespace: ns,
Name: "blob_upload_errors",
}, []string{"type"})

QueueTasks = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "queue_tasks",
}, []string{"status"})
)

func RegisterMetrics() {
prometheus.MustRegister(
UploadsCreated, UploadsProcessed, UploadsCanceled, UploadsFailed,
QueriesSent, QueriesCompleted, QueriesFailed, QueriesErrored,
BlobUploadErrors,
BlobUploadErrors, QueueTasks,
)
}
16 changes: 8 additions & 8 deletions app/geopublish/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
tushandler "github.com/tus/tusd/pkg/handler"
)

func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPrefix string) error {
func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPrefix string) (*Handler, error) {
redisOpts, err := config.GetRedisOpts()
if err != nil {
return fmt.Errorf("cannot get redis config: %w", err)
return nil, fmt.Errorf("cannot get redis config: %w", err)
}
asynqRedisOpts, err := config.GetAsynqRedisOpts()
if err != nil {
return fmt.Errorf("cannot get redis config: %w", err)
return nil, fmt.Errorf("cannot get redis config: %w", err)
}

composer := tushandler.NewStoreComposer()
Expand All @@ -37,16 +37,16 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
// forklift.WithLogger(logger),
)
if err != nil {
return fmt.Errorf("cannot initialize forklift: %w", err)
return nil, fmt.Errorf("cannot initialize forklift: %w", err)
}
err = fl.Start()
if err != nil {
return fmt.Errorf("cannot start forklift: %w", err)
return nil, fmt.Errorf("cannot start forklift: %w", err)
}

locker, err := redislocker.New(redisOpts)
if err != nil {
return fmt.Errorf("cannot start redislocker: %w", err)
return nil, fmt.Errorf("cannot start redislocker: %w", err)
}
locker.UseIn(composer)

Expand All @@ -63,7 +63,7 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
WithQueue(fl),
)
if err != nil {
return fmt.Errorf("cannot initialize tus handler: %w", err)
return nil, fmt.Errorf("cannot initialize tus handler: %w", err)
}

r := router
Expand All @@ -76,5 +76,5 @@ func InstallRoutes(router *mux.Router, userGetter UserGetter, uploadPath, urlPre
r.HandleFunc("/{id}/status", tusHandler.Status).Methods(http.MethodGet)
r.PathPrefix("/").HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {}).Methods(http.MethodOptions)

return nil
return tusHandler, nil
}
32 changes: 13 additions & 19 deletions apps/lbrytv/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

cfg "github.com/OdyseeTeam/odysee-api/config"
Expand All @@ -17,18 +16,12 @@ import (
)

const (
lbrynetServers = "LbrynetServers"
deprecatedLbrynet = "Lbrynet"
configName = "lbrytv"
lbrynetServers = "LbrynetServers"
deprecatedLbrynetSetting = "Lbrynet"
configName = "oapi"
)

// overriddenValues stores overridden v values
// and is initialized as an empty map in the read method
var (
overriddenValues map[string]interface{}
once sync.Once
Config *cfg.ConfigWrapper
)
var Config *cfg.ConfigWrapper

func init() {
Config = cfg.ReadConfig(configName)
Expand All @@ -45,9 +38,6 @@ func init() {

c.Viper.SetDefault("Address", ":8080")
c.Viper.SetDefault("Host", "http://localhost:8080")
c.Viper.SetDefault("FreeContentURL", "http://localhost:8080/content/")
c.Viper.SetDefault("ReflectorTimeout", int64(10))
c.Viper.SetDefault("RefractorTimeout", int64(10))
}

func ProjectRoot() string {
Expand Down Expand Up @@ -149,17 +139,17 @@ func GetAddress() string {
return Config.Viper.GetString("Address")
}

//GetLbrynetServers returns the names/addresses of every SDK server
// GetLbrynetServers returns the names/addresses of every SDK server
func GetLbrynetServers() map[string]string {
if Config.Viper.GetString(deprecatedLbrynet) != "" &&
if Config.Viper.GetString(deprecatedLbrynetSetting) != "" &&
len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
logrus.Panicf("Both %s and %s are set. This is a highlander situation...there can be only 1.", deprecatedLbrynet, lbrynetServers)
logrus.Panicf("Both %s and %s are set. This is a highlander situation...there can be only 1.", deprecatedLbrynetSetting, lbrynetServers)
}

if len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
return Config.Viper.GetStringMapString(lbrynetServers)
} else if Config.Viper.GetString(deprecatedLbrynet) != "" {
return map[string]string{"sdk": Config.Viper.GetString(deprecatedLbrynet)}
} else if Config.Viper.GetString(deprecatedLbrynetSetting) != "" {
return map[string]string{"sdk": Config.Viper.GetString(deprecatedLbrynetSetting)}
} else {
servers, err := models.LbrynetServers().AllG()
if err != nil {
Expand Down Expand Up @@ -199,6 +189,10 @@ func GetRPCTimeout(method string) *time.Duration {
return nil
}

func GetProfiling() bool {
return Config.Viper.GetBool("Profiling")
}

func Override(key string, value interface{}) {
Config.Override(key, value)
}
Expand Down
4 changes: 2 additions & 2 deletions apps/lbrytv/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestGetLbrynetServers(t *testing.T) {
}

func TestGetLbrynetServersNoDB(t *testing.T) {
if Config.Viper.GetString(deprecatedLbrynet) != "" &&
if Config.Viper.GetString(deprecatedLbrynetSetting) != "" &&
len(Config.Viper.GetStringMapString(lbrynetServers)) > 0 {
t.Fatalf("Both %s and %s are set. This is a highlander situation...there can be only one.", deprecatedLbrynet, lbrynetServers)
t.Fatalf("Both %s and %s are set. This is a highlander situation...there can be only one.", deprecatedLbrynetSetting, lbrynetServers)
}
}

Expand Down
Loading

0 comments on commit 901d5b1

Please sign in to comment.