Merge branch 'm/tso-client-optimization' into m/tso-opt-with-hard-limit
MyonKeminta committed Jul 18, 2024
2 parents c109bd3 + d3aabb3 commit 77da938
Showing 4 changed files with 216 additions and 56 deletions.
170 changes: 170 additions & 0 deletions client/histogram.go
@@ -0,0 +1,170 @@
package pd

import (
    "fmt"
    "strings"
    "sync/atomic"
    "time"

    "github.com/pingcap/log"
    "go.uber.org/zap"
)

// Histogram is a simple fixed-bucket-width histogram for collecting
// client-side latency samples. Values at or above cutoff are dropped, and the
// bucket slice grows on demand for values below it.
type Histogram struct {
    buckets   []int
    sum       float64
    sumSquare float64
    count     int
    interval  float64
    cutoff    float64
}

// NewHistogram creates a Histogram with the given bucket width (interval),
// initial bucket count, and upper cutoff.
func NewHistogram(interval float64, bucketsCount int, cutoff float64) *Histogram {
    return &Histogram{
        buckets:  make([]int, bucketsCount),
        interval: interval,
        count:    0,
        cutoff:   cutoff,
    }
}

// Observe records a single sample. Samples at or above the cutoff are ignored.
func (h *Histogram) Observe(value float64) {
    if value >= h.cutoff {
        return
    }

    index := int(value / h.interval)
    for index >= len(h.buckets) {
        h.buckets = append(h.buckets, 0)
    }

    h.buckets[index]++
    h.count++
    h.sum += value
    h.sumSquare += value * value
}

// GetPercentile returns an approximation of the p-th quantile (0 <= p <= 1),
// interpolating linearly within the bucket where the quantile falls.
func (h *Histogram) GetPercentile(p float64) float64 {
    if h.count == 0 {
        return 0
    }
    limit := float64(h.count) * p
    result := 0.
    for i := 0; i < len(h.buckets); i++ {
        samplesInBucket := float64(h.buckets[i])
        if samplesInBucket >= limit {
            result += limit / samplesInBucket * h.interval
            break
        }
        result += h.interval
        limit -= samplesInBucket
    }
    return result
}

// GetAvg returns the mean of the observed samples.
func (h *Histogram) GetAvg() float64 {
    return h.sum / float64(h.count)
}

func (h *Histogram) String() string {
    sb := &strings.Builder{}
    _, err := fmt.Fprintf(sb, "{ count: %v, sum: %v, sum_square: %v, interval: %v, buckets.len: %v, buckets: [", h.count, h.sum, h.sumSquare, h.interval, len(h.buckets))
    if err != nil {
        panic("unreachable")
    }

    if len(h.buckets) > 0 {
        put := func(value, count int) {
            if count == 1 {
                _, err = fmt.Fprintf(sb, "%v;", value)
            } else {
                _, err = fmt.Fprintf(sb, "%v,%v;", value, count)
            }
            if err != nil {
                panic("unreachable")
            }
        }

        lastValue := h.buckets[0]
        lastValueCount := 1

        for i := 1; i < len(h.buckets); i++ {
            if h.buckets[i] == lastValue {
                lastValueCount++
                continue
            }

            put(lastValue, lastValueCount)
            lastValue = h.buckets[i]
            lastValueCount = 1
        }

        put(lastValue, lastValueCount)
    }

    _, err = sb.WriteString("] }")
    if err != nil {
        panic("unreachable")
    }

    return sb.String()
}

func (h *Histogram) Clear() {
    h.sum = 0
    h.sumSquare = 0
    h.count = 0
    for i := 0; i < len(h.buckets); i++ {
        h.buckets[i] = 0
    }
}

// AutoDumpHistogram double-buffers two Histograms and periodically logs the
// accumulated statistics in the background.
type AutoDumpHistogram struct {
    name                  string
    mainHistogram         *Histogram
    backHistogram         *Histogram
    accumulated           *Histogram
    isDumping             atomic.Bool
    lastDumpHistogramTime time.Time
    dumpInterval          time.Duration
}

// NewAutoDumpingHistogram creates an AutoDumpHistogram whose accumulated
// contents are logged roughly once per dumpInterval.
func NewAutoDumpingHistogram(name string, interval float64, bucketsCount int, cutoff float64, dumpInterval time.Duration) *AutoDumpHistogram {
    return &AutoDumpHistogram{
        name:                  name,
        mainHistogram:         NewHistogram(interval, bucketsCount, cutoff),
        backHistogram:         NewHistogram(interval, bucketsCount, cutoff),
        accumulated:           NewHistogram(interval, bucketsCount, cutoff),
        lastDumpHistogramTime: time.Now(),
        dumpInterval:          dumpInterval,
    }
}

// Observe records a sample and, once dumpInterval has elapsed since the last
// dump, swaps the buffers and dumps asynchronously. It is not thread-safe.
func (h *AutoDumpHistogram) Observe(value float64, now time.Time) {
    h.mainHistogram.Observe(value)
    if now.Sub(h.lastDumpHistogramTime) >= h.dumpInterval && !h.isDumping.Load() {
        h.isDumping.Store(true)
        h.mainHistogram, h.backHistogram = h.backHistogram, h.mainHistogram
        h.lastDumpHistogramTime = now
        go h.dump(now)
    }
}

// dump merges the back buffer into the accumulated histogram, logs the
// accumulated histogram, and clears the back buffer for reuse.
func (h *AutoDumpHistogram) dump(now time.Time) {
    defer h.isDumping.Store(false)

    h.accumulated.sum += h.backHistogram.sum
    h.accumulated.sumSquare += h.backHistogram.sumSquare
    h.accumulated.count += h.backHistogram.count
    for i := 0; i < len(h.accumulated.buckets) && i < len(h.backHistogram.buckets); i++ {
        h.accumulated.buckets[i] += h.backHistogram.buckets[i]
    }
    if len(h.backHistogram.buckets) > len(h.accumulated.buckets) {
        h.accumulated.buckets = append(h.accumulated.buckets, h.backHistogram.buckets[len(h.accumulated.buckets):]...)
    }

    log.Info("dumping histogram", zap.String("name", h.name), zap.Time("time", now), zap.Stringer("histogram", h.accumulated))

    h.backHistogram.Clear()
}
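The histogram above is self-contained, so a minimal usage sketch may help (illustrative only, not part of this commit). It assumes only the functions defined above and uses the same parameters the dispatcher passes below: 20µs-wide buckets, 2000 initial buckets, and a 1-second cutoff.

h := NewHistogram(2e-5, 2000, 1)
for _, latencySeconds := range []float64{1e-4, 2e-4, 1.5e-4, 9e-4} {
    h.Observe(latencySeconds)
}
// GetPercentile walks the buckets and interpolates inside the one that
// crosses the requested quantile.
fmt.Println("p99:", h.GetPercentile(0.99), "avg:", h.GetAvg())

// The auto-dumping variant logs the accumulated histogram roughly once per
// dumpInterval; note that Observe is not safe for concurrent use.
adh := NewAutoDumpingHistogram("example-histogram", 2e-5, 2000, 1, time.Minute)
adh.Observe(3e-4, time.Now())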
11 changes: 7 additions & 4 deletions client/tso_batch_controller.go
@@ -15,6 +15,7 @@
package pd

import (
"math"
"runtime/trace"
"time"

@@ -113,9 +114,10 @@ func newTSOBatchController(maxBatchSize int) *tsoBatchController {
// return nil
//}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest, beforeReceiveDurationHist *AutoDumpHistogram, now time.Time) {
tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
tbc.collectedRequestCount++
beforeReceiveDurationHist.Observe(math.Max(now.Sub(tsoReq.start).Seconds(), 0), now)
}

func (tbc *tsoBatchController) getCollectedRequests() []*tsoRequest {
@@ -142,16 +144,17 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in

}

func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error, statFunc func(latency time.Duration)) {
func finishCollectedRequests(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error, statFunc func(latency time.Duration, now time.Time)) {
now := time.Now()
for i := 0; i < len(requests); i++ {
tsoReq := requests[i]
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
tsoReq.tryDone(err)
if statFunc != nil {
statFunc(time.Since(tsoReq.start))
statFunc(now.Sub(tsoReq.start), now)
}
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
}
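For reference, a hedged sketch of a caller matching the new statFunc signature; requests, physical, firstLogical, suffixBits, and durationHist are placeholder names for this illustration, not identifiers from the commit.

durationHist := NewAutoDumpingHistogram("tso-latency", 2e-5, 2000, 1, time.Minute)
statFunc := func(latency time.Duration, now time.Time) {
    // The callback now also receives the shared `now`, so every request in a
    // batch is observed against the same timestamp.
    durationHist.Observe(latency.Seconds(), now)
}
finishCollectedRequests(requests, physical, firstLogical, suffixBits, nil, statFunc)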
29 changes: 22 additions & 7 deletions client/tso_dispatcher.go
@@ -90,6 +90,10 @@ type tsoDispatcher struct {
tsoReqTokenCh chan struct{}

dispatcherID string

beforeHandleDurationHist *AutoDumpHistogram
batchWaitTimerDuration *AutoDumpHistogram
batchNoWaitCollectDuration *AutoDumpHistogram
}

func newTSODispatcher(
@@ -109,6 +113,7 @@
// maxBatchSize,
// )
//})
id := fmt.Sprintf("%d", dispatcherIDAlloc.Add(1))
td := &tsoDispatcher{
ctx: dispatcherCtx,
cancel: dispatcherCancel,
@@ -126,7 +131,11 @@
updateConnectionCtxsCh: make(chan struct{}, 1),
tsoReqTokenCh: make(chan struct{}, 256),

dispatcherID: fmt.Sprintf("%d", dispatcherIDAlloc.Add(1)),
dispatcherID: id,

beforeHandleDurationHist: NewAutoDumpingHistogram("beforeHandleDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
batchWaitTimerDuration: NewAutoDumpingHistogram("batchWaitTimerDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
batchNoWaitCollectDuration: NewAutoDumpingHistogram("batchNoWaitCollectDurationHist-"+id, 2e-5, 2000, 1, time.Minute),
}
go td.watchTSDeadline()
return td
@@ -302,7 +311,7 @@ tsoBatchLoop:
return
case firstRequest := <-td.reqChan:
batchController.batchStartTime = time.Now()
batchController.pushRequest(firstRequest)
batchController.pushRequest(firstRequest, td.beforeHandleDurationHist, batchController.batchStartTime)
// Token is not ready. Continue the loop to wait for the token or another request.
continue
case <-td.tsoReqTokenCh:
@@ -317,7 +326,7 @@
return
case firstRequest := <-td.reqChan:
batchController.batchStartTime = time.Now()
batchController.pushRequest(firstRequest)
batchController.pushRequest(firstRequest, td.beforeHandleDurationHist, batchController.batchStartTime)
// Token is not ready. Continue the loop to wait for the token or another request.
}
}
@@ -388,7 +397,8 @@ tsoBatchLoop:
latency := stream.EstimatedRoundTripLatency()
estimateTSOLatencyGauge.WithLabelValues(td.dispatcherID, streamURL).Set(latency.Seconds())
totalBatchTime := latency / time.Duration(concurrencyFactor)
remainingBatchTime := totalBatchTime - time.Since(currentBatchStartTime)
waitTimerStart := time.Now()
remainingBatchTime := totalBatchTime - waitTimerStart.Sub(currentBatchStartTime)
if remainingBatchTime > 0 && !batchNoWait {
if !batchingTimer.Stop() {
select {
@@ -405,13 +415,16 @@
zap.String("dc-location", dc))
return
case req := <-td.reqChan:
batchController.pushRequest(req)
batchController.pushRequest(req, td.beforeHandleDurationHist, time.Now())
case <-batchingTimer.C:
break batchingLoop
}
}
}
waitTimerEnd := time.Now()
td.batchWaitTimerDuration.Observe(waitTimerEnd.Sub(waitTimerStart).Seconds(), waitTimerEnd)

nowaitCollectStart := time.Now()
// Continue collecting as many as possible without blocking
nonWaitingBatchLoop:
for {
@@ -421,11 +434,13 @@
zap.String("dc-location", dc))
return
case req := <-td.reqChan:
batchController.pushRequest(req)
batchController.pushRequest(req, td.beforeHandleDurationHist, nowaitCollectStart)
default:
break nonWaitingBatchLoop
}
}
nowaitCollectEnd := time.Now()
td.batchNoWaitCollectDuration.Observe(nowaitCollectEnd.Sub(nowaitCollectStart).Seconds(), nowaitCollectEnd)

//done := make(chan struct{})
//dl := newTSDeadline(option.timeout, done, cancel)
@@ -598,7 +613,7 @@ func (td *tsoDispatcher) cancelCollectedRequests(tbc *tsoBatchController, err er
td.tsoReqTokenCh <- struct{}{}
}

func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error, statFunc func(latency time.Duration)) {
func (td *tsoDispatcher) onBatchedRespReceived(reqID uint64, result tsoRequestResult, err error, statFunc func(latency time.Duration, now time.Time)) {
tbc, loaded := td.pendingBatches.LoadAndDelete(reqID)
if !loaded {
log.Info("received response for already abandoned requests")
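Taken together, the dispatcher tracks three client-side latency distributions per dispatcher ID, each dumped to the log roughly once per minute: beforeHandleDurationHist (time from tsoRequest.start until the request is pushed into a batch, recorded in pushRequest), batchWaitTimerDuration (time spent waiting on the batching timer), and batchNoWaitCollectDuration (time spent in the final non-blocking drain of reqChan). A condensed sketch of the measurement pattern, with the surrounding loop logic omitted:

waitStart := time.Now()
// ... block on batchingTimer.C or td.reqChan ...
waitEnd := time.Now()
td.batchWaitTimerDuration.Observe(waitEnd.Sub(waitStart).Seconds(), waitEnd)

collectStart := time.Now()
// ... drain td.reqChan with a non-blocking select ...
collectEnd := time.Now()
td.batchNoWaitCollectDuration.Observe(collectEnd.Sub(collectStart).Seconds(), collectEnd)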