Skip to content

Commit

Permalink
feat: add record pool size to avoid using too much memory
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Mar 5, 2024
1 parent 3121b22 commit d9ab557
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 3 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
Assignment Assignment
LogLevel string
LogTrace bool
RecordPoolSize int64
ReloadSeriesMapInterval int
ActiveSeriesRange int

Expand Down Expand Up @@ -340,6 +341,9 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr
}
}
}
if cfg.RecordPoolSize == 0 {
cfg.RecordPoolSize = MaxBufferSize
}
switch strings.ToLower(cfg.LogLevel) {
case "debug", "info", "warn", "error", "dpanic", "panic", "fatal":
default:
Expand Down
5 changes: 4 additions & 1 deletion docs/configuration/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@
// utilization, only active series from the last "activeSeriesRange" seconds will be cached, and the map in the cache will be updated every
// "reloadSeriesMapInterval" seconds. By default, series from the last 24 hours will be cached, and the cache will be updated every hour.
"reloadSeriesMapInterval": 3600,
"activeSeriesRange": 86400
"activeSeriesRange": 86400,
"logTrace": false,
// recordPoolSize limits how many fetched records may be held in memory at once (backpressure mechanism).
// To avoid using too much memory, it is recommended to set it to 3 or 4 times the bufferSize.
"recordPoolSize": 1048576
}
```
8 changes: 6 additions & 2 deletions input/kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"crypto/tls"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -181,6 +180,9 @@ func (k *KafkaFranz) Run() {
defer k.wgRun.Done()
LOOP:
for {
if !util.Rs.Allow() {
continue
}
traceId := util.GenTraceId()
util.LogTrace(traceId, util.TraceKindFetchStart, zap.String("consumer group", k.grpConfig.Name), zap.Int("buffersize", k.grpConfig.BufferSize))
fetches := k.cl.PollRecords(k.ctx, k.grpConfig.BufferSize)
Expand All @@ -192,7 +194,9 @@ LOOP:
err = errors.Wrapf(err, "")
util.Logger.Info("kgo.Client.PollFetchs() got an error", zap.Error(err))
}
util.LogTrace(traceId, util.TraceKindFetchEnd, zap.String("consumer group", k.grpConfig.Name), zap.String("records", strconv.Itoa(fetches.NumRecords())))
fetchRecords := fetches.NumRecords()
util.Rs.Inc(int64(fetchRecords))
util.LogTrace(traceId, util.TraceKindFetchEnd, zap.String("consumer group", k.grpConfig.Name), zap.Int64("records", int64(fetchRecords)))
// Automatically end the program if it remains inactive for a specific duration of time.
t := time.NewTimer(processTimeOut * time.Minute)
select {
Expand Down
2 changes: 2 additions & 0 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (c *ClickHouse) Send(batch *model.Batch, traceId string) {
statistics.WritingPoolBacklog.WithLabelValues(c.taskCfg.Name).Dec()
}); err != nil {
batch.Wg.Done()
util.Rs.Dec(int64(batch.RealSize))
return
}

Expand Down Expand Up @@ -256,6 +257,7 @@ func (c *ClickHouse) loopWrite(batch *model.Batch, sc *pool.ShardConn, traceId s

util.LogTrace(traceId, util.TraceKindWriteStart, zap.Int("realsize", batch.RealSize))
defer func() {
util.Rs.Dec(int64(batch.RealSize))
util.LogTrace(traceId, util.TraceKindWriteEnd, zap.Int("success", batch.RealSize))
}()
times := c.cfg.Clickhouse.RetryTimes
Expand Down
2 changes: 2 additions & 0 deletions task/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ func (c *Consumer) processFetch() {
if e := tsk.Put(msg, traceId, flushFn); e != nil {
atomic.StoreInt64(&done, items)
err = e
// decrease the record count for the record that failed to be put
util.Rs.Dec(1)
return false
}
}
Expand Down
2 changes: 2 additions & 0 deletions task/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (s *Sinker) stopAllTasks() {
func (s *Sinker) applyConfig(newCfg *config.Config) (err error) {
util.SetLogLevel(newCfg.LogLevel)
util.SetLogTrace(newCfg.LogTrace)
util.Rs.SetPoolSize(newCfg.RecordPoolSize)
util.Rs.Reset()
if s.curCfg == nil {
// The first time invoking of applyConfig
err = s.applyFirstConfig(newCfg)
Expand Down
35 changes: 35 additions & 0 deletions util/recordpoolsize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util

import "sync/atomic"

// RecordSize is a counter of in-flight records paired with an upper bound.
// Callers add to the counter with Inc, subtract with Dec, and consult Allow
// to learn whether the counter is still below the bound.
//
// Both fields are only ever accessed through sync/atomic, so every method is
// safe to call from multiple goroutines at the same time. A RecordSize must
// not be copied after first use.
type RecordSize struct {
	poolSize int64 // upper bound compared against realSize in Allow
	realSize int64 // current number of counted records
}

// SetPoolSize replaces the upper bound used by Allow.
//
// The store is atomic so that a bound changed at runtime is observed
// race-free by goroutines that are concurrently calling Allow.
func (rs *RecordSize) SetPoolSize(size int64) {
	atomic.StoreInt64(&rs.poolSize, size)
}

// Inc adds size to the current record count.
func (rs *RecordSize) Inc(size int64) {
	atomic.AddInt64(&rs.realSize, size)
}

// Reset sets the current record count back to zero. The upper bound is left
// untouched.
func (rs *RecordSize) Reset() {
	atomic.StoreInt64(&rs.realSize, 0)
}

// Dec subtracts size from the current record count. The count is not
// clamped: subtracting more than was added leaves it negative.
func (rs *RecordSize) Dec(size int64) {
	atomic.AddInt64(&rs.realSize, -size)
}

// Get returns the current record count.
func (rs *RecordSize) Get() int64 {
	return atomic.LoadInt64(&rs.realSize)
}

// Allow reports whether the current record count is strictly below the upper
// bound. With the zero-value bound (0) it returns false for any non-negative
// count.
func (rs *RecordSize) Allow() bool {
	current := atomic.LoadInt64(&rs.realSize)
	limit := atomic.LoadInt64(&rs.poolSize)
	return current < limit
}

// Rs is the package-level RecordSize instance.
var Rs RecordSize

0 comments on commit d9ab557

Please sign in to comment.