Skip to content

Commit

Permalink
Batch persist writes and emits
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 authored and whyrusleeping committed Jun 4, 2023
1 parent f4ec84f commit a0fcefb
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 113 deletions.
2 changes: 1 addition & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func run(args []string) {

repoman := repomgr.NewRepoManager(db, cstore, kmgr)

dbp, err := events.NewDbPersistence(db, cstore)
dbp, err := events.NewDbPersistence(db, cstore, nil)
if err != nil {
return fmt.Errorf("setting up db event persistence: %w", err)
}
Expand Down
115 changes: 103 additions & 12 deletions events/dbpersist.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@ import (
"gorm.io/gorm"
)

// PersistenceBatchItem is one pending entry in a DbPersistence batch: the
// database record to be stored together with the stream event it was built
// from.
type PersistenceBatchItem struct {
	// Record is the row handed to the database when the batch is flushed.
	Record *RepoEventRecord

	// Event is the stream event broadcast after Record has been stored.
	Event *XRPCStreamEvent
}

// BatchOptions tunes when DbPersistence flushes its pending batch.
type BatchOptions struct {
	// MaxBatchSize is the pending-item count at which adding another item
	// first forces a flush.
	MaxBatchSize int

	// MinBatchSize is the pending-item count at which the background
	// flusher writes the batch out regardless of elapsed time.
	MinBatchSize int

	// MaxTimeBetweenFlush is how long a non-empty batch may wait since the
	// previous flush before the background flusher writes it out.
	MaxTimeBetweenFlush time.Duration
}

// DefaultBatchOptions returns a freshly allocated BatchOptions holding the
// default tuning: at most 200 and at least 10 items per batch, and no more
// than 500ms between flushes.
func DefaultBatchOptions() *BatchOptions {
	opts := new(BatchOptions)
	opts.MaxBatchSize = 200
	opts.MinBatchSize = 10
	opts.MaxTimeBetweenFlush = 500 * time.Millisecond
	return opts
}

type DbPersistence struct {
db *gorm.DB

Expand All @@ -26,6 +45,10 @@ type DbPersistence struct {
lk sync.Mutex

broadcast func(*XRPCStreamEvent)

batch []*PersistenceBatchItem
batchOptions BatchOptions
lastFlush time.Time
}

type RepoEventRecord struct {
Expand All @@ -42,21 +65,95 @@ type RepoEventRecord struct {
Ops []byte
}

func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore) (*DbPersistence, error) {
func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore, batchOptions *BatchOptions) (*DbPersistence, error) {
if err := db.AutoMigrate(&RepoEventRecord{}); err != nil {
return nil, err
}

return &DbPersistence{
db: db,
cs: cs,
}, nil
if batchOptions == nil {
batchOptions = DefaultBatchOptions()
}

p := DbPersistence{
db: db,
cs: cs,
batchOptions: *batchOptions,
batch: []*PersistenceBatchItem{},
}

go func() {
for {
time.Sleep(100 * time.Millisecond)
p.lk.Lock()
if len(p.batch) > 0 &&
(len(p.batch) >= p.batchOptions.MinBatchSize ||
time.Since(p.lastFlush) >= p.batchOptions.MaxTimeBetweenFlush) {
p.lk.Unlock()
if err := p.FlushBatch(context.Background()); err != nil {
log.Errorf("failed to flush batch: %s", err)
}
} else {
p.lk.Unlock()
}
}
}()

return &p, nil
}

// SetEventBroadcaster installs the callback that receives each event once
// its record has been flushed to the database.
//
// The callback is read by FlushBatch while p.lk is held, including from the
// background flusher goroutine, so the assignment is made under the same
// lock to avoid a data race. Because p.lk is not reentrant, this method must
// not be called from inside the broadcast callback itself.
func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
	p.lk.Lock()
	defer p.lk.Unlock()

	p.broadcast = brc
}

// FlushBatch stores every pending batch item in the database and then
// broadcasts the matching events in batch order, with each event's
// RepoCommit.Seq copied from its stored record.
//
// It holds p.lk for the entire operation, so it must not be called by a
// goroutine that already holds that lock. On a database error the pending
// batch is left untouched and the error is returned. ctx is accepted but is
// not currently forwarded to the database call.
func (p *DbPersistence) FlushBatch(ctx context.Context) error {
	p.lk.Lock()
	defer p.lk.Unlock()

	return p.flushBatchLocked(ctx)
}

// flushBatchLocked performs the flush described on FlushBatch. The caller
// must hold p.lk.
func (p *DbPersistence) flushBatchLocked(ctx context.Context) error {
	// Nothing pending (for example a concurrent flush already drained the
	// batch): skip the database round trip entirely.
	if len(p.batch) == 0 {
		return nil
	}

	records := make([]*RepoEventRecord, len(p.batch))
	for i, item := range p.batch {
		records[i] = item.Record
	}

	if err := p.db.CreateInBatches(records, 50).Error; err != nil {
		return fmt.Errorf("failed to create records: %w", err)
	}

	// records[i] and p.batch[i] refer to the same item, so the Seq read from
	// the stored record belongs to the event at the same index.
	for i, rec := range records {
		e := p.batch[i].Event
		e.RepoCommit.Seq = int64(rec.Seq)
		p.broadcast(e)
	}

	p.batch = []*PersistenceBatchItem{}
	p.lastFlush = time.Now()

	return nil
}

// AddItemToBatch queues rec and evt as one pending batch item. If the batch
// already holds MaxBatchSize items it is flushed first.
//
// The size check, the flush and the append all happen under a single hold of
// p.lk, so no other goroutine can grow or drain the batch in between. If the
// flush fails, the error is returned and the new item is not queued.
func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error {
	p.lk.Lock()
	defer p.lk.Unlock()

	if len(p.batch) >= p.batchOptions.MaxBatchSize {
		if err := p.flushBatchLocked(ctx); err != nil {
			return fmt.Errorf("failed to flush batch at max size: %w", err)
		}
	}

	// append handles a nil slice, so no explicit initialisation is needed.
	p.batch = append(p.batch, &PersistenceBatchItem{
		Record: rec,
		Event:  evt,
	})

	return nil
}

func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
if e.RepoCommit == nil {
return nil
Expand Down Expand Up @@ -110,16 +207,10 @@ func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
}
rer.Ops = opsb

p.lk.Lock()
defer p.lk.Unlock()
if err := p.db.Create(&rer).Error; err != nil {
if err := p.AddItemToBatch(ctx, &rer, e); err != nil {
return err
}

e.RepoCommit.Seq = int64(rer.Seq)

p.broadcast(e)

return nil
}

Expand Down

0 comments on commit a0fcefb

Please sign in to comment.