Skip to content

Commit

Permalink
Async publish (#439)
Browse files Browse the repository at this point in the history
* Upgrade sqlboiler

* Keep uploads data in the database

* Improve tus handler declaration

* Add preliminary e2e tests for publish

* Add user metadata to sentry context

* Add stream creation code

* Manual master merge

* Tidy up go.mod

* Implement publish v3

* Initialize reflector uploader per each upload

* Fix reflector config indentation

* Improve geopublish error output

* Improve forklift logging

* Fix livestream url signing

* Complete e2e publish v3 test

* Improve media metadata detection

* Fix docker lbrynet port in wait script

* Clean up failing tests

* Fix test wallet loading routine

* Add ffmpeg and extra variables go github actions

* Unload testing wallet before loading it

* Improve test wallet cleanup

* Fix carriage tests

* Add geopublish metrics

* Remove models tests

* Improve test wallet handling

* Alleviate test wallet race condition
  • Loading branch information
anbsky committed Sep 22, 2022
1 parent e027125 commit 348f447
Show file tree
Hide file tree
Showing 85 changed files with 7,932 additions and 4,253 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/test-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ jobs:
- name: Check running containers
run: docker ps -a

- name: Set up Go 1.17
- name: Set up Go 1.18
uses: actions/setup-go@v3
with:
go-version: '1.17.x'
go-version: '1.18.x'
id: go

- name: Run go vet
run: go vet ./...

- uses: FedericoCarboni/setup-ffmpeg@v1

- name: Prepare for tests
run: make prepare_test

Expand All @@ -50,6 +52,9 @@ jobs:
OAUTH_TEST_CLIENT_SECRET: ${{ secrets.OAUTH_TEST_CLIENT_SECRET }}
OAUTH_TEST_USERNAME: ${{ secrets.OAUTH_TEST_USERNAME }}
OAUTH_TEST_PASSWORD: ${{ secrets.OAUTH_TEST_PASSWORD }}
REAL_WALLET_PRIVATE_KEY: ${{ secrets.REAL_WALLET_PRIVATE_KEY }}
REAL_WALLET_PUBLIC_KEY: ${{ secrets.REAL_WALLET_PUBLIC_KEY }}
REFLECTOR_UPLINK: ${{ secrets.REFLECTOR_UPLINK }}
run: |
go install github.com/jandelgado/gcov2lcov@latest
go test -covermode=count -coverprofile=coverage.out ./... && gcov2lcov -infile=coverage.out -outfile=coverage.lcov
Expand Down
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
/assets/bindata.go
/assets/static/app
/dist/
**/rice-box.go
coverage.*
packrd/
*packr.go
secrets.env
/token_privkey.rsa
**/dist/
**/docker-data/
**/rundata/
token_privkey.*
16 changes: 14 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
FROM alpine
# syntax=docker/dockerfile:1

FROM alpine:3.16 AS gather

WORKDIR /build

ADD https://johnvansickle.com/ffmpeg/builds/ffmpeg-git-amd64-static.tar.xz ./
RUN tar -xf ffmpeg-git-amd64-static.tar.xz && mv ffmpeg-*-static/ffprobe ffmpeg-*-static/ffmpeg ./

RUN chmod a+x ffmpeg ffprobe

FROM alpine:3.16 AS build
EXPOSE 8080
EXPOSE 2112

RUN apk update && \
apk add --no-cache \
openssh-keygen

COPY --from=gather /build/ffprobe /usr/local/bin/

WORKDIR /app
COPY dist/linux_amd64/lbrytv /app
RUN chmod a+x /app/lbrytv
Expand Down
14 changes: 9 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ test_circleci:

.PHONY: clean
clean:
find . -name rice-box.go | xargs rm
rm -rf ./dist

.PHONY: server
Expand All @@ -43,12 +42,12 @@ retag:
git tag $(tag)

get_sqlboiler:
go get -u -t github.com/volatiletech/sqlboiler@v3.4.0
go get -u -t github.com/volatiletech/sqlboiler/drivers/sqlboiler-psql@v3.4.0
go install github.com/volatiletech/sqlboiler
go install github.com/volatiletech/sqlboiler/drivers/sqlboiler-psql

.PHONY: models
models: get_sqlboiler
sqlboiler --add-global-variants --wipe psql --no-context
sqlboiler --add-global-variants --wipe psql --no-context --no-tests

.PHONY: api
api:
Expand All @@ -63,8 +62,13 @@ watchman:
-ldflags "-s -w -X github.com/OdyseeTeam/odysee-api/version.version=$(watchman_version) -X github.com/OdyseeTeam/odysee-api/version.commit=$(git_hash) -X github.com/OdyseeTeam/odysee-api/apps/version.buildDate=$(date)" \
./apps/watchman/cmd/watchman/

cur_branch := $(shell git rev-parse --abbrev-ref HEAD)
.PHONY: image
image:
docker buildx build -t odyseeteam/odysee-api:$(api_version) -t odyseeteam/odysee-api:latest -t odyseeteam/odysee-api:$(cur_branch) --platform linux/amd64 .

watchman_image:
docker build -t odyseeteam/watchman:$(watchman_version) ./apps/watchman
docker buildx build -t odyseeteam/watchman:$(watchman_version) --platform linux/amd64 ./apps/watchman

watchman_design:
goa gen github.com/OdyseeTeam/odysee-api/apps/watchman/design -o apps/watchman
Expand Down
24 changes: 21 additions & 3 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/OdyseeTeam/odysee-api/app/auth"
"github.com/OdyseeTeam/odysee-api/app/geopublish"
"github.com/OdyseeTeam/odysee-api/app/proxy"
"github.com/OdyseeTeam/odysee-api/app/publish"
"github.com/OdyseeTeam/odysee-api/app/query/cache"
Expand All @@ -20,7 +21,9 @@ import (
"github.com/OdyseeTeam/odysee-api/internal/status"
"github.com/OdyseeTeam/odysee-api/pkg/redislocker"
"github.com/OdyseeTeam/player-server/pkg/paid"
"github.com/lbryio/transcoder/pkg/logging/zapadapter"

sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -50,8 +53,9 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
panic(err)
}
legacyProvider := auth.NewIAPIProvider(sdkRouter, config.GetInternalAPIHost())
sentryHandler := sentryhttp.New(sentryhttp.Options{})

r.Use(methodTimer)
r.Use(methodTimer, sentryHandler.Handle)

r.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("odysee api"))
Expand Down Expand Up @@ -96,7 +100,12 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {
StoreComposer: composer,
}

