Skip to content

Commit

Permalink
ease gc
Browse files Browse the repository at this point in the history
  • Loading branch information
candiduslynx committed May 13, 2024
1 parent 5da609b commit 5fbbecc
Showing 1 changed file with 16 additions and 21 deletions.
37 changes: 16 additions & 21 deletions scheduler/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,21 @@ type batcher struct {
}

type worker struct {
ch chan *schema.Resource
flush chan chan struct{}
rows schema.Resources
builder *array.RecordBuilder // we can reuse that
res chan<- message.SyncMessage
ch chan *schema.Resource
flush chan chan struct{}
curRows, maxRows int // todo: consider using capped int64 from https://github.com/cloudquery/plugin-sdk/pull/1647
builder *array.RecordBuilder // we can reuse that
res chan<- message.SyncMessage
}

// send must be called on len(rows) > 0
func (w *worker) send() {
for _, row := range w.rows {
scalar.AppendToRecordBuilder(w.builder, row.GetValues())
}

w.res <- &message.SyncInsert{Record: w.builder.NewRecord()}
// we need to reserve here as NewRecord (& underlying NewArray calls) reset the memory
w.builder.Reserve(cap(w.rows))

clear(w.rows) // ease GC
w.rows = w.rows[:0]
w.builder.Reserve(w.maxRows)
}

func (w *worker) work(done <-chan struct{}, size int, timeout time.Duration) {
func (w *worker) work(done <-chan struct{}, timeout time.Duration) {
ticker := writers.NewTicker(timeout)
defer ticker.Stop()
tickerCh := ticker.Chan()
Expand All @@ -61,25 +54,27 @@ func (w *worker) work(done <-chan struct{}, size int, timeout time.Duration) {
select {
case r, ok := <-w.ch:
if !ok {
if len(w.rows) > 0 {
if w.curRows > 0 {
w.send()
}
return
}

w.rows = append(w.rows, r)
if len(w.rows) == size {
// append to builder right away
scalar.AppendToRecordBuilder(w.builder, r.GetValues())
w.curRows++
if w.curRows == w.maxRows {
w.send()
ticker.Reset(timeout)
}

case <-tickerCh:
if len(w.rows) > 0 {
if w.curRows > 0 {
w.send()
}

case ch := <-w.flush:
if len(w.rows) > 0 {
if w.curRows > 0 {
w.send()
ticker.Reset(timeout)
}
Expand Down Expand Up @@ -120,13 +115,13 @@ func (b *batcher) process(res *schema.Resource) {

// fill in the worker fields
wr.flush = make(chan chan struct{})
wr.rows = make(schema.Resources, 0, b.size)
wr.maxRows = b.size
wr.builder = array.NewRecordBuilder(memory.DefaultAllocator, table.ToArrowSchema())
wr.res = b.res
wr.builder.Reserve(b.size)

// start processing
wr.work(b.ctxDone, b.size, b.timeout)
wr.work(b.ctxDone, b.timeout)
}()

wr.ch <- res
Expand Down

0 comments on commit 5fbbecc

Please sign in to comment.