-
Notifications
You must be signed in to change notification settings - Fork 87
/
persist.go
97 lines (79 loc) · 2.22 KB
/
persist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package events
import (
"context"
"fmt"
"sync"
"github.com/bluesky-social/indigo/models"
)
// EventPersistence is the storage backend contract for stream events.
//
// Note that this interface looks generic, but some persisters might only work
// with RepoAppend or LabelLabels.
type EventPersistence interface {
// Persist records the event e. (MemPersister, in this file, also stamps the
// event with a sequence number and passes it to the broadcaster.)
Persist(ctx context.Context, e *XRPCStreamEvent) error
// Playback feeds previously persisted events to cb, using since as the
// cursor to resume from. NOTE(review): whether since is inclusive or
// exclusive is implementation-defined as far as this file shows.
Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
// TakeDownRepo handles a takedown for the repo owned by usr.
// NOTE(review): exact effect on stored events is not visible here; the
// in-memory implementation simply returns an error.
TakeDownRepo(ctx context.Context, usr models.Uid) error
// Flush presumably forces pending writes out; it is a no-op for MemPersister.
Flush(context.Context) error
// Shutdown presumably releases the persister's resources; it is a no-op for
// MemPersister.
Shutdown(context.Context) error
// SetEventBroadcaster registers the callback a persister uses to publish
// events (MemPersister invokes it from Persist).
SetEventBroadcaster(func(*XRPCStreamEvent))
}
// MemPersister is the most naive implementation of event persistence: events
// are simply appended to an in-memory slice.
// This EventPersistence option works fine with all event types.
// TODO: replace with a better implementation later.
type MemPersister struct {
// buf holds the persisted events in the order Persist appended them.
buf []*XRPCStreamEvent
// lk is held by Persist for its whole body and briefly by Playback.
lk sync.Mutex
// seq is the most recently assigned sequence number; the zero value means
// nothing has been persisted yet, so the first event gets sequence 1.
seq int64
// broadcast is the callback Persist hands each stored event to; it is set
// through SetEventBroadcaster.
broadcast func(*XRPCStreamEvent)
}
// NewMemPersister returns an empty, ready-to-use in-memory persister.
//
// The returned value has no broadcaster registered; call SetEventBroadcaster
// before persisting events.
func NewMemPersister() *MemPersister {
	return new(MemPersister)
}
// Persist stamps e with the next sequence number, appends it to the in-memory
// buffer and passes it to the registered broadcaster, if any.
//
// The sequence number is written to the first non-nil payload of e, checked in
// the order RepoCommit, RepoHandle, RepoIdentity, RepoMigrate, RepoTombstone,
// LabelLabels. A nil event, or one with none of those payloads set, is
// rejected with an error and does not consume a sequence number. ctx is
// currently unused.
func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
	if e == nil {
		return fmt.Errorf("no event in persist call")
	}

	mp.lk.Lock()
	defer mp.lk.Unlock()

	// Only commit the new sequence number once the event is known to be valid,
	// so rejected events leave no gap in the sequence.
	next := mp.seq + 1
	switch {
	case e.RepoCommit != nil:
		e.RepoCommit.Seq = next
	case e.RepoHandle != nil:
		e.RepoHandle.Seq = next
	case e.RepoIdentity != nil:
		e.RepoIdentity.Seq = next
	case e.RepoMigrate != nil:
		e.RepoMigrate.Seq = next
	case e.RepoTombstone != nil:
		e.RepoTombstone.Seq = next
	case e.LabelLabels != nil:
		e.LabelLabels.Seq = next
	default:
		return fmt.Errorf("no event in persist call")
	}
	mp.seq = next
	mp.buf = append(mp.buf, e)

	// The broadcaster runs while the lock is still held, which keeps broadcasts
	// in the same order as the assigned sequence numbers. A persister that has
	// not had SetEventBroadcaster called yet just stores the event.
	if mp.broadcast != nil {
		mp.broadcast(e)
	}
	return nil
}
// Playback calls cb, in buffer order, for every stored event at index since or
// later, and returns the first error cb reports (nil if there is none).
//
// Because sequence numbers start at 1 and are stored contiguously, index since
// holds the event with sequence since+1, so this replays everything strictly
// after the sequence number since. A negative since replays from the start;
// a since at or beyond the number of stored events replays nothing.
//
// The set of events to replay is captured while holding the lock; cb itself
// runs without the lock, so events persisted after the call began are not
// included. ctx is currently unused.
func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
	// A negative cursor would be an out-of-range slice index; treat it as
	// "from the beginning".
	if since < 0 {
		since = 0
	}

	// Snapshot the slice header under the lock. Persist only ever appends, so
	// the elements visible through this snapshot are never rewritten and can be
	// read safely after unlocking.
	mp.lk.Lock()
	var pending []*XRPCStreamEvent
	// TODO: abusing the fact that buf[0].seq is currently always 1
	if since < int64(len(mp.buf)) {
		pending = mp.buf[since:]
	}
	mp.lk.Unlock()

	for _, e := range pending {
		if err := cb(e); err != nil {
			return err
		}
	}
	return nil
}
// TakeDownRepo is not implemented for the in-memory persister: it ignores its
// arguments and always returns an error saying so.
func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error {
	unsupported := fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
	return unsupported
}
// Flush is a no-op for the in-memory persister; it always returns nil and does
// not use ctx.
func (mp *MemPersister) Flush(ctx context.Context) error {
return nil
}
// SetEventBroadcaster registers brc as the callback that Persist hands each
// stored event to, replacing any previously registered callback.
//
// The assignment is not guarded by the persister's mutex, so it should be done
// before events are persisted concurrently.
func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
mp.broadcast = brc
}
// Shutdown is a no-op for the in-memory persister; it ignores the context and
// always returns nil.
func (mp *MemPersister) Shutdown(context.Context) error {
return nil
}