Skip to content

Commit

Permalink
allocate and header
Browse files Browse the repository at this point in the history
  • Loading branch information
StyleTang committed Aug 23, 2017
1 parent a0abe7a commit dd2ecfc
Show file tree
Hide file tree
Showing 23 changed files with 48 additions and 15 deletions.
1 change: 0 additions & 1 deletion rocketmq-go/api/model/mq_client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.

package rocketmqm

//-------SerializeType-------
//SerializeType default serialize type is JSON_SERIALIZE, but ROCKETMQ_SERIALIZE(need version >= ?) is faster
type SerializeType byte

Expand Down
2 changes: 0 additions & 2 deletions rocketmq-go/api/rocketmq_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

type mqProducerType int

//MQProducer rocketmq producer
type MQProducer interface {
//send message,default timeout is 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service_allocate_message
package allocate

import (
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

//AllocateMessageQueueAveragely AllocateMessageQueueAveragely
type AllocateMessageQueueAveragely struct{}

//Allocate message queue
func (a *AllocateMessageQueueAveragely) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {

if currentCID == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service_allocate_message
package allocate

import (
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

//AllocateMessageQueueAveragelyByCircle AllocateMessageQueueAveragelyByCircle
type AllocateMessageQueueAveragelyByCircle struct{}

//Allocate message queue
func (a *AllocateMessageQueueAveragelyByCircle) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
if currentCID == "" {
return nil, errors.New("currentCID is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service_allocate_message
package allocate

import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"

//AllocateMessageQueueByConfig AllocateMessageQueueByConfig
type AllocateMessageQueueByConfig struct {
messageQueueList []model.MessageQueue
}

//Allocate message queue
func (a *AllocateMessageQueueByConfig) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
return a.messageQueueList, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service_allocate_message
package allocate

import (
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

//AllocateMessageQueueByMachineRoom AllocateMessageQueueByMachineRoom
type AllocateMessageQueueByMachineRoom struct {
}

//Allocate message queue
func (a *AllocateMessageQueueByMachineRoom) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
if currentCID == "" {
return nil, errors.New("currentCID is empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package service_allocate_message
package allocate

import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"

//Allocate message queue
type AllocateMessageQueueStrategy interface {
Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error)
}

//GetAllocateMessageQueueStrategyByConfig get allocate message queue strategy by config
func GetAllocateMessageQueueStrategyByConfig(allocateMessageQueueStrategy string) AllocateMessageQueueStrategy {
return new(AllocateMessageQueueAveragely)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.

package header

//ConsumeMessageDirectlyResultRequestHeader
//ConsumeMessageDirectlyResultRequestHeader of CustomerHeader
type ConsumeMessageDirectlyResultRequestHeader struct {
//ConsumerGroup
ConsumerGroup string `json:"consumerGroup"`
Expand All @@ -29,6 +29,7 @@ type ConsumeMessageDirectlyResultRequestHeader struct {
BrokerName string `json:"brokerName"`
}

//FromMap convert map[string]interface to struct
func (c *ConsumeMessageDirectlyResultRequestHeader) FromMap(headerMap map[string]interface{}) {
c.ConsumerGroup = headerMap["consumerGroup"].(string)
c.ClientId = headerMap["clientId"].(string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

package header

//ConsumerSendMsgBackRequestHeader of CustomerHeader
type ConsumerSendMsgBackRequestHeader struct {
Offset int64
Group string
Expand All @@ -27,6 +28,7 @@ type ConsumerSendMsgBackRequestHeader struct {
MaxReconsumeTimes int32
}

//FromMap convert map[string]interface to struct
func (c *ConsumerSendMsgBackRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
4 changes: 4 additions & 0 deletions rocketmq-go/kernel/header/get_consumer_list_by_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ limitations under the License.

package header

//GetConsumerListByGroupRequestHeader of CustomerHeader
type GetConsumerListByGroupRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
}

//FromMap convert map[string]interface to struct
func (g *GetConsumerListByGroupRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}

//GetConsumerListByGroupResponseBody of CustomerHeader
type GetConsumerListByGroupResponseBody struct {
ConsumerIdList []string
}

//FromMap convert map[string]interface to struct
func (g *GetConsumerListByGroupResponseBody) FromMap(headerMap map[string]interface{}) {
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.

package header

//GetConsumerRunningInfoRequestHeader of CustomerHeader
type GetConsumerRunningInfoRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
ClientId string `json:"clientId"`
JstackEnable bool `json:"jstackEnable"`
}

//FromMap convert map[string]interface to struct
func (g *GetConsumerRunningInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
g.ConsumerGroup = headerMap["consumerGroup"].(string)
g.ClientId = headerMap["clientId"].(string)
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/get_max_offset_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.

package header

//GetMaxOffsetRequestHeader of CustomerHeader
type GetMaxOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
}

//FromMap convert map[string]interface to struct
func (g *GetMaxOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/get_max_offset_response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package header

import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"

//QueryOffsetResponseHeader of CustomerHeader
type QueryOffsetResponseHeader struct {
Offset int64 `json:"offset"`
}

//FromMap convert map[string]interface to struct
func (q *QueryOffsetResponseHeader) FromMap(headerMap map[string]interface{}) {
q.Offset = util.StrToInt64WithDefaultValue(headerMap["offset"].(string), -1)
return
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/get_route_info_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ limitations under the License.

package header

//GetRouteInfoRequestHeader of CustomerHeader
type GetRouteInfoRequestHeader struct {
Topic string `json:"topic"`
}

//FromMap convert map[string]interface to struct
func (g *GetRouteInfoRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/pull_message_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

package header

//PullMessageRequestHeader of CustomerHeader
type PullMessageRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
Expand All @@ -30,6 +31,7 @@ type PullMessageRequestHeader struct {
SubVersion int64 `json:"subVersion"`
}

//FromMap convert map[string]interface to struct
func (p *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.

package header

//QueryConsumerOffsetRequestHeader of CustomerHeader
type QueryConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
}

//FromMap convert map[string]interface to struct
func (q *QueryConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
return
}
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/reset_offset_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"strconv"
)

//ResetOffsetRequestHeader of CustomerHeader
type ResetOffsetRequestHeader struct {
Topic string `json:"topic"`
Group string `json:"group"`
Timestamp int64 `json:"timestamp"`
IsForce bool `json:"isForce"`
}

//FromMap convert map[string]interface to struct
func (r *ResetOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
r.Group = headerMap["group"].(string)
r.Topic = headerMap["topic"].(string)
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/search_offset_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
)

//SearchOffsetRequestHeader of CustomerHeader
type SearchOffsetRequestHeader struct {
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
Timestamp int64 `json:"timestamp"`
}

//FromMap convert map[string]interface to struct
func (s *SearchOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
s.Topic = headerMap["topic"].(string)
s.Topic = headerMap["queueId"].(string)
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-go/kernel/header/send_message_request_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.

package header

//SendMessageRequestHeader <- CustomerHeader
//SendMessageRequestHeader of CustomerHeader
type SendMessageRequestHeader struct {
ProducerGroup string `json:"producerGroup"`
Topic string `json:"topic"`
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-go/kernel/header/send_message_response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

package header

//SendMessageResponseHeader of CustomerHeader
type SendMessageResponseHeader struct {
MsgId string
QueueId int32
Expand All @@ -25,6 +26,7 @@ type SendMessageResponseHeader struct {
MsgRegion string
}

//FromMap convert map[string]interface to struct
func (header *SendMessageResponseHeader) FromMap(headerMap map[string]interface{}) {
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package header

import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"

//UpdateConsumerOffsetRequestHeader of CustomHeader
//UpdateConsumerOffsetRequestHeader of CustomerHeader
type UpdateConsumerOffsetRequestHeader struct {
ConsumerGroup string `json:"consumerGroup"`
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
CommitOffset int64 `json:"commitOffset"`
}

//FromMap to UpdateConsumerOffsetRequestHeader
//FromMap convert map[string]interface to struct
func (u *UpdateConsumerOffsetRequestHeader) FromMap(headerMap map[string]interface{}) {
u.ConsumerGroup = headerMap["consumerGroup"].(string)
u.QueueId = util.StrToInt32WithDefaultValue(util.ReadString(headerMap["queueId"]), 0)
Expand Down
6 changes: 3 additions & 3 deletions rocketmq-go/kernel/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"encoding/json"
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/allocate_message"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/allocate"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
Expand All @@ -40,7 +40,7 @@ type rebalance struct {
subscriptionInner map[string]*model.SubscriptionData
subscriptionInnerLock sync.RWMutex
mqClient RocketMqClient
allocateMessageQueueStrategy service_allocate_message.AllocateMessageQueueStrategy
allocateMessageQueueStrategy allocate.AllocateMessageQueueStrategy
processQueueTable map[model.MessageQueue]*model.ProcessQueue // both subscribe topic and retry group
processQueueTableLock sync.RWMutex
mutex sync.Mutex
Expand Down Expand Up @@ -123,7 +123,7 @@ func newRebalance(groupName string, subscription map[string]string, mqClient Roc
mqClient: mqClient,
offsetStore: offsetStore,
subscriptionInner: subscriptionInner,
allocateMessageQueueStrategy: service_allocate_message.GetAllocateMessageQueueStrategyByConfig("default"),
allocateMessageQueueStrategy: allocate.GetAllocateMessageQueueStrategyByConfig("default"),
messageModel: "CLUSTERING",
processQueueTable: make(map[model.MessageQueue]*model.ProcessQueue),
consumerConfig: consumerConfig,
Expand Down
1 change: 1 addition & 0 deletions rocketmq-go/remoting/custom_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.

package remoting

//CustomerHeader see kernel.header
type CustomerHeader interface {
//convert map[string]interface to struct
FromMap(headerMap map[string]interface{})
Expand Down

0 comments on commit dd2ecfc

Please sign in to comment.