diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..b73ff00 Binary files /dev/null and b/.DS_Store differ diff --git a/admin.go b/admin.go new file mode 100644 index 0000000..22559b4 --- /dev/null +++ b/admin.go @@ -0,0 +1,11 @@ +package rocketmq + +type Admin interface { + createTopic(key string, newTopic string, queueNum int) + createTopic1(key string, newTopic string, queueNum int, topicSysFlag int) + searchOffset(mq MessageQueue, timestamp int64) error + maxOffset(mq MessageQueue) error + minOffset(mq MessageQueue) error + earliestMsgStoreTime(mq MessageQueue) error + queryMessage(topic string, key, string, maxNum int, begin int64, end int64) error +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..94ed1e7 --- /dev/null +++ b/client_test.go @@ -0,0 +1,107 @@ +package rocketmq + +import ( + "encoding/json" + "io/ioutil" + "log" + "strings" + "testing" + "time" +) + +var ch = make(chan *RemotingCommand) +var client = NewDefaultRemotingClient() + +func TestConnect(t *testing.T) { + log.SetFlags(log.Lshortfile | log.LstdFlags) + + broker := "192.168.1.197:10911" + namesrv := "192.168.1.234:9876" + + data, err := ioutil.ReadFile("request.txt") + if err != nil { + log.Print(err) + } + + lines := strings.Split(string(data), "\n") + + var lastHeader, lastBody []byte + for _, line := range lines { + if strings.HasPrefix(line, "header=") { + if lastHeader != nil { + cmd := new(RemotingCommand) + cmd.Body = lastBody + + err = json.Unmarshal(lastHeader, cmd) + if err != nil { + log.Print(err) + return + } + callback := func(responseFuture *ResponseFuture) { + } + switch cmd.Code { + case 101: + getKvCallback := func(responseFuture *ResponseFuture) { + jsonCmd, _ := json.Marshal(responseFuture.responseCommand) + log.Printf("resp=%s", string(jsonCmd)) + } + err := client.invokeAsync(namesrv, cmd, 5000, getKvCallback) + if err != nil { + log.Print(err) + } + case 105: + getRouteInfoCallback := func(responseFuture *ResponseFuture) { + jsonCmd, _ := json.Marshal(responseFuture.responseCommand) + + log.Printf("resp=%s", string(jsonCmd)) + log.Print(string(responseFuture.responseCommand.Body)) + } + err := client.invokeAsync(namesrv, cmd, 5000, getRouteInfoCallback) + if err != nil { + log.Print(err) + } + case 34: + err := client.invokeAsync(broker, cmd, 5000, callback) + if err != nil { + log.Print(err) + } + case 38: + log.Print("getConsumerListCallback") + getConsumerListCallback := func(responseFuture *ResponseFuture) { + jsonCmd, _ := json.Marshal(responseFuture.responseCommand) + + log.Printf("getConsumerListCallback=%s", string(jsonCmd)) + log.Print(string(responseFuture.responseCommand.Body)) + } + log.Print(cmd) + err := client.invokeAsync(broker, cmd, 5000, getConsumerListCallback) + if err != nil { + log.Print(err) + } + case 11: + pullCallback := func(responseFuture *ResponseFuture) { + //if responseFuture.responseCommand.Code == 0 && len(responseFuture.responseCommand.Body) > 0 { + //msgs := decodeMessage(responseFuture.responseCommand.Body) + //for _, msg := range msgs { + //log.Print(string(msg.Body)) + //} + //} + } + err := client.invokeAsync(broker, cmd, 5000, pullCallback) + if err != nil { + log.Print(err) + } + } + } + } + + if strings.HasPrefix(line, "header=") { + lastHeader = []byte(strings.TrimLeft(line, "header=")) + } + if strings.HasPrefix(line, "body=") { + lastBody = []byte(strings.TrimLeft(line, "body=")) + } + } + + time.Sleep(1000 * time.Second) +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..7c39794 --- /dev/null +++ b/consumer.go @@ -0,0 +1,222 @@ +package rocketmq + +import "log" +import "net" +import "os" +import "strconv" +import "sync/atomic" + +const ( + BrokerSuspendMaxTimeMillis = 1000 * 15 +) + +type MessageListener func(msgs []*MessageExt) + +type Config struct { + Nameserver string + ClientIp string + InstanceName string +} + +type Consumer interface { + //Admin + Start() error + Shutdown() + RegisterMessageListener(listener MessageListener) + Subscribe(topic string, subExpression string) + UnSubcribe(topic string) + SendMessageBack(msg MessageExt, delayLevel int) error + SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error + fetchSubscribeMessageQueues(topic string) error +} + +type DefaultConsumer struct { + conf *Config + consumerGroup string + consumeFromWhere string + consumerType string + messageModel string + unitMode bool + + subscription map[string]string + messageListener MessageListener + offsetStore OffsetStore + brokers map[string]net.Conn + + rebalance *Rebalance + remotingClient RemotingClient + mqClient *MqClient +} + +func NewDefaultConsumer(name string, conf *Config) (Consumer, error) { + if conf == nil { + conf = &Config{ + Nameserver: os.Getenv("ROCKETMQ_NAMESVR"), + InstanceName: "DEFAULT", + } + } + + remotingClient := NewDefaultRemotingClient() + mqClient := NewMqClient() + + rebalance := NewRebalance() + rebalance.groupName = name + rebalance.mqClient = mqClient + + offsetStore := new(RemoteOffsetStore) + offsetStore.mqClient = mqClient + offsetStore.groupName = name + + pullMessageService := NewPullMessageService() + + consumer := &DefaultConsumer{ + conf: conf, + consumerGroup: name, + consumeFromWhere: "CONSUME_FROM_LAST_OFFSET", + subscription: make(map[string]string), + offsetStore: offsetStore, + brokers: make(map[string]net.Conn), + rebalance: rebalance, + remotingClient: remotingClient, + mqClient: mqClient, + } + + mqClient.consumerTable[name] = consumer + mqClient.remotingClient = remotingClient + mqClient.conf = conf + mqClient.clientId = conf.ClientIp + "@" + strconv.Itoa(os.Getpid()) + mqClient.pullMessageService = pullMessageService + + rebalance.consumer = consumer + pullMessageService.consumer = consumer + + return consumer, nil +} + +func (self *DefaultConsumer) Start() error { + self.mqClient.start() + return nil +} + +func (self *DefaultConsumer) Shutdown() { +} + +func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener) { + self.messageListener = messageListener +} + +func (self *DefaultConsumer) Subscribe(topic string, subExpression string) { + self.subscription[topic] = subExpression + + subData := &SubscriptionData{ + Topic: topic, + SubString: subExpression, + } + self.rebalance.subscriptionInner[topic] = subData +} + +func (self *DefaultConsumer) UnSubcribe(topic string) { + delete(self.subscription, topic) +} + +func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error { + return nil +} + +func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error { + return nil +} + +func (self *DefaultConsumer) fetchSubscribeMessageQueues(topic string) error { + return nil +} + +func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { + + requestHeader := new(PullMessageRequestHeader) + requestHeader.ConsumerGroup = pullRequest.consumerGroup + requestHeader.Topic = pullRequest.messageQueue.topic + requestHeader.QueueId = pullRequest.messageQueue.queueId + requestHeader.QueueOffset = pullRequest.nextOffset + + requestHeader.SysFlag = 2 + requestHeader.CommitOffset = 0 + requestHeader.SuspendTimeoutMillis = BrokerSuspendMaxTimeMillis + requestHeader.Subscription = "*" + + subscriptionData, ok := self.rebalance.subscriptionInner[pullRequest.messageQueue.topic] + + if ok { + requestHeader.SubVersion = subscriptionData.SubVersion + } + + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := new(RemotingCommand) + remotingCommand.Code = PULL_MESSAGE + remotingCommand.Opaque = currOpaque + remotingCommand.Flag = 0 + remotingCommand.Language = "JAVA" + remotingCommand.Version = 79 + + remotingCommand.ExtFields = requestHeader + + brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(pullRequest.messageQueue.brokerName, 0, false) + + pullCallback := func(responseFuture *ResponseFuture) { + if responseFuture.responseCommand.Code == 0 && len(responseFuture.responseCommand.Body) > 0 { + var nextBeginOffset int64 + var err error + pullResult, ok := responseFuture.responseCommand.ExtFields.(map[string]interface{}) + if ok { + if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok { + if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok { + nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64) + if err != nil { + log.Print(err) + return + } + + } + + } + + } + + nextPullRequest := &PullRequest{ + consumerGroup: pullRequest.consumerGroup, + nextOffset: nextBeginOffset, + messageQueue: pullRequest.messageQueue, + } + + self.mqClient.pullMessageService.pullRequestQueue <- nextPullRequest + + msgs := decodeMessage(responseFuture.responseCommand.Body) + self.messageListener(msgs) + } + } + + if found { + self.remotingClient.invokeAsync(brokerAddr, remotingCommand, 1000, pullCallback) + } +} + +func (self *DefaultConsumer) updateTopicSubscribeInfo(topic string, info []*MessageQueue) { + if self.rebalance.subscriptionInner != nil { + _, ok := self.rebalance.subscriptionInner[topic] + if ok { + self.rebalance.topicSubscribeInfoTable[topic] = info + } + } +} + +func (self *DefaultConsumer) subscriptions() []*SubscriptionData { + subscriptions := make([]*SubscriptionData, 0) + for _, subscription := range self.rebalance.subscriptionInner { + subscriptions = append(subscriptions, subscription) + } + return subscriptions +} + +func (self *DefaultConsumer) doRebalance() { + self.rebalance.doRebalance() +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..b1ba789 --- /dev/null +++ b/example/main.go @@ -0,0 +1,30 @@ +package main + +import ( + rocketmq "didapinche.com/go_rocket_mq" + "log" + "runtime" + "time" +) + +func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + log.SetFlags(log.Lshortfile | log.LstdFlags) + conf := &rocketmq.Config{ + Nameserver: "192.168.1.234:9876", + ClientIp: "192.168.1.23", + InstanceName: "DEFAULT", + } + consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf) + if err != nil { + log.Panic(err) + } + consumer.Subscribe("test3", "*") + consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) { + for i, msg := range msgs { + log.Print(i, string(msg.Body)) + } + }) + consumer.Start() + time.Sleep(1000 * time.Second) +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..ed2e336 --- /dev/null +++ b/message.go @@ -0,0 +1,89 @@ +package rocketmq + +import ( + "bytes" + "encoding/binary" +) + +type Message struct { + Topic string + Flag int32 + Properties map[string]string + Body []byte +} + +type MessageExt struct { + Message + QueueId int32 + StoreSize int32 + QueueOffset int64 + SysFlag int32 + BornTimestamp int64 + //bornHost + StoreTimestamp int64 + //storeHost + MsgId string + CommitLogOffset int64 + BodyCRC int32 + ReconsumeTimes int32 + PreparedTransactionOffset int64 +} + +func decodeMessage(data []byte) []*MessageExt { + buf := bytes.NewBuffer(data) + var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32 + var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64 + var topicLen byte + var topic, body, properties, bornHost, storeHost []byte + var propertiesLength int16 + + msgs := make([]*MessageExt, 0, 32) + for buf.Len() > 0 { + msg := new(MessageExt) + binary.Read(buf, binary.LittleEndian, &storeSize) + binary.Read(buf, binary.BigEndian, &magicCode) + binary.Read(buf, binary.BigEndian, &bodyCRC) + binary.Read(buf, binary.BigEndian, &queueId) + binary.Read(buf, binary.BigEndian, &flag) + binary.Read(buf, binary.BigEndian, &queueOffset) + binary.Read(buf, binary.BigEndian, &physicOffset) + binary.Read(buf, binary.BigEndian, &sysFlag) + binary.Read(buf, binary.BigEndian, &bornTimeStamp) + bornHost = make([]byte, 4) + binary.Read(buf, binary.BigEndian, &bornHost) + binary.Read(buf, binary.BigEndian, &bornPort) + binary.Read(buf, binary.BigEndian, &storeTimestamp) + storeHost = make([]byte, 4) + binary.Read(buf, binary.BigEndian, &storeHost) + binary.Read(buf, binary.BigEndian, &storePort) + binary.Read(buf, binary.BigEndian, &reconsumeTimes) + binary.Read(buf, binary.BigEndian, &preparedTransactionOffset) + binary.Read(buf, binary.BigEndian, &bodyLength) + if bodyLength > 0 { + body = make([]byte, bodyLength) + binary.Read(buf, binary.BigEndian, body) + } + binary.Read(buf, binary.BigEndian, &topicLen) + topic = make([]byte, topicLen) + binary.Read(buf, binary.BigEndian, &topic) + binary.Read(buf, binary.BigEndian, &propertiesLength) + properties = make([]byte, propertiesLength) + binary.Read(buf, binary.BigEndian, &properties) + msg.Topic = string(topic) + msg.QueueId = queueId + msg.SysFlag = sysFlag + msg.QueueOffset = queueOffset + msg.BodyCRC = bodyCRC + msg.StoreSize = storeSize + msg.BornTimestamp = bornTimeStamp + msg.ReconsumeTimes = reconsumeTimes + msg.Flag = flag + //msg.commitLogOffset=physicOffset + msg.StoreTimestamp = storeTimestamp + msg.PreparedTransactionOffset = preparedTransactionOffset + msg.Body = body + msgs = append(msgs, msg) + } + + return msgs +} diff --git a/message_queue.go b/message_queue.go new file mode 100644 index 0000000..3742650 --- /dev/null +++ b/message_queue.go @@ -0,0 +1,48 @@ +package rocketmq + +type MessageQueue struct { + topic string + brokerName string + queueId int32 +} + +func (self *MessageQueue) clone() *MessageQueue { + no := new(MessageQueue) + no.topic = self.topic + no.queueId = self.queueId + no.brokerName = self.brokerName + return no +} + +type MessageQueues []*MessageQueue + +func (self MessageQueues) Less(i, j int) bool { + imq := self[i] + jmq := self[j] + + if imq.topic < jmq.topic { + return true + } else if imq.topic < jmq.topic { + return false + } + + if imq.brokerName < jmq.brokerName { + return true + } else if imq.brokerName < jmq.brokerName { + return false + } + + if imq.queueId < jmq.queueId { + return true + } else { + return false + } +} + +func (self MessageQueues) Swap(i, j int) { + self[i], self[j] = self[j], self[i] +} + +func (self MessageQueues) Len() int { + return len(self) +} diff --git a/mqclient.go b/mqclient.go new file mode 100644 index 0000000..baf96e1 --- /dev/null +++ b/mqclient.go @@ -0,0 +1,399 @@ +package rocketmq + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "log" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +type GetRouteInfoRequestHeader struct { + topic string +} + +func (self *GetRouteInfoRequestHeader) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + buf.WriteString("{\"topic\":\"") + buf.WriteString(self.topic) + buf.WriteString("\"}") + return buf.Bytes(), nil +} + +type QueueData struct { + BrokerName string + ReadQueueNums int32 + WriteQueueNums int32 + Perm int32 + TopicSynFlag int32 +} + +type BrokerData struct { + BrokerName string + BrokerAddrs map[string]string +} + +type TopicRouteData struct { + OrderTopicConf string + QueueDatas []*QueueData + BrokerDatas []*BrokerData +} + +type MqClient struct { + clientId string + conf *Config + brokerAddrTable map[string]map[string]string //map[topic]map[bokerId]addrs + consumerTable map[string]*DefaultConsumer + topicRouteTable map[string]*TopicRouteData + remotingClient RemotingClient + pullMessageService *PullMessageService + mutex sync.Mutex +} + +func NewMqClient() *MqClient { + return &MqClient{ + brokerAddrTable: make(map[string]map[string]string), + consumerTable: make(map[string]*DefaultConsumer), + topicRouteTable: make(map[string]*TopicRouteData), + } +} +func (self *MqClient) findBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) { + slave = false + found = false + + brokerMap, ok := self.brokerAddrTable[brokerName] + if ok { + brokerAddr, ok = brokerMap[strconv.FormatInt(brokerId, 10)] + slave = (brokerId != 0) + found = ok + + if !found && !onlyThisBroker { + var id string + for id, brokerAddr = range brokerMap { + slave = (id != "0") + found = true + break + } + } + } + + return +} + +func (self *MqClient) findBrokerAddrByTopic(topic string) (addr string, ok bool) { + topicRouteData, ok := self.topicRouteTable[topic] + if !ok { + return "", ok + } + + brokers := topicRouteData.BrokerDatas + if brokers != nil && len(brokers) > 0 { + brokerData := brokers[0] + if ok { + addr, ok = brokerData.BrokerAddrs["0"] + + if ok { + return + } + for _, addr = range brokerData.BrokerAddrs { + return addr, ok + } + } + } + return +} +func (self *MqClient) findConsumerIdList(topic string, groupName string) ([]string, error) { + brokerAddr, ok := self.findBrokerAddrByTopic(topic) + if !ok { + self.updateTopicRouteInfoFromNameServerByTopic(topic) + brokerAddr, ok = self.findBrokerAddrByTopic(topic) + } + + if ok { + return self.getConsumerIdListByGroup(brokerAddr, groupName, 3000) + } + + return nil, errors.New("can't find broker") + +} + +type GetConsumerListByGroupRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` +} + +type GetConsumerListByGroupResponseBody struct { + ConsumerIdList []string +} + +func (self *MqClient) getConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) ([]string, error) { + requestHeader := new(GetConsumerListByGroupRequestHeader) + requestHeader.ConsumerGroup = consumerGroup + + currOpaque := atomic.AddInt32(&opaque, 1) + request := &RemotingCommand{ + Code: GET_CONSUMER_LIST_BY_GROUP, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + ExtFields: requestHeader, + } + + response, err := self.remotingClient.invokeSync(addr, request, timeoutMillis) + if err != nil { + log.Print(err) + return nil, err + } + + if response.Code == SUCCESS { + getConsumerListByGroupResponseBody := new(GetConsumerListByGroupResponseBody) + bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1) + bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1) + err := json.Unmarshal([]byte(bodyjson), getConsumerListByGroupResponseBody) + if err != nil { + log.Print(err) + return nil, err + } + return getConsumerListByGroupResponseBody.ConsumerIdList, nil + } + + return nil, errors.New(fmt.Sprintf("MQBrokerException:{code:%d,desc:%s}", response.Code, response.remark)) +} + +func (self *MqClient) getTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*TopicRouteData, error) { + requestHeader := &GetRouteInfoRequestHeader{ + topic: topic, + } + + remotingCommand := new(RemotingCommand) + remotingCommand.Code = GET_ROUTEINTO_BY_TOPIC + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand.Opaque = currOpaque + remotingCommand.Flag = 0 + remotingCommand.Language = "JAVA" + remotingCommand.Version = 79 + + remotingCommand.ExtFields = requestHeader + response, err := self.remotingClient.invokeSync(self.conf.Nameserver, remotingCommand, timeoutMillis) + if err != nil { + return nil, err + } + topicRouteData := new(TopicRouteData) + bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1) + bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1) + err = json.Unmarshal([]byte(bodyjson), topicRouteData) + if err != nil { + log.Print(err) + return nil, err + } + + return topicRouteData, nil + +} + +func (self *MqClient) updateTopicRouteInfoFromNameServer() { + for _, consumer := range self.consumerTable { + subscriptions := consumer.subscriptions() + for _, subData := range subscriptions { + self.updateTopicRouteInfoFromNameServerByTopic(subData.Topic) + } + } +} + +func (self *MqClient) updateTopicRouteInfoFromNameServerByTopic(topic string) error { + self.mutex.Lock() + defer self.mutex.Lock() + + topicRouteData, err := self.getTopicRouteInfoFromNameServer(topic, 3000*1000) + if err != nil { + log.Print(err) + return err + } + + for _, bd := range topicRouteData.BrokerDatas { + self.brokerAddrTable[bd.BrokerName] = bd.BrokerAddrs + } + + mqList := make([]*MessageQueue, 0) + for _, queueData := range topicRouteData.QueueDatas { + var i int32 + for i = 0; i < queueData.ReadQueueNums; i++ { + mq := &MessageQueue{ + topic: topic, + brokerName: queueData.BrokerName, + queueId: i, + } + + mqList = append(mqList, mq) + } + } + + for _, consumer := range self.consumerTable { + consumer.updateTopicSubscribeInfo(topic, mqList) + } + + self.topicRouteTable[topic] = topicRouteData + + return nil +} + +type ConsumerData struct { + GroupName string + ConsumerType string + MessageModel string + ConsumeFromWhere string + SubscriptionDataSet []*SubscriptionData + UnitMode bool +} + +type HeartbeatData struct { + ClientId string + ConsumerDataSet []*ConsumerData +} + +func (self *MqClient) prepareHeartbeatData() *HeartbeatData { + heartbeatData := new(HeartbeatData) + heartbeatData.ClientId = self.clientId + heartbeatData.ConsumerDataSet = make([]*ConsumerData, 0) + for group, consumer := range self.consumerTable { + consumerData := new(ConsumerData) + consumerData.GroupName = group + consumerData.ConsumerType = consumer.consumerType + consumerData.ConsumeFromWhere = consumer.consumeFromWhere + consumerData.MessageModel = consumer.messageModel + consumerData.SubscriptionDataSet = consumer.subscriptions() + consumerData.UnitMode = consumer.unitMode + + heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData) + } + return heartbeatData +} + +func (self *MqClient) sendHeartbeatToAllBrokerWithLock() error { + heartbeatData := self.prepareHeartbeatData() + if len(heartbeatData.ConsumerDataSet) == 0 { + return errors.New("send heartbeat error") + } + + for _, brokerTable := range self.brokerAddrTable { + for _, addr := range brokerTable { + if addr == "" { + continue + } + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := &RemotingCommand{ + Code: HEART_BEAT, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + } + + data, err := json.Marshal(*heartbeatData) + if err != nil { + log.Print(err) + return err + } + remotingCommand.Body = data + response, err := self.remotingClient.invokeSync(addr, remotingCommand, 3000) + if err != nil { + log.Print(err) + return err + } else { + + if response.Code != 0 { + return errors.New("send heartbeat error") + } else { + return nil + } + } + } + } + return nil +} + +func (self *MqClient) startScheduledTask() { + updateTopicRouteTimer := time.NewTimer(5 * time.Second) + go func() { + for { + <-updateTopicRouteTimer.C + self.updateTopicRouteInfoFromNameServer() + updateTopicRouteTimer.Reset(10 * time.Second) + } + }() + + heartbeatTimer := time.NewTimer(10 * time.Second) + go func() { + for { + <-heartbeatTimer.C + self.sendHeartbeatToAllBrokerWithLock() + heartbeatTimer.Reset(10 * time.Second) + } + }() + + rebalanceTimer := time.NewTimer(15 * time.Second) + go func() { + for { + <-rebalanceTimer.C + self.doRebalance() + rebalanceTimer.Reset(10 * time.Second) + } + }() +} + +func (self *MqClient) doRebalance() { + for _, consumer := range self.consumerTable { + consumer.doRebalance() + } +} + +func (self *MqClient) start() { + self.startScheduledTask() + go self.pullMessageService.start() +} + +type QueryConsumerOffsetRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` + Topic string `json:"topic"` + QueueId int32 `json:"queueId"` +} + +func (self *MqClient) queryConsumerOffset(addr string, requestHeader *QueryConsumerOffsetRequestHeader, timeoutMillis int64) (int64, error) { + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := &RemotingCommand{ + Code: QUERY_CONSUMER_OFFSET, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + } + + remotingCommand.ExtFields = requestHeader + reponse, err := self.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis) + + if err != nil { + log.Print(err) + return 0, err + } + + if extFields, ok := (reponse.ExtFields).(map[string]interface{}); ok { + if offsetInter, ok := extFields["offset"]; ok { + if offsetStr, ok := offsetInter.(string); ok { + offset, err := strconv.ParseInt(offsetStr, 10, 64) + if err != nil { + log.Print(err) + return 0, err + } + return offset, nil + + } + } + } + + return 0, errors.New("query offset error") +} diff --git a/pull_message.go b/pull_message.go new file mode 100644 index 0000000..ac3b678 --- /dev/null +++ b/pull_message.go @@ -0,0 +1,38 @@ +package rocketmq + +type PullRequest struct { + consumerGroup string + messageQueue *MessageQueue + nextOffset int64 +} + +type PullMessageRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` + Topic string `json:"topic"` + QueueId int32 `json:"queueId"` + QueueOffset int64 `json:"queueOffset"` + MaxMsgNums int32 `json:"maxMsgNums"` + SysFlag int32 `json:"sysFlag"` + CommitOffset int64 `json:"commitOffset"` + SuspendTimeoutMillis int64 `json:"suspendTimeoutMillis"` + Subscription string `json:"subscription"` + SubVersion int64 `json:"subVersion"` +} + +type PullMessageService struct { + pullRequestQueue chan *PullRequest + consumer *DefaultConsumer +} + +func NewPullMessageService() *PullMessageService { + return &PullMessageService{ + pullRequestQueue: make(chan *PullRequest, 1024), + } +} + +func (self *PullMessageService) start() { + for { + pullRequest := <-self.pullRequestQueue + self.consumer.pullMessage(pullRequest) + } +} diff --git a/rebalance.go b/rebalance.go new file mode 100644 index 0000000..2fb580c --- /dev/null +++ b/rebalance.go @@ -0,0 +1,166 @@ +package rocketmq + +import ( + "errors" + "log" + "sort" +) + +type SubscriptionData struct { + Topic string + SubString string + ClassFilterMode bool + TagsSet []string + CodeSet []string + SubVersion int64 +} +type Rebalance struct { + groupName string + messageModel string + topicSubscribeInfoTable map[string][]*MessageQueue + subscriptionInner map[string]*SubscriptionData + mqClient *MqClient + allocateMessageQueueStrategy AllocateMessageQueueStrategy + consumer *DefaultConsumer + processQueueTable map[MessageQueue]int32 +} + +func NewRebalance() *Rebalance { + return &Rebalance{ + topicSubscribeInfoTable: make(map[string][]*MessageQueue), + subscriptionInner: make(map[string]*SubscriptionData), + allocateMessageQueueStrategy: new(AllocateMessageQueueAveragely), + messageModel: "CLUSTERING", + processQueueTable: make(map[MessageQueue]int32), + } +} + +func (self *Rebalance) doRebalance() { + for topic, _ := range self.subscriptionInner { + self.rebalanceByTopic(topic) + } +} + +type ConsumerIdSorter []string + +func (self ConsumerIdSorter) Len() int { return len(self) } +func (self ConsumerIdSorter) Swap(i, j int) { self[i], self[j] = self[j], self[i] } +func (self ConsumerIdSorter) Less(i, j int) bool { + if self[i] < self[j] { + return true + } + return false +} + +type AllocateMessageQueueStrategy interface { + allocate(consumerGroup string, currentCID string, mqAll []*MessageQueue, cidAll []string) ([]*MessageQueue, error) +} +type AllocateMessageQueueAveragely struct{} + +func (self *AllocateMessageQueueAveragely) allocate(consumerGroup string, currentCID string, mqAll []*MessageQueue, cidAll []string) ([]*MessageQueue, error) { + if currentCID == "" { + return nil, errors.New("currentCID is empty") + } + + if mqAll == nil || len(mqAll) == 0 { + return nil, errors.New("mqAll is nil or mqAll empty") + } + + if cidAll == nil || len(cidAll) == 0 { + return nil, errors.New("cidAll is nil or cidAll empty") + } + + result := make([]*MessageQueue, 0) + for i, cid := range cidAll { + if cid == currentCID { + mqLen := len(mqAll) + cidLen := len(cidAll) + mod := mqLen % cidLen + var averageSize int + if mqLen < cidLen { + averageSize = 1 + } else { + if mod > 0 && i < mod { + averageSize = mqLen/cidLen + 1 + } else { + averageSize = mqLen / cidLen + } + } + + var startIndex int + if mod > 0 && i < mod { + startIndex = i * averageSize + } else { + startIndex = i*averageSize + mod + } + + var min int + if averageSize > mqLen-startIndex { + min = mqLen - startIndex + } else { + min = averageSize + } + + for j := 0; j < min; j++ { + result = append(result, mqAll[(startIndex+j)%mqLen]) + } + return result, nil + + } + } + + return nil, errors.New("cant't find currentCID") +} + +func (self *Rebalance) rebalanceByTopic(topic string) error { + cidAll, err := self.mqClient.findConsumerIdList(topic, self.groupName) + if err != nil { + log.Print(err) + return err + } + + mqs, ok := self.topicSubscribeInfoTable[topic] + if ok && len(mqs) > 0 && len(cidAll) > 0 { + var messageQueues MessageQueues = mqs + var consumerIdSorter ConsumerIdSorter = cidAll + + sort.Sort(messageQueues) + sort.Sort(consumerIdSorter) + } + + allocateResult, err := self.allocateMessageQueueStrategy.allocate(self.groupName, self.mqClient.clientId, mqs, cidAll) + + if err != nil { + log.Print(err) + return err + } + + self.updateProcessQueueTableInRebalance(topic, allocateResult) + return nil +} + +func (self *Rebalance) updateProcessQueueTableInRebalance(topic string, mqSet []*MessageQueue) { + for _, mq := range mqSet { + _, ok := self.processQueueTable[*mq] + if !ok { + pullRequest := new(PullRequest) + pullRequest.consumerGroup = self.groupName + pullRequest.messageQueue = mq + pullRequest.nextOffset = 0 //self.computePullFromWhere(mq) + self.mqClient.pullMessageService.pullRequestQueue <- pullRequest + self.processQueueTable[*mq] = 1 + } + } + +} + +func (self *Rebalance) computePullFromWhere(mq *MessageQueue) int64 { + var result int64 = -1 + lastOffset := self.consumer.offsetStore.readOffset(mq, "READ_FROM_STORE") + if lastOffset >= 0 { + result = lastOffset + } else { + result = 0 + } + return result +} diff --git a/remote_cmd.go b/remote_cmd.go new file mode 100644 index 0000000..1962960 --- /dev/null +++ b/remote_cmd.go @@ -0,0 +1,98 @@ +package rocketmq + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "log" + "sync" +) + +const ( + RPC_TYPE int = 0 + RPC_ONEWAYint = 1 +) + +var opaque int32 +var decodeLock sync.Mutex + +var ( + remotingVersionKey string = "rocketmq.remoting.version" + ConfigVersion int = -1 + requestId int32 = 0 +) + +type RemotingCommand struct { + //header + Code int `json:"code"` + Language string `json:"language"` + Version int `json:"version"` + Opaque int32 `json:"opaque"` + Flag int `json:"flag"` + remark string `json:"remark"` + ExtFields interface{} `json:"extFields"` + //body + Body []byte `json:"body,omitempty"` +} + +func (self *RemotingCommand) encodeHeader() []byte { + length := 4 + headerData := self.buildHeader() + length += len(headerData) + + if self.Body != nil { + length += len(self.Body) + } + + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, length) + binary.Write(buf, binary.BigEndian, len(self.Body)) + buf.Write(headerData) + + return buf.Bytes() +} + +func (self *RemotingCommand) buildHeader() []byte { + buf, err := json.Marshal(self) + if err != nil { + return nil + } + return buf +} + +func (self *RemotingCommand) encode() []byte { + length := 4 + + headerData := self.buildHeader() + length += len(headerData) + + if self.Body != nil { + length += len(self.Body) + } + + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.LittleEndian, length) + binary.Write(buf, binary.LittleEndian, len(self.Body)) + buf.Write(headerData) + + if self.Body != nil { + buf.Write(self.Body) + } + + return buf.Bytes() +} + +func decodeRemoteCommand(header, body []byte) *RemotingCommand { + decodeLock.Lock() + defer decodeLock.Unlock() + + cmd := &RemotingCommand{} + cmd.ExtFields = make(map[string]string) + err := json.Unmarshal(header, cmd) + if err != nil { + log.Print(err) + return nil + } + cmd.Body = body + return cmd +} diff --git a/remoting_client.go b/remoting_client.go new file mode 100644 index 0000000..a2dede4 --- /dev/null +++ b/remoting_client.go @@ -0,0 +1,267 @@ +package rocketmq + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "log" + "net" + "sync" + "time" +) + +type InvokeCallback func(responseFuture *ResponseFuture) + +type ResponseFuture struct { + responseCommand *RemotingCommand + sendRequestOK bool + err error + opaque int32 + timeoutMillis int64 + invokeCallback InvokeCallback + beginTimestamp int64 + done chan bool +} + +type RemotingClient interface { + connect(addr string) (net.Conn, error) + invokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error + invokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (*RemotingCommand, error) +} + +type DefalutRemotingClient struct { + mutex sync.Mutex + connTables map[string]net.Conn + responseTable map[int32]*ResponseFuture + namesrvAddrList []string + namesrvAddrChoosed string +} + +func NewDefaultRemotingClient() RemotingClient { + return &DefalutRemotingClient{ + connTables: make(map[string]net.Conn), + responseTable: make(map[int32]*ResponseFuture), + } +} + +func (self *DefalutRemotingClient) connect(addr string) (conn net.Conn, err error) { + self.mutex.Lock() + defer self.mutex.Unlock() + if addr == "" { + addr = self.namesrvAddrChoosed + } + + conn, ok := self.connTables[addr] + if !ok { + log.Print(addr) + conn, err = net.Dial("tcp", addr) + if err != nil { + log.Print(err) + return nil, err + } + + self.connTables[addr] = conn + go self.handlerConn(conn) + } + + return conn, nil +} + +func (self *DefalutRemotingClient) invokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (*RemotingCommand, error) { + + conn, err := self.connect(addr) + + response := &ResponseFuture{ + sendRequestOK: false, + opaque: request.Opaque, + timeoutMillis: timeoutMillis, + beginTimestamp: time.Now().Unix(), + done: make(chan bool), + } + + header := request.encodeHeader() + body := request.Body + + self.mutex.Lock() + self.responseTable[request.Opaque] = response + self.mutex.Unlock() + err = self.sendRequest(header, body, conn) + if err != nil { + log.Print(err) + return nil, err + } + <-response.done + return response.responseCommand, nil +} + +func (self *DefalutRemotingClient) invokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error { + conn, ok := self.connTables[addr] + var err error + if !ok { + conn, err = self.connect(addr) + if err != nil { + log.Print(err) + return err + } + } + + response := &ResponseFuture{ + sendRequestOK: false, + opaque: request.Opaque, + timeoutMillis: timeoutMillis, + beginTimestamp: time.Now().Unix(), + invokeCallback: invokeCallback, + } + + self.mutex.Lock() + self.responseTable[request.Opaque] = response + self.mutex.Unlock() + + header := request.encodeHeader() + body := request.Body + err = self.sendRequest(header, body, conn) + if err != nil { + log.Print(err) + return err + } + return nil +} + +func (self *DefalutRemotingClient) handlerConn(conn net.Conn) { + b := make([]byte, 1024) + var length, headerLength, bodyLength int32 + var buf = bytes.NewBuffer([]byte{}) + var header, body []byte + var flag int = 0 + for { + + n, err := conn.Read(b) + if err != nil { + log.Print(err, conn.RemoteAddr()) + return + } + + _, err = buf.Write(b[:n]) + if err != nil { + log.Print(err, conn.RemoteAddr()) + return + } + + for { + if flag == 0 { + if buf.Len() >= 4 { + err = binary.Read(buf, binary.BigEndian, &length) + if err != nil { + log.Print(err) + return + } + flag = 1 + } else { + break + } + } + + if flag == 1 { + if buf.Len() >= 4 { + err = binary.Read(buf, binary.BigEndian, &headerLength) + if err != nil { + log.Print(err) + return + } + flag = 2 + } else { + break + } + + } + + if flag == 2 { + if (buf.Len() > 0) && (buf.Len() >= int(headerLength)) { + header = make([]byte, headerLength) + _, err = buf.Read(header) + if err != nil { + log.Print(err) + return + } + flag = 3 + } else { + break + } + } + + if flag == 3 { + bodyLength = length - 4 - headerLength + if bodyLength == 0 { + flag = 0 + } else { + + if buf.Len() >= int(bodyLength) { + body = make([]byte, int(bodyLength)) + _, err = buf.Read(body) + if err != nil { + log.Print(err) + return + } + flag = 0 + } else { + break + } + } + } + + if flag == 0 { + cmd := decodeRemoteCommand(header, body) + response, ok := self.responseTable[cmd.Opaque] + self.mutex.Lock() + delete(self.responseTable, cmd.Opaque) + self.mutex.Unlock() + + if ok { + response.responseCommand = cmd + if response.invokeCallback != nil { + response.invokeCallback(response) + } + + if response.done != nil { + response.done <- true + } + } else { + jsonCmd, err := json.Marshal(cmd) + if err != nil { + log.Print(err) + } + log.Print(string(jsonCmd)) + } + } + } + + } +} + +func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Conn) error { + self.mutex.Lock() + defer self.mutex.Unlock() + + buf := bytes.NewBuffer([]byte{}) + binary.Write(buf, binary.BigEndian, int32(len(header)+len(body)+4)) + binary.Write(buf, binary.BigEndian, int32(len(header))) + _, err := conn.Write(buf.Bytes()) + + if err != nil { + return err + } + + _, err = conn.Write(header) + if err != nil { + return err + } + + if body != nil && len(body) > 0 { + _, err = conn.Write(body) + if err != nil { + return err + } + } + + return nil +} diff --git a/request.txt b/request.txt new file mode 100644 index 0000000..2a43176 --- /dev/null +++ b/request.txt @@ -0,0 +1,18 @@ +header={"code":101,"extFields":{"key":"192.168.1.23","namespace":"PROJECT_CONFIG"},"flag":0,"language":"JAVA","opaque":1000,"version":79} +header={"code":105,"extFields":{"topic":"test2"},"flag":0,"language":"JAVA","opaque":1,"version":79} +header={"code":105,"extFields":{"topic":"test2"},"flag":0,"language":"JAVA","opaque":4,"version":79} +body={"clientID":"192.168.1.23@25528","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_PASSIVELY","groupName":"CIM","messageModel":"CLUSTERING","subscriptionDataSet":[{"classFilterMode":false,"codeSet":[],"subString":"*","subVersion":1448610566507,"tagsSet":[],"topic":"%RETRY%CIM"}],"unitMode":false}],"producerDataSet":[{"groupName":"CLIENT_INNER_PRODUCER"},{"groupName":"PIM"}]} +header={"code":14,"extFields":{"topic":"test2","queueId":"0","consumerGroup":"CIM"},"flag":0,"language":"JAVA","opaque":17,"version":79} +header={"code":38,"extFields":{"consumerGroup":"CIM"},"flag":0,"language":"JAVA","opaque":24,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448610566507","queueId":"0","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"2","commitOffset":"0","queueOffset":"0"},"flag":0,"language":"JAVA","opaque":2828,"version":79} +header={"code":38,"extFields":{"consumerGroup":"CIM"},"flag":0,"language":"JAVA","opaque":30,"version":79} +header={"code":105,"extFields":{"topic":"TBW102"},"flag":0,"language":"JAVA","opaque":36,"version":79} +header={"code":38,"extFields":{"consumerGroup":"CIM"},"flag":0,"language":"JAVA","opaque":43,"version":79} +header={"code":34,"flag":0,"language":"JAVA","opaque":16,"version":79} +body={"clientID":"192.168.1.23@25528","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_FIRST_OFFSET","consumeType":"CONSUME_PASSIVELY","groupName":"CIM","messageModel":"CLUSTERING","subscriptionDataSet":[{"classFilterMode":false,"codeSet":[],"subString":"*","subVersion":1448867704884,"tagsSet":[],"topic":"test2"}],"unitMode":false}],"producerDataSet":[{"groupName":"PIM"},{"groupName":"CLIENT_INNER_PRODUCER"}]} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448610566507","queueId":"0","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"2","commitOffset":"0","queueOffset":"0"},"flag":0,"language":"JAVA","opaque":45,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448610566507","queueId":"0","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"2","commitOffset":"3418","queueOffset":"0"},"flag":0,"language":"JAVA","opaque":48,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448610566507","queueId":"0","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"2","commitOffset":"0","queueOffset":"0"},"flag":0,"language":"JAVA","opaque":49,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448871972395","queueId":"2","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"3","commitOffset":"3411","queueOffset":"3411"},"flag":0,"language":"JAVA","opaque":422,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448871972395","queueId":"2","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"3","commitOffset":"3411","queueOffset":"3411"},"flag":0,"language":"JAVA","opaque":422,"version":79} +header={"code":11,"extFields":{"topic":"test2","suspendTimeoutMillis":"15000","subVersion":"1448871972395","queueId":"2","consumerGroup":"CIM","maxMsgNums":"32","sysFlag":"3","commitOffset":"3411","queueOffset":"3411"},"flag":0,"language":"JAVA","opaque":422,"version":79} diff --git a/request_code.go b/request_code.go new file mode 100644 index 0000000..8ce3931 --- /dev/null +++ b/request_code.go @@ -0,0 +1,172 @@ +package rocketmq + +const ( + // Broker 发送消息 + SEND_MESSAGE = 10 + // Broker 订阅消息 + PULL_MESSAGE = 11 + // Broker 查询消息 + QUERY_MESSAGE = 12 + // Broker 查询Broker Offset + QUERY_BROKER_OFFSET = 13 + // Broker 查询Consumer Offset + QUERY_CONSUMER_OFFSET = 14 + // Broker 更新Consumer Offset + UPDATE_CONSUMER_OFFSET = 15 + // Broker 更新或者增加一个Topic + UPDATE_AND_CREATE_TOPIC = 17 + // Broker 获取所有Topic的配置(Slave和Namesrv都会向Master请求此配置) + GET_ALL_TOPIC_CONFIG = 21 + // Broker 获取所有Topic配置(Slave和Namesrv都会向Master请求此配置) + GET_TOPIC_CONFIG_LIST = 22 + // Broker 获取所有Topic名称列表 + GET_TOPIC_NAME_LIST = 23 + // Broker 更新Broker上的配置 + UPDATE_BROKER_CONFIG = 25 + // Broker 获取Broker上的配置 + GET_BROKER_CONFIG = 26 + // Broker 触发Broker删除文件 + TRIGGER_DELETE_FILES = 27 + // Broker 获取Broker运行时信息 + GET_BROKER_RUNTIME_INFO = 28 + // Broker 根据时间查询队列的Offset + SEARCH_OFFSET_BY_TIMESTAMP = 29 + // Broker 查询队列最大Offset + GET_MAX_OFFSET = 30 + // Broker 查询队列最小Offset + GET_MIN_OFFSET = 31 + // Broker 查询队列最早消息对应时间 + GET_EARLIEST_MSG_STORETIME = 32 + // Broker 根据消息ID来查询消息 + VIEW_MESSAGE_BY_ID = 33 + // Broker Client向Client发送心跳,并注册自身 + HEART_BEAT = 34 + // Broker Client注销 + UNREGISTER_CLIENT = 35 + // Broker Consumer将处理不了的消息发回服务器 + CONSUMER_SEND_MSG_BACK = 36 + // Broker Commit或者Rollback事务 + END_TRANSACTION = 37 + // Broker 获取ConsumerId列表通过GroupName + GET_CONSUMER_LIST_BY_GROUP = 38 + // Broker 主动向Producer回查事务状态 + CHECK_TRANSACTION_STATE = 39 + // Broker Broker通知Consumer列表变化 + NOTIFY_CONSUMER_IDS_CHANGED = 40 + // Broker Consumer向Master锁定队列 + LOCK_BATCH_MQ = 41 + // Broker Consumer向Master解锁队列 + UNLOCK_BATCH_MQ = 42 + // Broker 获取所有Consumer Offset + GET_ALL_CONSUMER_OFFSET = 43 + // Broker 获取所有定时进度 + GET_ALL_DELAY_OFFSET = 45 + // Namesrv 向Namesrv追加KV配置 + PUT_KV_CONFIG = 100 + // Namesrv 从Namesrv获取KV配置 + GET_KV_CONFIG = 101 + // Namesrv 从Namesrv获取KV配置 + DELETE_KV_CONFIG = 102 + // Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置 + REGISTER_BROKER = 103 + // Namesrv 卸载一个Broker,数据都是持久化的 + UNREGISTER_BROKER = 104 + // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列) + GET_ROUTEINTO_BY_TOPIC = 105 + // Namesrv 获取注册到Name Server的所有Broker集群信息 + GET_BROKER_CLUSTER_INFO = 106 + UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200 + GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201 + GET_TOPIC_STATS_INFO = 202 + GET_CONSUMER_CONNECTION_LIST = 203 + GET_PRODUCER_CONNECTION_LIST = 204 + WIPE_WRITE_PERM_OF_BROKER = 205 + + // 从Name Server获取完整Topic列表 + GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206 + // 从Broker删除订阅组 + DELETE_SUBSCRIPTIONGROUP = 207 + // 从Broker获取消费状态(进度) + GET_CONSUME_STATS = 208 + // Suspend Consumer消费过程 + SUSPEND_CONSUMER = 209 + // Resume Consumer消费过程 + RESUME_CONSUMER = 210 + // 重置Consumer Offset + RESET_CONSUMER_OFFSET_IN_CONSUMER = 211 + // 重置Consumer Offset + RESET_CONSUMER_OFFSET_IN_BROKER = 212 + // 调整Consumer线程池数量 + ADJUST_CONSUMER_THREAD_POOL = 213 + // 查询消息被哪些消费组消费 + WHO_CONSUME_THE_MESSAGE = 214 + + // 从Broker删除Topic配置 + DELETE_TOPIC_IN_BROKER = 215 + // 从Namesrv删除Topic配置 + DELETE_TOPIC_IN_NAMESRV = 216 + // Namesrv 通过 project 获取所有的 server ip 信息 + GET_KV_CONFIG_BY_VALUE = 217 + // Namesrv 删除指定 project group 下的所有 server ip 信息 + DELETE_KV_CONFIG_BY_VALUE = 218 + // 通过NameSpace获取所有的KV List + GET_KVLIST_BY_NAMESPACE = 219 + + // offset 重置 + RESET_CONSUMER_CLIENT_OFFSET = 220 + // 客户端订阅消息 + GET_CONSUMER_STATUS_FROM_CLIENT = 221 + // 通知 broker 调用 offset 重置处理 + INVOKE_BROKER_TO_RESET_OFFSET = 222 + // 通知 broker 调用客户端订阅消息处理 + INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223 + + // Broker 查询topic被谁消费 + // 2014-03-21 Add By shijia + QUERY_TOPIC_CONSUME_BY_WHO = 300 + + // 获取指定集群下的所有 topic + // 2014-03-26 + GET_TOPICS_BY_CLUSTER = 224 + + // 向Broker注册Filter Server + // 2014-04-06 Add By shijia + REGISTER_FILTER_SERVER = 301 + // 向Filter Server注册Class + // 2014-04-06 Add By shijia + REGISTER_MESSAGE_FILTER_CLASS = 302 + // 根据 topic 和 group 获取消息的时间跨度 + QUERY_CONSUME_TIME_SPAN = 303 + // 获取所有系统内置 Topic 列表 + GET_SYSTEM_TOPIC_LIST_FROM_NS = 304 + GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305 + + // 清理失效队列 + CLEAN_EXPIRED_CONSUMEQUEUE = 306 + + // 通过Broker查询Consumer内存数据 + // 2014-07-19 Add By shijia + GET_CONSUMER_RUNNING_INFO = 307 + + // 查找被修正 offset (转发组件) + QUERY_CORRECTION_OFFSET = 308 + + // 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方 + // 2014-08-11 Add By shijia + CONSUME_MESSAGE_DIRECTLY = 309 + + // Broker 发送消息,优化网络数据包 + SEND_MESSAGE_V2 = 310 + + // 单元化相关 topic + GET_UNIT_TOPIC_LIST = 311 + // 获取含有单元化订阅组的 Topic 列表 + GET_HAS_UNIT_SUB_TOPIC_LIST = 312 + // 获取含有单元化订阅组的非单元化 Topic 列表 + GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313 + // 克隆某一个组的消费进度到新的组 + CLONE_GROUP_OFFSET = 314 + + // 查看Broker上的各种统计信息 + VIEW_BROKER_STATS_DATA = 315 +) diff --git a/response_code.go b/response_code.go new file mode 100644 index 0000000..fa1a1f4 --- /dev/null +++ b/response_code.go @@ -0,0 +1,67 @@ +package rocketmq + +const ( + // 成功 + SUCCESS = 0 + // 发生了未捕获异常 + SYSTEM_ERROR = 1 + // 由于线程池拥堵,系统繁忙 + SYSTEM_BUSY = 2 + // 请求代码不支持 + REQUEST_CODE_NOT_SUPPORTED = 3 + //事务失败,添加db失败 + TRANSACTION_FAILED = 4 + // Broker 刷盘超时 + FLUSH_DISK_TIMEOUT = 10 + // Broker 同步双写,Slave不可用 + SLAVE_NOT_AVAILABLE = 11 + // Broker 同步双写,等待Slave应答超时 + FLUSH_SLAVE_TIMEOUT = 12 + // Broker 消息非法 + MESSAGE_ILLEGAL = 13 + // Broker, Namesrv 服务不可用,可能是正在关闭或者权限问题 + SERVICE_NOT_AVAILABLE = 14 + // Broker, Namesrv 版本号不支持 + VERSION_NOT_SUPPORTED = 15 + // Broker, Namesrv 无权限执行此操作,可能是发、收、或者其他操作 + NO_PERMISSION = 16 + // Broker, Topic不存在 + TOPIC_NOT_EXIST = 17 + // Broker, Topic已经存在,创建Topic + TOPIC_EXIST_ALREADY = 18 + // Broker 拉消息未找到(请求的Offset等于最大Offset,最大Offset无对应消息) + PULL_NOT_FOUND = 19 + // Broker 可能被过滤,或者误通知等 + PULL_RETRY_IMMEDIATELY = 20 + // Broker 拉消息请求的Offset不合法,太小或太大 + PULL_OFFSET_MOVED = 21 + // Broker 查询消息未找到 + QUERY_NOT_FOUND = 22 + // Broker 订阅关系解析失败 + SUBSCRIPTION_PARSE_FAILED = 23 + // Broker 订阅关系不存在 + SUBSCRIPTION_NOT_EXIST = 24 + // Broker 订阅关系不是最新的 + SUBSCRIPTION_NOT_LATEST = 25 + // Broker 订阅组不存在 + SUBSCRIPTION_GROUP_NOT_EXIST = 26 + // Producer 事务应该被提交 + TRANSACTION_SHOULD_COMMIT = 200 + // Producer 事务应该被回滚 + TRANSACTION_SHOULD_ROLLBACK = 201 + // Producer 事务状态未知 + TRANSACTION_STATE_UNKNOW = 202 + // Producer ProducerGroup错误 + TRANSACTION_STATE_GROUP_WRONG = 203 + // 单元化消息,需要设置 buyerId + NO_BUYER_ID = 204 + + // 单元化消息,非本单元消息 + NOT_IN_CURRENT_UNIT = 205 + + // Consumer不在线 + CONSUMER_NOT_ONLINE = 206 + + // Consumer消费消息超时 + CONSUME_MSG_TIMEOUT = 207 +) diff --git a/store.go b/store.go new file mode 100644 index 0000000..cb184a4 --- /dev/null +++ b/store.go @@ -0,0 +1,55 @@ +package rocketmq + +import ( + "errors" + "log" +) + +type OffsetStore interface { + //load() error + //updateOffset(mq MessageQueue, offset int64, increaseOnly bool) + readOffset(mq *MessageQueue, flag string) int64 + //persistAll(mqs []MessageQueue) + //persist(mq MessageQueue) + //removeOffset(mq MessageQueue) + //cloneOffsetTable(topic string) map[MessageQueue]int64 +} +type RemoteOffsetStore struct { + groupName string + mqClient *MqClient +} + +func (self *RemoteOffsetStore) readOffset(mq *MessageQueue, readType string) int64 { + if readType == "READ_FROM_STORE" { + offset, err := self.fetchConsumeOffsetFromBroker(mq) + if err != nil { + log.Print(err) + return -1 + } + return offset + } + return -1 + +} + +func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *MessageQueue) (int64, error) { + brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) + + if !found { + self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic) + brokerAddr, _, found = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) + } + + if found { + requestHeader := &QueryConsumerOffsetRequestHeader{} + requestHeader.Topic = mq.topic + requestHeader.QueueId = mq.queueId + requestHeader.ConsumerGroup = self.groupName + return self.mqClient.queryConsumerOffset(brokerAddr, requestHeader, 3000) + } + + return 0, errors.New("fetch consumer offset error") +} + +func (self *RemoteOffsetStore) persist(mqs []MessageQueue) { +}