tusHandler, err := publish.NewTusHandler(oauthAuther, legacyProvider, tusCfg, uploadPath)
tusHandler, err := publish.NewTusHandler(
publish.WithAuther(oauthAuther),
publish.WithLegacyProvider(legacyProvider),
publish.WithTusConfig(tusCfg),
publish.WithUploadPath(uploadPath),
)
if err != nil {
logger.Log().WithError(err).Fatal(err)
}
Expand All @@ -109,12 +118,21 @@ func InstallRoutes(r *mux.Router, sdkRouter *sdkrouter.Router) {

tusRouter := v2Router.PathPrefix("/publish").Subrouter()
tusRouter.Use(tusHandler.Middleware)
tusRouter.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost)
tusRouter.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost).Name("tus_publish")
tusRouter.HandleFunc("/{id}", tusHandler.HeadFile).Methods(http.MethodHead)
tusRouter.HandleFunc("/{id}", tusHandler.PatchFile).Methods(http.MethodPatch)
tusRouter.HandleFunc("/{id}", tusHandler.DelFile).Methods(http.MethodDelete)
tusRouter.HandleFunc("/{id}/notify", tusHandler.Notify).Methods(http.MethodPost)
tusRouter.PathPrefix("/").HandlerFunc(emptyHandler).Methods(http.MethodOptions)

v3Router := r.PathPrefix("/api/v3").Subrouter()
v3Router.Use(defaultMiddlewares(oauthAuther, legacyProvider, sdkRouter))
ug := auth.NewUniversalUserGetter(oauthAuther, legacyProvider, zapadapter.NewKV(nil))
gPath := config.GetGeoPublishSourceDir()
err = geopublish.InstallRoutes(v3Router.PathPrefix("/publish").Subrouter(), ug, gPath, "/api/v3/publish/")
if err != nil {
panic(err)
}
}

