Skip to content

Commit

Permalink
Support dynamic configing concurrency factor
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Jul 2, 2024
1 parent c42aa58 commit a41e40c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
25 changes: 20 additions & 5 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
)

const (
defaultPDTimeout = 3 * time.Second
maxInitClusterRetries = 100
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultPDTimeout = 3 * time.Second
maxInitClusterRetries = 100
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultTSOClientConcurrencyFactor = 4
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -44,6 +45,8 @@ const (
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle

TSOClientConcurrencyFactor

dynamicOptionCount
)

Expand Down Expand Up @@ -76,6 +79,7 @@ func newOption() *option {
co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientConcurrencyFactor].Store(defaultTSOClientConcurrencyFactor)
return co
}

Expand Down Expand Up @@ -126,3 +130,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) {
func (o *option) getEnableTSOFollowerProxy() bool {
return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool)
}

func (o *option) setTSOClientConcurrencyFactor(value int) {
old := o.getTSOClientConcurrencyFactor()
if value != old {
o.dynamicOptions[TSOClientConcurrencyFactor].Store(value)
}
}

func (o *option) getTSOClientConcurrencyFactor() int {
return o.dynamicOptions[TSOClientConcurrencyFactor].Load().(int)
}
15 changes: 12 additions & 3 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type tsoDispatcher struct {
lastTSOInfo *tsoInfo

nextBatchedReqID uint64
windowSize int
batchBufferPool sync.Pool
pendingBatches sync.Map

Expand Down Expand Up @@ -119,7 +118,6 @@ func newTSODispatcher(
reqChan: make(chan *tsoRequest, maxBatchSize*2),
tsDeadlineCh: make(chan *deadline, 1),

windowSize: 4,
batchBufferPool: sync.Pool{New: func() any {
return newTSOBatchController(maxBatchSize)
}},
Expand Down Expand Up @@ -216,6 +214,12 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
streamURL string
stream *tsoStream
)

concurrencyFactor := option.getTSOClientConcurrencyFactor()
// Avoid loading from the dynamic options map too frequently.
lastUpdateConcurrencyFactorTime := time.Now()
const updateConcurrencyFactorInterval = time.Second * 5

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(option.timeout)
defer streamLoopTimer.Stop()
Expand Down Expand Up @@ -253,6 +257,11 @@ tsoBatchLoop:
//}

currentBatchStartTime := time.Now()
if currentBatchStartTime.Sub(lastUpdateConcurrencyFactorTime) > updateConcurrencyFactorInterval {
lastUpdateConcurrencyFactorTime = currentBatchStartTime
concurrencyFactor = option.getTSOClientConcurrencyFactor()
}

batchController = td.batchBufferPool.Get().(*tsoBatchController)
batchController.collectedRequestCount = 0
// Receive the first request
Expand Down Expand Up @@ -328,7 +337,7 @@ tsoBatchLoop:
// Collected remaining requests for a batch
latency := stream.EstimatedRoundTripLatency()
estimateTSOLatencyGauge.WithLabelValues(td.dispatcherID, streamURL).Set(latency.Seconds())
totalBatchTime := latency / time.Duration(td.windowSize)
totalBatchTime := latency / time.Duration(concurrencyFactor)
remainingBatchTime := totalBatchTime - time.Since(currentBatchStartTime)
if remainingBatchTime > 0 {
if !batchingTimer.Stop() {
Expand Down

0 comments on commit a41e40c

Please sign in to comment.