/
serve_crud.go
74 lines (61 loc) · 1.77 KB
/
serve_crud.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package server
import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/AudiusProject/audius-protocol/mediorum/crudr"
"github.com/labstack/echo/v4"
)
// PullLimit caps the number of ops fetched by a single serveCrudSweep query.
const PullLimit = 10000
// serveCrudSweep responds with a JSON array of ops whose ulid sorts after the
// "after" query parameter, in ascending ulid order. At most PullLimit ops are
// fetched per call; filtering happens after the fetch, so the response may
// contain fewer.
//
// Sweeps are serialized through ss.crudSweepMutex. The query runs under a
// one-minute timeout derived from the request context, so a client that
// disconnects cancels the query rather than leaving the mutex held until the
// timeout expires.
//
// Ops on the "uploads" table are only included when this server's host appears
// in the op data; ops on every other table are always included. A query
// failure produces a 500 plain-text response.
func (ss *MediorumServer) serveCrudSweep(c echo.Context) error {
	ss.crudSweepMutex.Lock()
	defer ss.crudSweepMutex.Unlock()

	// Parent the timeout on the request context (not context.Background) so an
	// abandoned request stops the query and releases the sweep mutex promptly.
	ctx, cancel := context.WithTimeout(c.Request().Context(), 1*time.Minute)
	defer cancel()

	after := c.QueryParam("after")
	var ops []*crudr.Op
	err := ss.crud.DB.
		WithContext(ctx).
		Where("ulid > ?", after).
		Limit(PullLimit).
		Order("ulid asc").
		Find(&ops).
		Error

	if err != nil {
		return c.String(http.StatusInternalServerError, fmt.Sprintf("Failed to query ops: %v", err))
	}

	// Some peers can't talk to each other, so we do some gossip.
	// Previously we'd send all ops to all peers gossip style,
	// but that is excessive in terms of bandwidth,
	// so we only forward upload ops for which we are an orig upload mirror,
	// thus using rendezvous for gossip forwarding.
	filteredOps := make([]*crudr.Op, 0, len(ops)/2)
	myHost := []byte(ss.Config.Self.Host)
	for _, op := range ops {
		// if our host doesn't appear in the record, we are not a mirror
		if op.Table == "uploads" && !bytes.Contains(op.Data, myHost) {
			continue
		}
		filteredOps = append(filteredOps, op)
	}

	c.Response().Header().Set(echo.HeaderCacheControl, "public, max-age=300")
	return c.JSON(http.StatusOK, filteredOps)
}
// serveCrudPush binds the request payload into a crudr.Op and hands it to
// ss.crud.ApplyOp, returning whatever ApplyOp returns.
//
// A bind failure is answered with 400 and the bind error text. An op for which
// ss.crud.KnownType reports false is answered with 406. When the
// LOG_CRUD_PUSH environment variable parses as a true boolean, the bound op is
// logged before the type check.
func (ss *MediorumServer) serveCrudPush(c echo.Context) error {
	incoming := &crudr.Op{}
	bindErr := c.Bind(incoming)
	if bindErr != nil {
		return c.String(http.StatusBadRequest, bindErr.Error())
	}

	// An unset or unparsable value leaves logging disabled.
	logPush, _ := strconv.ParseBool(os.Getenv("LOG_CRUD_PUSH"))
	if logPush {
		ss.logger.Info("CRUD_PUSH", "op", incoming)
	}

	if !ss.crud.KnownType(incoming) {
		return c.String(http.StatusNotAcceptable, "unknown crudr type")
	}

	return ss.crud.ApplyOp(incoming)
}