From 9d45dd9fa4883e23b5cd0933ec5daab9c64a01e7 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 9 Sep 2022 12:54:39 +0200 Subject: [PATCH] fix: use mutex only if necessary (#352) --- CHANGELOG.md | 3 +++ api/write.go | 23 +++++++++-------- api/writeAPIBlocking.go | 48 +++++++++++++++++++++-------------- internal/test/http_service.go | 2 ++ 4 files changed, 46 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ee03ab8..3394a48e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## [unreleased] +### Bug fixes +- [#354](https://github.com/influxdata/influxdb-client-go/pull/354) More efficient synchronization in WriteAPIBlocking. + ## 2.10.0 [2022-08-25] ### Features diff --git a/api/write.go b/api/write.go index b6d60ceb..313d21a1 100644 --- a/api/write.go +++ b/api/write.go @@ -51,17 +51,18 @@ type WriteAPIImpl struct { service *iwrite.Service writeBuffer []string - errCh chan error - writeCh chan *iwrite.Batch - bufferCh chan string - writeStop chan struct{} - bufferStop chan struct{} - bufferFlush chan struct{} - doneCh chan struct{} - bufferInfoCh chan writeBuffInfoReq - writeInfoCh chan writeBuffInfoReq - writeOptions *write.Options - closingMu *sync.Mutex + errCh chan error + writeCh chan *iwrite.Batch + bufferCh chan string + writeStop chan struct{} + bufferStop chan struct{} + bufferFlush chan struct{} + doneCh chan struct{} + bufferInfoCh chan writeBuffInfoReq + writeInfoCh chan writeBuffInfoReq + writeOptions *write.Options + closingMu *sync.Mutex + // more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19 isErrChReader int32 } diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index d348aa8f..116ebc43 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -8,6 +8,7 @@ import ( "context" "strings" "sync" + "sync/atomic" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -51,9 +52,10 @@ type WriteAPIBlocking interface { type writeAPIBlocking struct { service *iwrite.Service writeOptions *write.Options - batching bool - batch []string - mu sync.Mutex + // more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19 + batching int32 + batch []string + mu sync.Mutex } // NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org @@ -69,28 +71,26 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se } func (w *writeAPIBlocking) EnableBatching() { - w.mu.Lock() - defer w.mu.Unlock() - if !w.batching { - w.batching = true + if atomic.LoadInt32(&w.batching) == 0 { + w.mu.Lock() + w.batching = 1 w.batch = make([]string, 0, w.writeOptions.BatchSize()) + w.mu.Unlock() } } func (w *writeAPIBlocking) write(ctx context.Context, line string) error { - w.mu.Lock() - defer w.mu.Unlock() - body := line - if w.batching { + if atomic.LoadInt32(&w.batching) > 0 { + w.mu.Lock() + defer w.mu.Unlock() w.batch = append(w.batch, line) if len(w.batch) == int(w.writeOptions.BatchSize()) { - body = strings.Join(w.batch, "\n") - w.batch = w.batch[:0] + return w.flush(ctx) } else { return nil } } - err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime())) if err != nil { return err } @@ -112,13 +112,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point return w.write(ctx, line) } -func (w *writeAPIBlocking) Flush(ctx context.Context) error { - w.mu.Lock() - defer w.mu.Unlock() - if w.batching && len(w.batch) > 0 { +// flush is unsychronized helper for creating and sending batch +// Must be called from synchronized block +func (w *writeAPIBlocking) flush(ctx context.Context) error { + if len(w.batch) > 0 { body := strings.Join(w.batch, "\n") w.batch = w.batch[:0] - return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()) + return w.service.WriteBatch(ctx, b) + } + return nil +} + +func (w *writeAPIBlocking) Flush(ctx context.Context) error { + if atomic.LoadInt32(&w.batching) > 0 { + w.mu.Lock() + defer w.mu.Unlock() + return w.flush(ctx) } return nil } diff --git a/internal/test/http_service.go b/internal/test/http_service.go index ff72e3a0..6f35f686 100644 --- a/internal/test/http_service.go +++ b/internal/test/http_service.go @@ -123,7 +123,9 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request // DoPostRequest reads http request, validates URL and stores data in the request func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error { req, err := http.NewRequest("POST", url, nil) + t.lock.Lock() t.requests++ + t.lock.Unlock() if err != nil { return http2.NewError(err) }