diff --git a/admin/admin.go b/admin/admin.go new file mode 100644 index 00000000..e4e1ab24 --- /dev/null +++ b/admin/admin.go @@ -0,0 +1,753 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admin + +import ( + "context" + "encoding/json" + "fmt" + "math" + "math/rand" + "strconv" + "strings" + "sync" + "time" + + "github.com/apache/rocketmq-client-go/v2/rlog" + "github.com/pkg/errors" + "github.com/tidwall/gjson" + "golang.org/x/sync/errgroup" + + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/internal" + "github.com/apache/rocketmq-client-go/v2/internal/remote" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +var ( + ErrNotFound = errors.New("no msg found") +) + +type QueueOffset struct { + *primitive.MessageQueue + Offset int64 +} + +func (offset *QueueOffset) String() string { + if offset == nil { + return fmt.Sprintf("queueOffset is nil") + } else { + return fmt.Sprintf("MessageQueue [Topic=%s, brokerName=%s, queueId=%d] offset: %d", offset.Topic, offset.BrokerName, offset.QueueId, offset.Offset) + } +} + +type QueueErr struct { + *primitive.MessageQueue + Err error +} + +func (queueErr *QueueErr) String() string { + if queueErr == nil { + return fmt.Sprintf("queueOffset is nil") + 
} else { + return fmt.Sprintf("MessageQueue [Topic=%s, brokerName=%s, queueId=%d] err: %v", queueErr.Topic, queueErr.BrokerName, queueErr.QueueId, queueErr.Err) + } +} + +type Admin interface { + FetchConsumerOffsets(ctx context.Context, topic string, group string) ([]QueueOffset, error) + FetchConsumerOffset(ctx context.Context, group string, mq *primitive.MessageQueue) (int64, error) + + SearchOffsets(ctx context.Context, topic string, expected time.Time) ([]QueueOffset, error) + SearchOffset(ctx context.Context, expected time.Time, mq *primitive.MessageQueue) (int64, error) + + ResetConsumerOffsets(ctx context.Context, topic string, group string, offset int64) ([]QueueErr, error) + ResetConsumerOffset(ctx context.Context, group string, mq *primitive.MessageQueue, offset int64) error + + MinOffsets(ctx context.Context, topic string) ([]QueueOffset, error) + MinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) + + MaxOffsets(ctx context.Context, topic string) ([]QueueOffset, error) + MaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) + + SearchKey(ctx context.Context, topic string, key string, maxNum int) ([]*primitive.MessageExt, error) + + ViewMessageByPhyOffsets(ctx context.Context, topic string, offset int64) ([]*primitive.MessageExt, error) + ViewMessageByPhyOffset(ctx context.Context, mq *primitive.MessageQueue, offset int64) ([]*primitive.MessageExt, error) + + ViewMessageByQueueOffset(ctx context.Context, queue *primitive.MessageQueue, offset int64) (*primitive.MessageExt, error) + + GetConsumerConnectionList(ctx context.Context, group string) (*ConsumerConnection, error) + GetConsumerIdList(ctx context.Context, group string) ([]string, error) + GetConsumerRunningInfo(ctx context.Context, group string, clientID string) (*internal.ConsumerRunningInfo, error) + Allocation(ctx context.Context, group string) (map[primitive.MessageQueue]string, error) + + Close() error +} + +// TODO: 超时的内容, 全部转移到 ctx +type 
adminOptions struct { + internal.ClientOptions +} + +type AdminOption func(options *adminOptions) + +func defaultAdminOptions() *adminOptions { + opts := &adminOptions{ + ClientOptions: internal.DefaultClientOptions(), + } + opts.GroupName = "TOOLS_CONSUMER" + opts.InstanceName = time.Now().String() + return opts +} + +// WithResolver nameserver resolver to fetch nameserver addr +func WithResolver(resolver primitive.NsResolver) AdminOption { + return func(options *adminOptions) { + options.Resolver = resolver + } +} + +// WithGroupName consumer group name +func WithGroupName(groupName string) AdminOption { + return func(options *adminOptions) { + options.GroupName = groupName + } +} + +type admin struct { + cli internal.RMQClient + namesrv internal.Namesrvs + + opts *adminOptions + + closeOnce sync.Once +} + +// NewAdmin initialize admin +func NewAdmin(opts ...AdminOption) (Admin, error) { + defaultOpts := defaultAdminOptions() + for _, opt := range opts { + opt(defaultOpts) + } + + cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil) + namesrv, err := internal.NewNamesrv(defaultOpts.Resolver) + if err != nil { + return nil, err + } + + return &admin{ + cli: cli, + namesrv: namesrv, + opts: defaultOpts, + }, nil +} + +func (a *admin) parallel(ctx context.Context, topic string, task func(mq *primitive.MessageQueue)) error { + mqs, err := a.namesrv.FetchPublishMessageQueues(topic) + if err != nil { + return err + } + var group errgroup.Group + for _, mq := range mqs { + tmp := mq + group.Go(func() error { + task(tmp) + return nil + }) + } + return group.Wait() +} + +// FetchConsumerOffsets parallel version of FetchConsumerOffset for multi mq, return Offset=-1 if single `FetchConsumerOffset` failed. 
+func (a *admin) FetchConsumerOffsets(ctx context.Context, topic string, group string) ([]QueueOffset, error) { + var queueOffsets []QueueOffset + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + offset, err := a.FetchConsumerOffset(ctx, group, mq) + if err != nil { + offset = -1 + } + + lock.Lock() + queueOffset := QueueOffset{ + MessageQueue: mq, + Offset: offset, + } + queueOffsets = append(queueOffsets, queueOffset) + lock.Unlock() + }) + if err != nil { + return nil, err + } + + return queueOffsets, nil +} + +// FetchConsumerOffset fetch consumer offset of speified queue, use FetchConsumerOffsets to get topic offset of group +func (a *admin) FetchConsumerOffset(ctx context.Context, group string, mq *primitive.MessageQueue) (int64, error) { + broker, err := a.getAddr(mq) + if err != nil { + return -1, err + } + + queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{ + ConsumerGroup: group, + Topic: mq.Topic, + QueueId: mq.QueueId, + } + cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil) + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + res, err := a.cli.InvokeSync(ctx, broker, cmd, 3*time.Second) + if err != nil { + return -1, err + } + + if res.Code != internal.ResSuccess { + return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark) + } + return strconv.ParseInt(res.ExtFields["offset"], 10, 64) +} + +// SearchOffsets parallel version of SearchOffset +func (a *admin) SearchOffsets(ctx context.Context, topic string, expected time.Time) ([]QueueOffset, error) { + var queueOffsets []QueueOffset + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + offset, err := a.SearchOffset(ctx, expected, mq) + if err != nil { + offset = -1 + } + + lock.Lock() + queueOffset := QueueOffset{ + MessageQueue: mq, + Offset: offset, + } + queueOffsets = append(queueOffsets, queueOffset) + 
lock.Unlock() + }) + if err != nil { + return nil, err + } + + return queueOffsets, nil +} + +// SearchOffset get queue offset of speficield time +func (a *admin) SearchOffset(ctx context.Context, expected time.Time, mq *primitive.MessageQueue) (int64, error) { + addr, err := a.getAddr(mq) + if err != nil { + return -1, err + } + + request := &internal.SearchOffsetRequestHeader{ + Topic: mq.Topic, + QueueId: mq.QueueId, + Timestamp: expected.Unix() * 1000, + } + + cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil) + response, err := a.cli.InvokeSync(ctx, addr, cmd, 10*time.Second) + if err != nil { + rlog.Error("invoke sync failed", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return -1, err + } + + return strconv.ParseInt(response.ExtFields["offset"], 10, 64) +} + +// ResetConsumerOffsets result offset of group of specified topic +func (a *admin) ResetConsumerOffsets(ctx context.Context, topic string, group string, offset int64) ([]QueueErr, error) { + var queueErrs []QueueErr + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + err := a.ResetConsumerOffset(ctx, group, mq, offset) + + lock.Lock() + queueErr := QueueErr{ + MessageQueue: mq, + Err: err, + } + queueErrs = append(queueErrs, queueErr) + lock.Unlock() + }) + if err != nil { + return nil, err + } + + return queueErrs, nil +} + +// ResetConsumerOffset reset offset back or forward to specified offset +func (a *admin) ResetConsumerOffset(ctx context.Context, group string, mq *primitive.MessageQueue, offset int64) error { + broker, err := a.getAddr(mq) + if err != nil { + return err + } + + updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{ + ConsumerGroup: group, + Topic: mq.Topic, + QueueId: mq.QueueId, + CommitOffset: offset, + } + cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil) + return a.cli.InvokeOneWay(ctx, broker, cmd, 5*time.Second) +} + +// 
SearchKey search key from topic, return empty slice if no msg found +func (a *admin) SearchKey(ctx context.Context, topic string, key string, maxNum int) ([]*primitive.MessageExt, error) { + topicRoute, _, err := a.namesrv.UpdateTopicRouteInfo(topic) + if err != nil { + return nil, err + } + addrs := make(map[string]string) + for _, broker := range topicRoute.BrokerDataList { + slaves := broker.GetSlaves() + if len(slaves) > 0 { + addr := slaves[0] + addrs[broker.BrokerName] = addr + continue + } + + master := broker.MasterAddr() + if len(master) > 0 { + addrs[broker.BrokerName] = master + continue + } + } + + if len(addrs) == 0 { + return nil, fmt.Errorf("no broker found for topic: %s", topic) + } + + queryMsgs := make([]*primitive.MessageExt, 0) + var lock sync.Mutex + var wg sync.WaitGroup + wg.Add(len(addrs)) + for b, baddr := range addrs { + brokerName, addr := b, baddr + header := &internal.QueryMessageRequestHeader{ + Topic: topic, + Key: key, + MaxNum: maxNum, + BeginTimestamp: 0, + EndTimestamp: math.MaxInt64, + } + cmd := remote.NewRemotingCommand(internal.ReqQueryMessage, header, nil) + cmd.ExtFields["_UNIQUE_KEY_QUERY"] = "false" + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := a.cli.InvokeAsync(ctx, addr, cmd, func(command *remote.RemotingCommand, e error) { + cancel() + if e != nil { + rlog.Error(fmt.Sprintf("invoke %s failed", addr), map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return + } + switch command.Code { + case internal.ResSuccess: + lock.Lock() + msgs := primitive.DecodeMessage(command.Body) + for _, msg := range msgs { + msg.Queue = &primitive.MessageQueue{ + Topic: topic, + BrokerName: brokerName, + QueueId: int(msg.QueueId), + } + msg.StoreHost = addr + } + + for _, msg := range msgs { + keys := msg.GetKeys() + kiter := strings.Split(keys, primitive.PropertyKeySeparator) + for _, k := range kiter { + if k == key { + queryMsgs = append(queryMsgs, msg) + } + } + } + lock.Unlock() + 
default: + rlog.Warning(fmt.Sprintf("invoke addr: %s failed with code: %d", addr, command.Code), + map[string]interface{}{ + rlog.LogKeyUnderlayError: e, + }) + } + wg.Done() + }) + if err != nil { + cancel() + wg.Done() + } + } + wg.Wait() + return queryMsgs, nil +} + +// MinOffsets parallel version of MinOffset for multi mq +func (a *admin) MinOffsets(ctx context.Context, topic string) ([]QueueOffset, error) { + var queueOffsets []QueueOffset + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + offset, err := a.MinOffset(ctx, mq) + if err != nil { + offset = -1 + } + + lock.Lock() + queueOffset := QueueOffset{ + MessageQueue: mq, + Offset: offset, + } + queueOffsets = append(queueOffsets, queueOffset) + lock.Unlock() + }) + if err != nil { + return nil, err + } + + return queueOffsets, nil +} + +// MinOffset get min offset of specified queue, return Offset=-1 if single `MinOffset` failed. +func (a *admin) MinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) { + brokerAddr, err := a.getAddr(mq) + if err != nil { + return -1, err + } + + request := &internal.GetMinOffsetRequestHeader{ + Topic: mq.Topic, + QueueId: mq.QueueId, + } + + cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil) + // TODO: zero.xu remove extra timeout param + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 3*time.Second) + if err != nil { + return -1, err + } + + return strconv.ParseInt(response.ExtFields["offset"], 10, 64) +} + +// MaxOffsets parallel version of MaxOffsets for multi mq +func (a *admin) MaxOffsets(ctx context.Context, topic string) ([]QueueOffset, error) { + var queueOffsets []QueueOffset + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + offset, err := a.MaxOffset(ctx, mq) + if err != nil { + rlog.Error("maxOffset get failed", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + // mq failed. 
+ offset = -1 + } + + lock.Lock() + queueOffset := QueueOffset{ + MessageQueue: mq, + Offset: offset, + } + queueOffsets = append(queueOffsets, queueOffset) + lock.Unlock() + }) + if err != nil { + return nil, err + } + + return queueOffsets, nil +} + +// MaxOffset fetch max offset of specified queue, return Offset=-1 if single `MaxOffsets` failed +func (a *admin) MaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) { + brokerAddr, err := a.getAddr(mq) + if err != nil { + return -1, err + } + + request := &internal.GetMaxOffsetRequestHeader{ + Topic: mq.Topic, + QueueId: mq.QueueId, + } + + cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil) + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 3*time.Second) + if err != nil { + return -1, err + } + + return strconv.ParseInt(response.ExtFields["offset"], 10, 64) +} + +func (a *admin) getAddr(mq *primitive.MessageQueue) (string, error) { + broker := a.namesrv.FindBrokerAddrByName(mq.BrokerName) + if len(broker) == 0 { + a.namesrv.UpdateTopicRouteInfo(mq.Topic) + broker = a.namesrv.FindBrokerAddrByName(mq.BrokerName) + + if len(broker) == 0 { + return "", fmt.Errorf("broker: %s address not found", mq.BrokerName) + } + } + return broker, nil +} + +// ViewMessageByPhyOffsets parallel version of ViewMessageByPhyOffset +func (a *admin) ViewMessageByPhyOffsets(ctx context.Context, topic string, offset int64) ([]*primitive.MessageExt, error) { + var msgs []*primitive.MessageExt + var lock sync.Mutex + + err := a.parallel(ctx, topic, func(mq *primitive.MessageQueue) { + queueMsgs, err := a.ViewMessageByPhyOffset(ctx, mq, offset) + if err != nil { + rlog.Error(fmt.Sprintf("maxOffset get failed"), map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + } + + lock.Lock() + msgs = append(msgs, queueMsgs...) 
+ lock.Unlock() + }) + if err != nil { + return nil, err + } + + return msgs, nil +} + +// ViewMessageByPhyOffset get message by commitlog offset +func (a *admin) ViewMessageByPhyOffset(ctx context.Context, mq *primitive.MessageQueue, offset int64) ([]*primitive.MessageExt, error) { + brokerAddr, err := a.getAddr(mq) + if err != nil { + return nil, err + } + + request := &internal.ViewMessageRequestHeader{ + Offset: offset, + } + + cmd := remote.NewRemotingCommand(internal.ReqViewMessageByID, request, nil) + // TODO: zero.xu remove timeout later + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 3*time.Second) + if err != nil { + return nil, err + } + + switch response.Code { + case internal.ResSuccess: + msgs := primitive.DecodeMessage(response.Body) + return msgs, nil + default: + rlog.Warning(fmt.Sprintf("unexpected response code: %d", response.Code), nil) + } + + return nil, nil +} + +// ViewMessageByQueueOffset get message of specified offset of queue +func (a *admin) ViewMessageByQueueOffset(ctx context.Context, queue *primitive.MessageQueue, offset int64) (*primitive.MessageExt, error) { + brokerAddr, err := a.getAddr(queue) + if err != nil { + return nil, err + } + sysFlag := consumer.BuildSysFlag(false, true, true, false) + data := consumer.BuildSubscriptionData(queue.Topic, consumer.MessageSelector{}) + + pullRequest := &internal.PullMessageRequestHeader{ + ConsumerGroup: a.opts.GroupName, + Topic: queue.Topic, + QueueId: int32(queue.QueueId), + QueueOffset: offset, + MaxMsgNums: int32(1), + SysFlag: sysFlag, + CommitOffset: 0, + SuspendTimeoutMillis: 0, + SubExpression: data.SubString, + SubVersion: 0, + ExpressionType: data.ExpType, + } + + res, err := a.cli.PullMessage(ctx, brokerAddr, pullRequest) + if err != nil { + return nil, err + } + switch res.Status { + case primitive.PullFound: + msgs := primitive.DecodeMessage(res.GetBody()) + if len(msgs) == 0 { + return nil, ErrNotFound + } + msgs[0].StoreHost = brokerAddr + msgs[0].Queue = queue + 
return msgs[0], nil + default: + } + return nil, ErrNotFound +} + +// Allocation get partition-client allocation info of group +func (a *admin) Allocation(ctx context.Context, group string) (map[primitive.MessageQueue]string, error) { + + ids, err := a.GetConsumerIdList(ctx, group) + if err != nil { + return nil, err + } + alloc := make(map[primitive.MessageQueue]string) + retryTopic := "%RETRY%" + group + for i := range ids { + id := ids[i] + runningInfo, err := a.GetConsumerRunningInfo(ctx, group, id) + if err != nil { + return alloc, err + } + for k := range runningInfo.MQTable { + if k.Topic == retryTopic { + continue + } + alloc[k] = id + } + } + return alloc, nil +} + +// GetConsumerConnectionList get all client connection info of group +func (a *admin) GetConsumerConnectionList(ctx context.Context, group string) (*ConsumerConnection, error) { + retryTopic := internal.GetRetryTopic(group) + topicRoute, _, err := a.namesrv.UpdateTopicRouteInfo(retryTopic) + if err != nil { + return nil, err + } + i := rand.Intn(len(topicRoute.BrokerDataList)) + brokerAddr := topicRoute.BrokerDataList[i].SelectBrokerAddr() + + req := &internal.GetConsumerListRequestHeader{ + ConsumerGroup: group, + } + cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil) + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 10*time.Second) + if err != nil { + return nil, err + } + + switch response.Code { + case internal.ResSuccess: + c := &ConsumerConnection{} + err = json.Unmarshal(response.Body, c) + if err != nil { + rlog.Error("unmarshal consumer list info failed", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + } + return c, nil + default: + rlog.Warning(fmt.Sprintf("unexpected response code: %d", response.Code), nil) + } + return nil, primitive.MQBrokerErr{ResponseCode: response.Code, ErrorMessage: response.Remark} +} + +// GetConsumerIdList get all group client id +func (a *admin) GetConsumerIdList(ctx context.Context, group string) 
([]string, error) { + retryTopic := internal.GetRetryTopic(group) + topicRoute, _, err := a.namesrv.UpdateTopicRouteInfo(retryTopic) + if err != nil { + return nil, err + } + i := rand.Intn(len(topicRoute.BrokerDataList)) + brokerAddr := topicRoute.BrokerDataList[i].SelectBrokerAddr() + + req := &internal.GetConsumerListRequestHeader{ + ConsumerGroup: group, + } + cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil) + // TODO: xujianhai666 remove timeout later + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 10*time.Second) + if err != nil { + return nil, err + } + + switch response.Code { + case internal.ResSuccess: + result := gjson.ParseBytes(response.Body) + list := make([]string, 0) + arr := result.Get("consumerIdList").Array() + for idx := range arr { + list = append(list, arr[idx].String()) + } + return list, nil + default: + rlog.Error(fmt.Sprintf("unexpected response code: %d", response.Code), nil) + } + return nil, primitive.MQBrokerErr{ResponseCode: response.Code, ErrorMessage: response.Remark} +} + +// GetConsumerRunningInfo fetch running info of speified client of group +func (a *admin) GetConsumerRunningInfo(ctx context.Context, group string, clientID string) (*internal.ConsumerRunningInfo, error) { + retryTopic := internal.GetRetryTopic(group) + topicRoute, _, err := a.namesrv.UpdateTopicRouteInfo(retryTopic) + if err != nil { + return nil, err + } + i := rand.Intn(len(topicRoute.BrokerDataList)) + brokerAddr := topicRoute.BrokerDataList[i].SelectBrokerAddr() + + req := &internal.GetConsumerRunningInfoRequestHeader{ + ConsumerGroup: group, + ClientID: clientID, + } + cmd := remote.NewRemotingCommand(internal.ReqGetConsumerRunningInfo, req, nil) + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 10*time.Second) + if err != nil { + return nil, err + } + + switch response.Code { + case internal.ResSuccess: + info := &internal.ConsumerRunningInfo{} + err := info.Decode(response.Body) + if err != nil { + 
rlog.Error("unmarshal failed", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } + return info, nil + default: + rlog.Error("unmarshal failed", nil) + } + return nil, primitive.MQBrokerErr{ResponseCode: response.Code, ErrorMessage: response.Remark} +} + +func (a *admin) Close() error { + a.closeOnce.Do(func() { + a.cli.Shutdown() + }) + return nil +} diff --git a/admin/admin_test.go b/admin/admin_test.go new file mode 100644 index 00000000..0c90f7c9 --- /dev/null +++ b/admin/admin_test.go @@ -0,0 +1,48 @@ +package admin + +import ( + "encoding/json" + "testing" + + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/internal" + "github.com/stretchr/testify/assert" +) + +func TestMap(t *testing.T) { + + MQTable := map[string]internal.ProcessQueueInfo{ + "hahah": { + Locked: true, + }, + } + data, err := json.Marshal(MQTable) + assert.Nil(t, err) + t.Log("data info: ", string(data)) + + b := map[string]internal.ProcessQueueInfo{} + err = json.Unmarshal(data, &b) + assert.Nil(t, err) + t.Log("b: ", b) +} + +func TestOffset(t *testing.T) { + + MQTable := map[consumer.MessageQueueKey]internal.ProcessQueueInfo{ + { + Topic: "a", + BrokerName: "B-a", + QueueId: 1, + }: { + Locked: true, + }, + } + data, err := json.Marshal(MQTable) + assert.Nil(t, err) + t.Log("data info: ", string(data)) + + b := map[consumer.MessageQueueKey]internal.ProcessQueueInfo{} + err = json.Unmarshal(data, &b) + assert.Nil(t, err) + t.Log("b: ", b) +} diff --git a/admin/protocol.go b/admin/protocol.go new file mode 100644 index 00000000..07b1bea1 --- /dev/null +++ b/admin/protocol.go @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. 
+The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admin + +import ( + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/internal/remote" +) + +type Connection struct { + ClientId string `json:"clientId"` + ClientAddr string `json:"clientAddr"` + Language remote.LanguageCode `json:"language"` + Version int `json:"version"` +} + +// ConsumerConnection consumer connection info fetched from broker, consumer register info by heartbeat +type ConsumerConnection struct { + connectionSet map[Connection]struct{} + consumeType consumer.ConsumeType + messageModel consumer.MessageModel + consumeFromWhere consumer.ConsumeFromWhere +} diff --git a/consumer/consumer.go b/consumer/consumer.go index 3129b8d1..10c8318b 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -277,7 +277,7 @@ func (dc *defaultConsumer) start() error { if dc.model == Clustering { // set retry topic retryTopic := internal.GetRetryTopic(dc.consumerGroup) - sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll}) + sub := BuildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll}) dc.subscriptionDataTable.Store(retryTopic, sub) } @@ -336,7 +336,7 @@ func (dc *defaultConsumer) subscriptionAutomatically(topic string) { s := MessageSelector{ Expression: _SubAll, } - dc.subscriptionDataTable.Store(topic, buildSubscriptionData(topic, s)) + dc.subscriptionDataTable.Store(topic, BuildSubscriptionData(topic, s)) } } @@ -979,7 
+979,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t return strconv.ParseInt(response.ExtFields["offset"], 10, 64) } -func buildSubscriptionData(topic string, selector MessageSelector) *internal.SubscriptionData { +func BuildSubscriptionData(topic string, selector MessageSelector) *internal.SubscriptionData { subData := &internal.SubscriptionData{ Topic: topic, SubString: selector.Expression, @@ -1014,7 +1014,7 @@ func buildSubscriptionData(topic string, selector MessageSelector) *internal.Sub return subData } -func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 { +func BuildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 { var flag int32 = 0 if commitOffset { flag |= 0x1 << 0 diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 0523f08d..f764753c 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -119,7 +119,7 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic) } - data := buildSubscriptionData(mq.Topic, selector) + data := BuildSubscriptionData(mq.Topic, selector) result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers) if err != nil { @@ -197,7 +197,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQue c.subscriptionAutomatically(mq.Topic) - sysFlag := buildSysFlag(false, true, true, false) + sysFlag := BuildSysFlag(false, true, true, false) pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0) if err != nil { @@ -226,7 +226,7 @@ func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.Mes } selector := MessageSelector{} - data := buildSubscriptionData(queue.Topic, selector) + data := BuildSubscriptionData(queue.Topic, selector) return c.pull(ctx, queue, data, offset, numbers) } diff --git a/consumer/push_consumer.go 
b/consumer/push_consumer.go index d2aee5b7..7b069987 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -228,7 +228,7 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector, if pc.option.Namespace != "" { topic = pc.option.Namespace + "%" + topic } - data := buildSubscriptionData(topic, selector) + data := BuildSubscriptionData(topic, selector) pc.subscriptionDataTable.Store(topic, data) pc.subscribedTopic[topic] = "" @@ -612,7 +612,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { subExpression = sd.SubString } - sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter) + sysFlag := BuildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter) pullRequest := &internal.PullMessageRequestHeader{ ConsumerGroup: pc.consumerGroup, diff --git a/examples/admin/main.go b/examples/admin/main.go new file mode 100644 index 00000000..1147f21e --- /dev/null +++ b/examples/admin/main.go @@ -0,0 +1,262 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"testing"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/admin"
+
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	TestFetchConsumerOffset()
+}
+
+func initAdmin() admin.Admin {
+	var err error
+
+	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
+	assert(err)
+	return testAdmin
+}
+
+func TestFetchConsumerOffset() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+	group := "test_group"
+
+	mq := &primitive.MessageQueue{
+		Topic:      topic,
+		BrokerName: "sandbox_boe1",
+		QueueId:    0,
+	}
+	offset, err := testAdmin.FetchConsumerOffset(ctx, group, mq)
+	if err != nil {
+		panic(err)
+	}
+
+	log.Printf("get offset: %v", offset)
+}
+
+func TestFetchConsumerOffsets() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+	group := "test_group"
+
+	offsets, err := testAdmin.FetchConsumerOffsets(ctx, topic, group)
+	assert(err)
+	for _, offset := range offsets {
+		log.Printf("topic: %s brokerName: %s queueId: %d get Offset: %d", offset.Topic, offset.BrokerName, offset.QueueId, offset.Offset)
+	}
+}
+
+func TestSearchOffset(t *testing.T) {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+	tm, err := time.ParseInLocation("2006-01-02 15:04:05", "2019-11-03 19:00:00", time.Local)
+	assert(err)
+	mq := &primitive.MessageQueue{
+		Topic:      topic,
+		BrokerName: "Default",
+		QueueId:    1,
+	}
+
+	offset, err := testAdmin.SearchOffset(ctx, tm, mq)
+	assert(err)
+	log.Printf("Offset val: %v", offset)
+}
+
+func TestResetConsumerOffset() {
+	TestFetchConsumerOffset()
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+	group := "test_group"
+	mq := &primitive.MessageQueue{
+		Topic:      topic,
+		BrokerName: "Default",
+		QueueId:    0,
+	}
+	offset := int64(272572362)
+	err := testAdmin.ResetConsumerOffset(ctx, group, mq, offset)
+	assert(err)
+	log.Printf("reset Offset success.")
+}
+
+func TestConcurrentSearchkey(t *testing.T) {
+	for i := 0; i < 100; i++ {
+		TestSearchKey()
+	}
+}
+
+func TestSearchKey() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+	key := "6716311733805435655"
+	maxNum := 32
+	msgs, err := testAdmin.SearchKey(ctx, topic, key, maxNum)
+	assert(err)
+	for _, msg := range msgs {
+		log.Printf("msg: body:%v queue:%v\n", msg.StoreHost, *msg.Queue)
+	}
+}
+
+func TestMinOffset() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+
+	mq := &primitive.MessageQueue{
+		Topic:      topic,
+		BrokerName: "Default",
+		QueueId:    0,
+	}
+	offset, err := testAdmin.MinOffset(ctx, mq)
+	assert(err)
+
+	log.Printf("get topic min Offset: %v", offset)
+}
+
+func TestMaxOffset(t *testing.T) {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+
+	mq := &primitive.MessageQueue{
+		Topic:      topic,
+		BrokerName: "Default",
+		QueueId:    0,
+	}
+	offset, err := testAdmin.MaxOffset(ctx, mq)
+	assert(err)
+
+	log.Printf("get topic max Offset: %v", offset)
+}
+
+func TestMaxOffsets() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+
+	offsets, err := testAdmin.MaxOffsets(ctx, topic)
+	assert(err)
+	for _, offset := range offsets {
+		log.Printf("get topic max Offset: %v", offset.String())
+	}
+}
+
+func TestViewMessageByQueueOffset() {
+	testAdmin := initAdmin()
+
+	ctx := context.Background()
+	topic := "Test"
+
+	offsets, err := testAdmin.MaxOffsets(ctx, topic)
+	assert(err)
+	for _, offset := range offsets {
+		log.Printf("get topic max Offset: %v", offset.String())
+	}
+
+	if len(offsets) > 0 {
+		offset := offsets[0]
+
+		msg, err := testAdmin.ViewMessageByQueueOffset(ctx, offset.MessageQueue, offset.Offset)
+		if err != nil {
+			log.Printf("pull msgs get err: %v", err)
+		}
+		log.Printf("get msgs: %v", msg)
+	}
+}
+
+func TestView(t *testing.T) {
+	testAdmin := initAdmin()
+	topic := "Test"
+	_startTime := 1577203200000
+	_endTime := 1577289600000
+
+	startOffsets, err := testAdmin.SearchOffsets(context.Background(), topic, time.Unix(int64(_startTime/1000), 0))
+	assert(err)
+	endOffsets, err := testAdmin.SearchOffsets(context.Background(), topic, time.Unix(int64(_endTime/1000), 0))
+	assert(err)
+
+	for _, end := range endOffsets {
+		if end.Offset > 0 {
+			for _, startOffset := range startOffsets {
+				if startOffset.BrokerName == end.BrokerName && startOffset.QueueId == end.QueueId {
+					for offset := startOffset.Offset; offset <= end.Offset && offset < startOffset.Offset+5; offset++ {
+						messageExts, err := testAdmin.ViewMessageByQueueOffset(context.Background(), end.MessageQueue, int64(offset))
+						if err != nil {
+							log.Printf("view broker:%v,queue:%v,offset:%v message by offset error!%v", end.BrokerName, end.QueueId, end.Offset, err)
+							continue
+						}
+						if messageExts != nil {
+							log.Printf("message ext: %v for queue: %v with offset: %d", messageExts, end.MessageQueue, messageExts.QueueOffset)
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func TestGetConsumerConnectionList() {
+	testAdmin := initAdmin()
+
+	ids, err := testAdmin.GetConsumerIdList(context.Background(), "test_group")
+	assert(err)
+	log.Printf("consumer ids: %v", ids)
+}
+
+func TestAllocation() {
+	testAdmin := initAdmin()
+
+	alloc, err := testAdmin.Allocation(context.Background(), "test_group")
+	assert(err)
+	log.Printf("consumer alloc: %#v", alloc)
+}
+
+func TestGetConsumerRunningInfo() {
+	testAdmin := initAdmin()
+
+	ids, err := testAdmin.GetConsumerRunningInfo(context.Background(), "test_group", "custom_client_id")
+	assert(err)
+	log.Printf("consumer info: %#v", ids)
+}
+
+func assert(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/go.mod b/go.mod
index 83889596..a5647253 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/tidwall/match v1.0.1 // indirect
 	github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
 	go.uber.org/atomic v1.5.1
+	golang.org/x/sync v0.0.0-20190423024810-112230192c58
 	stathat.com/c/consistent v1.0.0
 )
diff --git a/go.sum b/go.sum
index 3b724dbd..4c5052c4 100644
--- a/go.sum
+++ b/go.sum
@@ -41,6 +41,8 @@ github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
 github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
 github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
 github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xujianhai666/go v1.1.10-0.20200118093701-ae604587eee1 h1:E4Uy91X4R0AmnkufDruLlvPJQIruQFmpAhKgqmzYqO0=
+github.com/xujianhai666/go v1.1.10-0.20200118093701-ae604587eee1/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
 go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -48,6 +50,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA
 golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/internal/client.go b/internal/client.go
index 7e07c5cc..5e5a19c2
100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -226,17 +226,17 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 	client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
 		rlog.Info("receive get consumer running info request...", nil)
-		header := new(GetConsumerRunningInfoHeader)
+		header := new(GetConsumerRunningInfoRequestHeader)
 		header.Decode(req.ExtFields)
-		val, exist := clientMap.Load(header.clientID)
+		val, exist := clientMap.Load(header.ClientID)
 		res := remote.NewRemotingCommand(ResError, nil, nil)
 		if !exist {
-			res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
+			res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.ClientID)
 		} else {
 			cli, ok := val.(*rmqClient)
 			var runningInfo *ConsumerRunningInfo
 			if ok {
-				runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
+				runningInfo = cli.getConsumerRunningInfo(header.ConsumerGroup)
 			}
 			if runningInfo != nil {
 				res.Code = ResSuccess
diff --git a/internal/model.go b/internal/model.go
index 0ee9ccc1..262ff565 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -253,6 +253,52 @@ func (info ConsumerRunningInfo) Encode() ([]byte, error) {
 	return []byte(jsonData), nil
 }
 
+func (info *ConsumerRunningInfo) Decode(data []byte) error {
+	iter := jsoniter.ParseBytes(jsoniter.ConfigCompatibleWithStandardLibrary, data)
+
+	decodeMQKV := func() {
+		q := primitive.MessageQueue{}
+		iter.ReadVal(&q)
+		iter.NextToken()
+		pq := ProcessQueueInfo{}
+		iter.ReadVal(&pq)
+		info.MQTable[q] = pq
+	}
+	decodeMQTable := func() {
+		decodeMQKV()
+		t := iter.NextToken()
+		for t == ',' {
+			decodeMQKV()
+			t = iter.NextToken()
+		}
+	}
+
+	_ = iter.ReadMapCB(func(iterator *jsoniter.Iterator, key string) bool {
+		switch key {
+		case "mqTable":
+			iter.NextToken()
+			info.MQTable = make(map[primitive.MessageQueue]ProcessQueueInfo)
+			decodeMQTable()
+		case "properties":
+			info.Properties = make(map[string]string)
+			iter.ReadVal(&info.Properties)
+		case "statusTable":
+			info.StatusTable = make(map[string]ConsumeStatus)
+			iter.ReadVal(&info.StatusTable)
+		case "subscriptionSet":
+			subs := make([]*SubscriptionData, 0)
+			iter.ReadVal(&subs)
+			info.SubscriptionData = make(map[*SubscriptionData]bool)
+			for i := range subs {
+				info.SubscriptionData[subs[i]] = true
+			}
+		}
+		return true
+	})
+
+	return nil
+}
+
 func NewConsumerRunningInfo() *ConsumerRunningInfo {
 	return &ConsumerRunningInfo{
 		Properties: make(map[string]string),
diff --git a/internal/request.go b/internal/request.go
index 5438790f..ab58603b 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -26,10 +26,13 @@ import (
 const (
 	ReqSendMessage             = int16(10)
 	ReqPullMessage             = int16(11)
+	ReqQueryMessage            = int16(12)
 	ReqQueryConsumerOffset     = int16(14)
 	ReqUpdateConsumerOffset    = int16(15)
 	ReqSearchOffsetByTimestamp = int16(29)
 	ReqGetMaxOffset            = int16(30)
+	ReqGetMinOffset            = int16(31)
+	ReqViewMessageByID         = int16(33)
 	ReqHeartBeat               = int16(34)
 	ReqConsumerSendMsgBack     = int16(36)
 	ReqENDTransaction          = int16(37)
@@ -228,6 +231,18 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string {
 	return maps
 }
 
+type GetMinOffsetRequestHeader struct {
+	Topic   string
+	QueueId int
+}
+
+func (request *GetMinOffsetRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["queueId"] = strconv.Itoa(request.QueueId)
+	return maps
+}
+
 type GetMaxOffsetRequestHeader struct {
 	Topic   string
 	QueueId int
@@ -294,27 +309,63 @@ func (request *GetRouteInfoRequestHeader) Encode() map[string]string {
 	return maps
 }
 
-type GetConsumerRunningInfoHeader struct {
-	consumerGroup string
-	clientID      string
+type GetConsumerRunningInfoRequestHeader struct {
+	ConsumerGroup string
+	ClientID      string
 }
 
-func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
+func (request *GetConsumerRunningInfoRequestHeader) Encode() map[string]string {
 	maps := make(map[string]string)
-	maps["consumerGroup"] = request.consumerGroup
-	maps["clientId"] = request.clientID
+	maps["consumerGroup"] = request.ConsumerGroup
+	maps["clientId"] = request.ClientID
 	return maps
 }
 
-func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string) {
+func (request *GetConsumerRunningInfoRequestHeader) Decode(properties map[string]string) {
 	if len(properties) == 0 {
 		return
 	}
+	// NOTE(review): the wire ext field is "consumerGroup" (see Encode above and
+	// the Java GetConsumerRunningInfoRequestHeader); "ConsumerGroup" never matches.
 	if v, existed := properties["consumerGroup"]; existed {
-		request.consumerGroup = v
+		request.ConsumerGroup = v
 	}
 	if v, existed := properties["clientId"]; existed {
-		request.clientID = v
+		request.ClientID = v
 	}
 }
+
+type QueryMessageRequestHeader struct {
+	Topic          string
+	Key            string
+	MaxNum         int
+	BeginTimestamp int64
+	EndTimestamp   int64
+}
+
+func (request *QueryMessageRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["key"] = request.Key
+	maps["maxNum"] = fmt.Sprintf("%d", request.MaxNum)
+	maps["beginTimestamp"] = strconv.FormatInt(request.BeginTimestamp, 10)
+	maps["endTimestamp"] = strconv.FormatInt(request.EndTimestamp, 10)
+
+	return maps
+}
+
+func (request *QueryMessageRequestHeader) Decode(properties map[string]string) error {
+	return nil
+}
+
+type ViewMessageRequestHeader struct {
+	Offset int64
+}
+
+func (request *ViewMessageRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["offset"] = strconv.FormatInt(request.Offset, 10)
+
+	return maps
+}
diff --git a/internal/route.go b/internal/route.go
index 09b6e53a..c4f90327 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -619,3 +619,37 @@ func (b *BrokerData) Equals(bd *BrokerData) bool {
 
 	return true
 }
+
+func (b *BrokerData) GetSlaves() []string {
+	addrs := make([]string, 0)
+	for id, addr := range b.BrokerAddresses {
+		if id != MasterId {
+			addrs = append(addrs, addr)
+		}
+	}
+	return addrs
+}
+
+func (b *BrokerData) MasterAddr() string {
+	return b.BrokerAddresses[MasterId]
+}
+
+func (b *BrokerData) brokers() []string {
+	// NOTE(review): must be length 0 with capacity; make([]string, len(...)) +
+	// append prepends empty strings that SelectBrokerAddr could then return.
+	ls := make([]string, 0, len(b.BrokerAddresses))
+	for _, v := range b.BrokerAddresses {
+		ls = append(ls, v)
+	}
+	return ls
+}
+
+func (b *BrokerData) SelectBrokerAddr() string {
+	addr := b.BrokerAddresses[MasterId]
+	if len(addr) == 0 {
+		addrs := b.brokers()
+		i := rand.Intn(len(addrs))
+		addr = addrs[i]
+	}
+	return addr
+}
diff --git a/primitive/message.go b/primitive/message.go
index 6a84477b..69802e0f 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -237,6 +237,7 @@ func (m *Message) Marshal() []byte {
 type MessageExt struct {
 	Message
 	MsgId         string
+	QueueId       int32
 	OffsetMsgId   string
 	StoreSize     int32
 	QueueOffset   int64
@@ -295,7 +296,9 @@ func DecodeMessage(data []byte) []*MessageExt {
 	// 4. queueID
 	var qId int32
 	binary.Read(buf, binary.BigEndian, &qId)
+	// TODO: xujianhai666 wrong usage of Queue, which is only valid for msg produce
 	msg.Queue.QueueId = int(qId)
+	msg.QueueId = qId
 	count += 4
 
 	// 5. Flag
@@ -393,15 +396,6 @@ func (mq *MessageQueue) String() string {
 	return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
 }
 
-func (mq *MessageQueue) HashCode() int {
-	result := 1
-	result = 31*result + utils.HashString(mq.BrokerName)
-	result = 31*result + mq.QueueId
-	result = 31*result + utils.HashString(mq.Topic)
-
-	return result
-}
-
 type AccessChannel int
 
 const (
diff --git a/rlog/log.go b/rlog/log.go
index 426f6981..b1a10a5d 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -92,7 +92,7 @@ func (l *defaultLogger) Error(msg string, fields map[string]interface{}) {
 	if msg == "" && len(fields) == 0 {
 		return
 	}
-	l.logger.WithFields(fields).WithFields(fields).Error(msg)
+	l.logger.WithFields(fields).Error(msg)
 }
 
 func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) {