Skip to content

Commit

Permalink
feat: do not restart consumer when foundNewKeys to avoid rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Jun 11, 2024
1 parent bff611b commit 016f803
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ func (service *Service) Put(msg *model.InputMessage, traceId string, flushFn fun
// 2) flush the shards
// 3) apply the schema change.
// 4) recreate the service
util.Logger.Warn("new key detected, consumer is going to restart", zap.String("consumer group", service.taskCfg.ConsumerGroup), zap.Error(err))
go service.consumer.restart()
if len(service.consumer.grpConfig.Configs) > 1 {
util.Logger.Warn("new key detected, consumer is going to restart", zap.String("consumer group", service.taskCfg.ConsumerGroup), zap.Error(err))
go service.consumer.restart()
}
flushFn(traceId, "foundNewKeys and restart")
if err = service.clickhouse.ChangeSchema(&service.newKeys); err != nil {
util.Logger.Fatal("clickhouse.ChangeSchema failed", zap.String("task", taskCfg.Name), zap.Error(err))
Expand Down

0 comments on commit 016f803

Please sign in to comment.