Skip to content

Commit

Permalink
WIP: allow event persister to broadcast events as it sees fit to allo…
Browse files Browse the repository at this point in the history
…w future optimizations
  • Loading branch information
whyrusleeping committed Jun 4, 2023
1 parent 8912d95 commit f4ec84f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
13 changes: 13 additions & 0 deletions events/dbpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
Expand All @@ -21,6 +22,10 @@ type DbPersistence struct {
db *gorm.DB

cs *carstore.CarStore

lk sync.Mutex

broadcast func(*XRPCStreamEvent)
}

type RepoEventRecord struct {
Expand Down Expand Up @@ -48,6 +53,10 @@ func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore) (*DbPersistence, error
}, nil
}

func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
p.broadcast = brc
}

func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
if e.RepoCommit == nil {
return nil
Expand Down Expand Up @@ -101,12 +110,16 @@ func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
}
rer.Ops = opsb

p.lk.Lock()
defer p.lk.Unlock()
if err := p.db.Create(&rer).Error; err != nil {
return err
}

e.RepoCommit.Seq = int64(rer.Seq)

p.broadcast(e)

return nil
}

Expand Down
14 changes: 7 additions & 7 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ type EventManager struct {
}

func NewEventManager(persister EventPersistence) *EventManager {
return &EventManager{
em := &EventManager{
bufferSize: 1024,
persister: persister,
}

persister.SetEventBroadcaster(em.broadcastEvent)

return em
}

const (
Expand All @@ -45,7 +49,8 @@ type Operation struct {
}

func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
// NOTE: Assumes subsLk is held
em.subsLk.Lock()
defer em.subsLk.Unlock()

// TODO: for a larger fanout we should probably have dedicated goroutines
// for subsets of the subscriber set, and tiered channels to distribute
Expand All @@ -68,14 +73,9 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
}

func (em *EventManager) persistAndSendEvent(ctx context.Context, evt *XRPCStreamEvent) {
em.subsLk.Lock()
defer em.subsLk.Unlock()

if err := em.persister.Persist(context.TODO(), evt); err != nil {
log.Errorf("failed to persist outbound event: %s", err)
}

em.broadcastEvent(evt)
}

type Subscriber struct {
Expand Down
10 changes: 10 additions & 0 deletions events/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type EventPersistence interface {
Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
TakeDownRepo(ctx context.Context, usr util.Uid) error
RebaseRepoEvents(ctx context.Context, usr util.Uid) error

SetEventBroadcaster(func(*XRPCStreamEvent))
}

// MemPersister is the most naive implementation of event persistence
Expand All @@ -23,6 +25,8 @@ type MemPersister struct {
buf []*XRPCStreamEvent
lk sync.Mutex
seq int64

broadcast func(*XRPCStreamEvent)
}

func NewMemPersister() *MemPersister {
Expand All @@ -49,6 +53,8 @@ func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
}
mp.buf = append(mp.buf, e)

mp.broadcast(e)

return nil
}

Expand Down Expand Up @@ -78,3 +84,7 @@ func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid util.Uid) error {
func (mp *MemPersister) RebaseRepoEvents(ctx context.Context, usr util.Uid) error {
return fmt.Errorf("repo rebases not currently supported by memory persister, test usage only")
}

func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
mp.broadcast = brc
}

0 comments on commit f4ec84f

Please sign in to comment.