Skip to content

Commit

Permalink
fix: use mutex only if necessary (influxdata#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Sep 13, 2022
1 parent 21f5a82 commit 9d45dd9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 30 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## [unreleased]
### Bug fixes
- [#354](https://github.com/influxdata/influxdb-client-go/pull/354) More efficient synchronization in WriteAPIBlocking.


## 2.10.0 [2022-08-25]
### Features
Expand Down
23 changes: 12 additions & 11 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ type WriteAPIImpl struct {
service *iwrite.Service
writeBuffer []string

errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
isErrChReader int32
}

Expand Down
48 changes: 29 additions & 19 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
Expand Down Expand Up @@ -51,9 +52,10 @@ type WriteAPIBlocking interface {
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
batching bool
batch []string
mu sync.Mutex
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
batching int32
batch []string
mu sync.Mutex
}

// NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
Expand All @@ -69,28 +71,26 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se
}

func (w *writeAPIBlocking) EnableBatching() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.batching {
w.batching = true
if atomic.LoadInt32(&w.batching) == 0 {
w.mu.Lock()
w.batching = 1
w.batch = make([]string, 0, w.writeOptions.BatchSize())
w.mu.Unlock()
}
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
w.mu.Lock()
defer w.mu.Unlock()
body := line
if w.batching {
if atomic.LoadInt32(&w.batching) > 0 {
w.mu.Lock()
defer w.mu.Unlock()
w.batch = append(w.batch, line)
if len(w.batch) == int(w.writeOptions.BatchSize()) {
body = strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.flush(ctx)
} else {
return nil
}
}
err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand All @@ -112,13 +112,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
return w.write(ctx, line)
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.batching && len(w.batch) > 0 {
// flush is unsychronized helper for creating and sending batch
// Must be called from synchronized block
func (w *writeAPIBlocking) flush(ctx context.Context) error {
if len(w.batch) > 0 {
body := strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())
return w.service.WriteBatch(ctx, b)
}
return nil
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
if atomic.LoadInt32(&w.batching) > 0 {
w.mu.Lock()
defer w.mu.Unlock()
return w.flush(ctx)
}
return nil
}
2 changes: 2 additions & 0 deletions internal/test/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request
// DoPostRequest reads http request, validates URL and stores data in the request
func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error {
req, err := http.NewRequest("POST", url, nil)
t.lock.Lock()
t.requests++
t.lock.Unlock()
if err != nil {
return http2.NewError(err)
}
Expand Down

0 comments on commit 9d45dd9

Please sign in to comment.