Skip to content

Commit

Permalink
remove writers.Capped
Browse files Browse the repository at this point in the history
  • Loading branch information
candiduslynx committed May 14, 2024
1 parent 40d44a8 commit 1f44030
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 65 deletions.
42 changes: 29 additions & 13 deletions writers/batchwriter/batchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,48 +122,64 @@ func (w *BatchWriter) Close(context.Context) error {
}

func (w *BatchWriter) worker(ctx context.Context, tableName string, ch <-chan *message.WriteInsert, flush <-chan chan bool) {
bytes, rows := writers.NewCapped(w.batchSizeBytes), writers.NewCapped(w.batchSize)
var bytes, rows int64
resources := make([]*message.WriteInsert, 0, w.batchSize) // at least we have 1 row per record

ticker := writers.NewTicker(w.batchTimeout)
defer ticker.Stop()

tickerCh, ctxDone := ticker.Chan(), ctx.Done()

send := func() {
w.flushTable(ctx, tableName, resources)
clear(resources)
resources = resources[:0]
bytes.Reset()
rows.Reset()
bytes, rows = 0, 0
}
for {
select {
case r, ok := <-ch:
if !ok {
if rows.Current() > 0 {
if rows > 0 {
w.flushTable(ctx, tableName, resources)
}
return
}

dataBytes := util.TotalRecordSize(r.Record)
if rows.OverflownBy(r.Record.NumRows()) || bytes.OverflownBy(dataBytes) {
recordRows, recordBytes := r.Record.NumRows(), util.TotalRecordSize(r.Record)
if (w.batchSize > 0 && rows+recordRows > w.batchSize) ||
(w.batchSizeBytes > 0 && bytes+recordBytes > w.batchSizeBytes) {
if rows == 0 {
// New record overflows batch by itself.
// Flush right away.
// TODO: slice
resources = append(resources, r)
send()
ticker.Reset(w.batchTimeout)
continue
}
// rows > 0
send()
ticker.Reset(w.batchTimeout)
}
resources = append(resources, r)
rows.Add(r.Record.NumRows())
bytes.Add(dataBytes)
if recordRows > 0 {
// only save records with rows
resources = append(resources, r)
rows += recordRows
bytes += recordBytes
}

case <-ticker.Chan():
if rows.Current() > 0 {
case <-tickerCh:
if rows > 0 {
send()
}
case done := <-flush:
if rows.Current() > 0 {
if rows > 0 {
send()
ticker.Reset(w.batchTimeout)
}
done <- true
case <-ctx.Done():
case <-ctxDone:
// this means the request was cancelled
return // after this NO other call will succeed
}
Expand Down
16 changes: 0 additions & 16 deletions writers/cap.go

This file was deleted.

38 changes: 20 additions & 18 deletions writers/mixedbatchwriter/mixedbatchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (w *MixedBatchWriter) Write(ctx context.Context, msgChan <-chan message.Wri
insert := &insertBatchManager{
batch: make([]*message.WriteInsert, 0, w.batchSize),
writeFunc: w.client.InsertBatch,
rows: writers.NewCapped(w.batchSize),
bytes: writers.NewCapped(w.batchSizeBytes),
maxRows: w.batchSize,
maxBytes: w.batchSizeBytes,
logger: w.logger,
}
deleteStale := &batchManager[message.WriteDeleteStales, *message.WriteDeleteStale]{
Expand Down Expand Up @@ -199,45 +199,47 @@ func (m *batchManager[A, T]) flush(ctx context.Context) error {

// insertBatchManager is a special batch manager for insert messages that
// also keeps track of the total size (rows and bytes) of the batch.
type insertBatchManager struct {
	batch     []*message.WriteInsert
	writeFunc func(ctx context.Context, messages message.WriteInserts) error

	// curRows/curBytes track the size of the pending batch; maxRows/maxBytes
	// are the configured limits. A limit of 0 disables that check.
	curRows, maxRows   int64
	curBytes, maxBytes int64

	logger zerolog.Logger
}

func (m *insertBatchManager) append(ctx context.Context, msg *message.WriteInsert) error {
dataBytes := util.TotalRecordSize(msg.Record)
if m.rows.OverflownBy(msg.Record.NumRows()) || m.bytes.OverflownBy(dataBytes) {
recordRows, recordBytes := msg.Record.NumRows(), util.TotalRecordSize(msg.Record)
if (m.maxRows > 0 && m.curRows+recordRows > m.maxRows) ||
(m.maxBytes > 0 && m.curBytes+recordBytes > m.maxBytes) {
if err := m.flush(ctx); err != nil {
return err
}
}

m.batch = append(m.batch, msg)
m.rows.Add(msg.Record.NumRows())
m.bytes.Add(dataBytes)
if recordRows > 0 {
// only save records with rows
m.batch = append(m.batch, msg)
m.curRows += recordRows
m.curBytes += recordBytes
}

return nil
}

func (m *insertBatchManager) flush(ctx context.Context) error {
batchSize := m.rows.Current()
if batchSize == 0 {
if m.curRows == 0 {
// no rows to insert
return nil
}
start := time.Now()
err := m.writeFunc(ctx, m.batch)
if err != nil {
m.logger.Err(err).Int64("len", batchSize).Dur("duration", time.Since(start)).Msg("failed to write batch")
m.logger.Err(err).Int64("len", m.curRows).Dur("duration", time.Since(start)).Msg("failed to write batch")
return err
}
m.logger.Debug().Int64("len", batchSize).Dur("duration", time.Since(start)).Msg("batch written successfully")
m.logger.Debug().Int64("len", m.curRows).Dur("duration", time.Since(start)).Msg("batch written successfully")

clear(m.batch) // GC can work
m.batch = m.batch[:0]
m.rows.Reset()
m.bytes.Reset()
m.curRows, m.curBytes = 0, 0
return nil
}
56 changes: 38 additions & 18 deletions writers/streamingbatchwriter/streamingbatchwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ type streamingWorkerManager[T message.WriteMessage] struct {
func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup, tableName string) {
defer wg.Done()
var (
clientCh chan T
clientErrCh chan error
open bool
bytes, rows = writers.NewCapped(s.batchSizeBytes), writers.NewCapped(s.batchSizeRows)
clientCh chan T
clientErrCh chan error
open bool
sizeBytes, sizeRows int64
)

ensureOpened := func() {
Expand Down Expand Up @@ -382,47 +382,67 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup,
}
}
open = false
bytes.Reset()
rows.Reset()
sizeBytes, sizeRows = 0, 0
}
defer closeFlush()

ticker := s.tickerFn(s.batchTimeout)
defer ticker.Stop()

tickerCh, ctxDone := ticker.Chan(), ctx.Done()

for {
select {
case r, ok := <-s.ch:
if !ok {
return
}

var recSize int64
rowSize := int64(1) // at least 1 row for messages without records
recordRows := int64(1) // at least 1 row for messages without records
var recordBytes int64
if ins, ok := any(r).(*message.WriteInsert); ok {
recSize = util.TotalRecordSize(ins.Record)
rowSize = ins.Record.NumRows()
recordBytes = util.TotalRecordSize(ins.Record)
recordRows = ins.Record.NumRows()
}

if rows.OverflownBy(rowSize) || bytes.OverflownBy(recSize) {
if (s.batchSizeRows > 0 && sizeRows+recordRows > s.batchSizeRows) ||
(s.batchSizeBytes > 0 && sizeBytes+recordBytes > s.batchSizeBytes) {
if sizeRows == 0 {
// New record overflows batch by itself.
// Flush right away.
// TODO: slice
ensureOpened()
clientCh <- r
closeFlush()
ticker.Reset(s.batchTimeout)
continue
}
// sizeRows > 0
closeFlush()
ticker.Reset(s.batchTimeout)
}

ensureOpened()
clientCh <- r
rows.Add(rowSize)
bytes.Add(recSize)
if recordRows > 0 {
// only save records with rows
ensureOpened()
clientCh <- r
sizeRows += recordRows
sizeBytes += recordBytes
}

case <-ticker.Chan():
if rows.Current() > 0 {
case <-tickerCh:
if sizeRows > 0 {
closeFlush()
}
case done := <-s.flush:
if rows.Current() > 0 {
if sizeRows > 0 {
closeFlush()
ticker.Reset(s.batchTimeout)
}
done <- true
case <-ctxDone:
// this means the request was cancelled
return // after this NO other call will succeed
}
}
}

0 comments on commit 1f44030

Please sign in to comment.