func defaultMiddlewares(oauthAuther auth.Authenticator, legacyProvider auth.Provider, router *sdkrouter.Router) mux.MiddlewareFunc {
Expand Down
16 changes: 0 additions & 16 deletions api/routes_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -19,21 +18,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestRoutesProxy(t *testing.T) {
r := mux.NewRouter()
rt := sdkrouter.New(config.GetLbrynetServers())

req, err := http.NewRequest("POST", "/api/v1/proxy", bytes.NewBuffer([]byte(`{"method": "status"}`)))
require.NoError(t, err)
rr := httptest.NewRecorder()

InstallRoutes(r, rt)
r.ServeHTTP(rr, req)

assert.Equal(t, http.StatusOK, rr.Code)
assert.Contains(t, rr.Body.String(), `"result":`)
}

func TestRoutesPublish(t *testing.T) {
r := mux.NewRouter()
rt := sdkrouter.New(config.GetLbrynetServers())
Expand Down
1 change: 1 addition & 0 deletions app/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type CurrentUser struct {
type IAPIUserClient interface {
}

// Authenticator authenticates
type Authenticator interface {
Authenticate(token, metaRemoteIP string) (*models.User, error)
GetTokenFromRequest(r *http.Request) (string, error)
Expand Down
13 changes: 11 additions & 2 deletions app/auth/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package auth
import (
"context"
"net/http"
"strconv"
"strings"

"github.com/OdyseeTeam/odysee-api/app/sdkrouter"
"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/internal/ip"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/OdyseeTeam/odysee-api/pkg/iapi"
"github.com/getsentry/sentry-go"

"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
Expand All @@ -31,11 +33,15 @@ func Middleware(auther Authenticator) mux.MiddlewareFunc {
user, err = auther.Authenticate(token, ipAddr)
if err != nil {
logger.WithFields(logrus.Fields{"ip": ipAddr}).Debugf("error authenticating user: %s", err)
} else {
}
if user != nil {
iac, _ = iapi.NewClient(
iapi.WithOAuthToken(strings.TrimPrefix(token, wallet.TokenPrefix)),
iapi.WithRemoteIP(ipAddr),
)
if hub := sentry.GetHubFromContext(r.Context()); hub != nil {
hub.Scope().SetUser(sentry.User{ID: strconv.Itoa(user.ID), IPAddress: ipAddr})
}
}

cu := NewCurrentUser(user, err)
Expand All @@ -52,8 +58,8 @@ func LegacyMiddleware(provider Provider) mux.MiddlewareFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var err error
var iac *iapi.Client
user, err := FromRequest(r)
ipAddr := ip.FromRequest(r)
user, err := FromRequest(r)
if err != nil {
token := r.Header.Get(wallet.LegacyTokenHeader)
if token != "" {
Expand All @@ -69,6 +75,9 @@ func LegacyMiddleware(provider Provider) mux.MiddlewareFunc {
iapi.WithLegacyToken(token),
iapi.WithRemoteIP(ipAddr),
)
if hub := sentry.GetHubFromContext(r.Context()); hub != nil {
hub.Scope().SetUser(sentry.User{ID: strconv.Itoa(user.ID), IPAddress: ipAddr})
}
}
cu := NewCurrentUser(user, err)
cu.IP = ipAddr
Expand Down
65 changes: 65 additions & 0 deletions app/auth/usergetter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package auth

import (
"net/http"

"github.com/OdyseeTeam/odysee-api/app/wallet"
"github.com/OdyseeTeam/odysee-api/internal/errors"
"github.com/OdyseeTeam/odysee-api/internal/ip"
"github.com/OdyseeTeam/odysee-api/models"
"github.com/lbryio/transcoder/pkg/logging"
)

type universalUserGetter struct {
logger logging.KVLogger
auther Authenticator
provider Provider
}

func NewUniversalUserGetter(auther Authenticator, provider Provider, logger logging.KVLogger) *universalUserGetter {
return &universalUserGetter{
auther: auther,
provider: provider,
logger: logger,
}
}

func (g *universalUserGetter) GetFromRequest(r *http.Request) (*models.User, error) {
log := g.logger
token, err := g.auther.GetTokenFromRequest(r)
// No oauth token present in request, try legacy method
if errors.Is(err, wallet.ErrNoAuthInfo) {
// TODO: Remove this pathway after legacy tokens go away.
if token, ok := r.Header[wallet.LegacyTokenHeader]; ok {
addr := ip.ForRequest(r)
user, err := g.provider(token[0], addr)
if err != nil {
log.Info("user authentication failed", "err", err, "method", "token")
return nil, err
}
if user == nil {
err := wallet.ErrNoAuthInfo
log.Info("unauthorized user", "err", err, "method", "token")
return nil, err
}
log.Debug("user authenticated", "user", user.ID, "method", "token")
return user, nil
}
return nil, errors.Err(wallet.ErrNoAuthInfo)
} else if err != nil {
return nil, err
}

user, err := g.auther.Authenticate(token, ip.ForRequest(r))
if err != nil {
log.Info("user authentication failed", "err", err, "method", "oauth")
return nil, err
}
if user == nil {
err := wallet.ErrNoAuthInfo
log.Info("unauthorized user", "err", err, "method", "oauth")
return nil, err
}
log.Debug("user authenticated", "user", user.ID, "method", "oauth")
return user, nil
}
Loading

0 comments on commit 348f447

Please sign in to comment.