Skip to content

Commit

Permalink
comment on kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
StyleTang committed Aug 25, 2017
1 parent 0d38422 commit 6306f14
Show file tree
Hide file tree
Showing 14 changed files with 70 additions and 52 deletions.
2 changes: 1 addition & 1 deletion rocketmq-go/kernel/consume_message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *consumeMessageConcurrentlyServiceImpl) processConsumeResult(result rock
}
commitOffset := processQueue.RemoveMessage(successMessages)
if commitOffset > 0 && !processQueue.IsDropped() {
c.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
c.offsetStore.updateOffset(messageQueue, commitOffset, true)
}

}
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-go/kernel/mq_client_manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type MqClientManager struct {
mqClient RocketMqClient
pullMessageController *PullMessageController
cleanExpireMsgController *cleanExpireMsgController
rebalanceControllr *RebalanceController
rebalanceControllr *rebalanceController
defaultProducerService *DefaultProducerService
}

Expand All @@ -50,7 +50,7 @@ func MqClientManagerInit(clientConfig *rocketmqm.MqClientConfig) (rocketMqManage
rocketMqManager.mqClient = MqClientInit(clientConfig, rocketMqManager.initClientRequestProcessor()) // todo todo todo
rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.cleanExpireMsgController = newCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
rocketMqManager.rebalanceControllr = newRebalanceController(rocketMqManager.clientFactory)

return
}
Expand All @@ -60,14 +60,14 @@ func (m *MqClientManager) Start() {
}

func (m *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
producer.producerService = NewDefaultProducerService(producer.producerGroup, producer.ProducerConfig, m.mqClient)
producer.producerService = newDefaultProducerService(producer.producerGroup, producer.ProducerConfig, m.mqClient)
m.clientFactory.producerTable[producer.producerGroup] = producer
return
}

func (m *MqClientManager) RegisterConsumer(consumer *DefaultMQPushConsumer) {
if m.defaultProducerService == nil {
m.defaultProducerService = NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, rocketmqm.NewProducerConfig(), m.mqClient)
m.defaultProducerService = newDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, rocketmqm.NewProducerConfig(), m.mqClient)
}
consumer.mqClient = m.mqClient
consumer.offsetStore = RemoteOffsetStoreInit(consumer.consumerGroup, m.mqClient)
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-go/kernel/mq_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func NewDefaultMQProducer(producerGroup string, producerConfig *rocketmqm.MqProd
}

func (d *DefaultMQProducer) Send(msg rocketmqm.Message) (sendResult *model.SendResult, err error) {
sendResult, err = d.producerService.SendDefaultImpl(msg.(*message.MessageImpl), constant.COMMUNICATIONMODE_SYNC, "", d.ProducerConfig.SendMsgTimeout)
sendResult, err = d.producerService.sendDefaultImpl(msg.(*message.MessageImpl), constant.COMMUNICATIONMODE_SYNC, "", d.ProducerConfig.SendMsgTimeout)
return
}
func (d *DefaultMQProducer) SendWithTimeout(msg rocketmqm.Message, timeout int64) (sendResult *model.SendResult, err error) {
sendResult, err = d.producerService.SendDefaultImpl(msg.(*message.MessageImpl), constant.COMMUNICATIONMODE_SYNC, "", timeout)
sendResult, err = d.producerService.sendDefaultImpl(msg.(*message.MessageImpl), constant.COMMUNICATIONMODE_SYNC, "", timeout)
return
}
11 changes: 9 additions & 2 deletions rocketmq-go/kernel/mq_push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"
)

//DefaultMQPushConsumer no order/cluster
type DefaultMQPushConsumer struct {
consumerGroup string
consumeType string
Expand All @@ -42,6 +43,7 @@ type DefaultMQPushConsumer struct {
ConsumerConfig *rocketmqm.MqConsumerConfig
}

//NewDefaultMQPushConsumer create a DefaultMQPushConsumer instance
func NewDefaultMQPushConsumer(consumerGroup string, consumerConfig *rocketmqm.MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
defaultMQPushConsumer = &DefaultMQPushConsumer{
consumerGroup: consumerGroup,
Expand All @@ -54,6 +56,8 @@ func NewDefaultMQPushConsumer(consumerGroup string, consumerConfig *rocketmqm.Mq

return
}

//Subscribe subscribe topic, filter by subExpression
func (d *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
d.subscription[topic] = subExpression
if len(subExpression) == 0 || subExpression == "*" {
Expand All @@ -73,6 +77,7 @@ func (d *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
}
}

//RegisterMessageListener register message listener to this consumer
func (d *DefaultMQPushConsumer) RegisterMessageListener(messageListener rocketmqm.MessageListener) {
d.consumeMessageService = NewConsumeMessageConcurrentlyServiceImpl(messageListener)
}
Expand All @@ -96,13 +101,14 @@ func (d *DefaultMQPushConsumer) resetOffset(offsetTable map[model.MessageQueue]i
if processQueue == nil || offset < 0 {
continue
}
glog.V(2).Info("now we UpdateOffset", messageQueue, offset)
d.offsetStore.UpdateOffset(&messageQueue, offset, false)
glog.V(2).Info("now we updateOffset", messageQueue, offset)
d.offsetStore.updateOffset(&messageQueue, offset, false)
d.rebalance.removeProcessQueue(&messageQueue)
}
}()
}

//Subscriptions get this consumer's subscription data
func (d *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData {
subscriptions := make([]*model.SubscriptionData, 0)
for _, subscription := range d.rebalance.subscriptionInner {
Expand All @@ -111,6 +117,7 @@ func (d *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData {
return subscriptions
}

//CleanExpireMsg cleanExpireMsg
func (d *DefaultMQPushConsumer) CleanExpireMsg() {
nowTime := util.CurrentTimeMillisInt64() //will cause nowTime - consumeStartTime <0 ,but no matter
messageQueueList, processQueueList := d.rebalance.getProcessQueueList()
Expand Down
28 changes: 17 additions & 11 deletions rocketmq-go/kernel/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,27 @@ import (
)

const (
//MEMORY_FIRST_THEN_STORE memory first then store
MEMORY_FIRST_THEN_STORE = 0
READ_FROM_MEMORY = 1
READ_FROM_STORE = 2
//READ_FROM_MEMORY READ_FROM_MEMORY
READ_FROM_MEMORY = 1
//READ_FROM_STORE READ_FROM_STORE
READ_FROM_STORE = 2
)

//OffsetStore OffsetStore
type OffsetStore interface {
//update local offsetTable's offset
UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool)
updateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool)
//read offset,from memory or broker
ReadOffset(mq *model.MessageQueue, readType int) int64
readOffset(mq *model.MessageQueue, readType int) int64
//update broker's offset
Persist(mq *model.MessageQueue)
persist(mq *model.MessageQueue)
//remove local offsetTable's offset
RemoveOffset(mq *model.MessageQueue)
removeOffset(mq *model.MessageQueue)
}

//RemoteOffsetStore offset store on remote
type RemoteOffsetStore struct {
groupName string
mqClient RocketMqClient
Expand All @@ -58,13 +64,13 @@ func RemoteOffsetStoreInit(groupName string, mqClient RocketMqClient) OffsetStor
offsetStore.offsetTableLock = new(sync.RWMutex)
return offsetStore
}
func (r *RemoteOffsetStore) RemoveOffset(mq *model.MessageQueue) {
func (r *RemoteOffsetStore) removeOffset(mq *model.MessageQueue) {
defer r.offsetTableLock.Unlock()
r.offsetTableLock.Lock()
delete(r.offsetTable, *mq)
}

func (r *RemoteOffsetStore) Persist(mq *model.MessageQueue) {
func (r *RemoteOffsetStore) persist(mq *model.MessageQueue) {
brokerAddr := r.mqClient.FetchMasterBrokerAddress(mq.BrokerName)
if len(brokerAddr) == 0 {
r.mqClient.TryToFindTopicPublishInfo(mq.Topic)
Expand All @@ -78,7 +84,7 @@ func (r *RemoteOffsetStore) Persist(mq *model.MessageQueue) {
r.mqClient.GetRemotingClient().InvokeOneWay(brokerAddr, requestCommand, 1000*5)
}

func (r *RemoteOffsetStore) ReadOffset(mq *model.MessageQueue, readType int) int64 {
func (r *RemoteOffsetStore) readOffset(mq *model.MessageQueue, readType int) int64 {

switch readType {
case MEMORY_FIRST_THEN_STORE:
Expand All @@ -98,7 +104,7 @@ func (r *RemoteOffsetStore) ReadOffset(mq *model.MessageQueue, readType int) int
return -1
}
glog.V(2).Info("READ_FROM_STORE", offset)
r.UpdateOffset(mq, offset, false)
r.updateOffset(mq, offset, false)
return offset
}

Expand Down Expand Up @@ -151,7 +157,7 @@ func (r RemoteOffsetStore) queryConsumerOffset(addr string, requestHeader *heade
return -1, errors.New("query offset error")
}

func (r *RemoteOffsetStore) UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool) {
func (r *RemoteOffsetStore) updateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool) {
defer r.offsetTableLock.Unlock()
r.offsetTableLock.Lock()
if mq != nil {
Expand Down
14 changes: 8 additions & 6 deletions rocketmq-go/kernel/producer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,35 @@ import (
"github.com/golang/glog"
)

//ProducerService producerService, for send message
type ProducerService interface {
CheckConfig() (err error)
SendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error)
checkConfig() (err error)
sendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error)
}

//ProducerService ProducerService's implement
type DefaultProducerService struct {
producerGroup string
producerConfig *rocketmqm.MqProducerConfig
mqClient RocketMqClient
mqFaultStrategy mqFaultStrategy
}

func NewDefaultProducerService(producerGroup string, producerConfig *rocketmqm.MqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
func newDefaultProducerService(producerGroup string, producerConfig *rocketmqm.MqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
defaultProducerService = &DefaultProducerService{
mqClient: mqClient,
producerGroup: producerGroup,
producerConfig: producerConfig,
}
defaultProducerService.CheckConfig()
defaultProducerService.checkConfig()
return
}
func (d *DefaultProducerService) CheckConfig() (err error) {
func (d *DefaultProducerService) checkConfig() (err error) {
// todo check if not pass panic
return
}

func (d *DefaultProducerService) SendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error) {
func (d *DefaultProducerService) sendDefaultImpl(message *message.MessageImpl, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error) {
var (
topicPublishInfo *model.TopicPublishInfo
)
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-go/kernel/producer_service_for_send_back.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *sendMessageBackProducerServiceImpl) sendRetryMessageBack(messageExt *me
retryMessage.SetDelayTimeLevel(3 + messageExt.GetReconsumeTimes())
pp, _ := json.Marshal(retryMessage)
glog.Info("look retryMessage ", string(pp), string(messageExt.Body()))
sendResult, err := s.defaultProducerService.SendDefaultImpl(retryMessage, constant.COMMUNICATIONMODE_SYNC, "", s.defaultProducerService.producerConfig.SendMsgTimeout)
sendResult, err := s.defaultProducerService.sendDefaultImpl(retryMessage, constant.COMMUNICATIONMODE_SYNC, "", s.defaultProducerService.producerConfig.SendMsgTimeout)
if err != nil {
glog.Error(err)
return err
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-go/kernel/pull_message_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
p.pullMessageLater(pullRequest, delayPullTime)
return
}
commitOffsetValue := defaultMQPullConsumer.offsetStore.ReadOffset(pullRequest.MessageQueue, READ_FROM_MEMORY)
commitOffsetValue := defaultMQPullConsumer.offsetStore.readOffset(pullRequest.MessageQueue, READ_FROM_MEMORY)

subscriptionData, ok := defaultMQPullConsumer.rebalance.subscriptionInner[pullRequest.MessageQueue.Topic]
if !ok {
Expand Down Expand Up @@ -122,7 +122,7 @@ func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
msgs = FilterMessageAgainByTags(msgs, defaultMQPullConsumer.subscriptionTag[pullRequest.MessageQueue.Topic])
if len(msgs) == 0 {
if pullRequest.ProcessQueue.GetMsgCount() == 0 {
defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
}
}
pullRequest.ProcessQueue.PutMessage(msgs)
Expand All @@ -145,7 +145,7 @@ func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
if responseCommand.Code == remoting.PULL_NOT_FOUND || responseCommand.Code == remoting.PULL_RETRY_IMMEDIATELY {
//NO_NEW_MSG //NO_MATCHED_MSG
if pullRequest.ProcessQueue.GetMsgCount() == 0 {
defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
}
//update offset increase only
//failedPullRequest, _ := json.Marshal(pullRequest)
Expand All @@ -157,7 +157,7 @@ func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
go func() {
executeTaskLater := time.NewTimer(10 * time.Second)
<-executeTaskLater.C
defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, false)
defaultMQPullConsumer.offsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, false)
defaultMQPullConsumer.rebalance.removeProcessQueue(pullRequest.MessageQueue)
}()
} else {
Expand All @@ -179,7 +179,7 @@ func (p *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
//func (p *PullMessageController) updateOffsetIfNeed(msgs []message.MessageExtImpl, pullRequest *model.PullRequest, defaultMQPullConsumer *DefaultMQPushConsumer, nextBeginOffset int64) {
// if len(msgs) == 0 {
// if pullRequest.ProcessQueue.GetMsgCount() == 0 {
// defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
// defaultMQPullConsumer.OffsetStore.updateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
// }
// }
//}
Expand Down Expand Up @@ -318,7 +318,7 @@ func DecodeMessage(data []byte) []message.MessageExtImpl {
// >= 3.5.8 use clientUniqMsgId
msg.SetMsgId(msg.GetMsgUniqueKey())
if len(msg.MsgId()) == 0 {
msg.SetMsgId(util.GeneratorMessageOffsetId(storeHost, storePort, msg.CommitLogOffset))
msg.SetMsgId(message.GeneratorMessageOffsetId(storeHost, storePort, msg.CommitLogOffset))
}
msgs = append(msgs, msg)
}
Expand Down
6 changes: 3 additions & 3 deletions rocketmq-go/kernel/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (r *rebalance) getProcessQueueList() (messageQueueList []model.MessageQueue

//removeUnnecessaryMessageQueue you should drop it first
func (r *rebalance) removeProcessQueue(messageQueue *model.MessageQueue) {
r.offsetStore.Persist(messageQueue)
r.offsetStore.RemoveOffset(messageQueue)
r.offsetStore.persist(messageQueue)
r.offsetStore.removeOffset(messageQueue)
r.removeMessageQueueFromMap(*messageQueue)
}
func (r *rebalance) removeMessageQueueFromMap(messageQueue model.MessageQueue) {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (r *rebalance) putTheQueueToProcessQueueTable(topic string, mqSet []model.M
}
func (r *rebalance) computePullFromWhere(mq *model.MessageQueue) int64 {
var result int64 = -1
lastOffset := r.offsetStore.ReadOffset(mq, READ_FROM_STORE)
lastOffset := r.offsetStore.readOffset(mq, READ_FROM_STORE)
switch r.consumerConfig.ConsumeFromWhere {
case rocketmqm.CONSUME_FROM_LAST_OFFSET:
if lastOffset >= 0 {
Expand Down
11 changes: 6 additions & 5 deletions rocketmq-go/kernel/rebalance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ limitations under the License.

package kernel

type RebalanceController struct {
//rebalanceController rebalanceController
type rebalanceController struct {
clientFactory *clientFactory
}

func NewRebalanceController(clientFactory *clientFactory) *RebalanceController {
return &RebalanceController{
func newRebalanceController(clientFactory *clientFactory) *rebalanceController {
return &rebalanceController{
clientFactory: clientFactory,
}
}

func (self *RebalanceController) doRebalance() {
for _, consumer := range self.clientFactory.consumerTable {
func (r *rebalanceController) doRebalance() {
for _, consumer := range r.clientFactory.consumerTable {
consumer.rebalance.doRebalance()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package message

import (
"bytes"
"encoding/binary"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"os"
"strconv"
"strings"
Expand All @@ -28,7 +29,7 @@ import (
)

var (
counter int16 = 0
counter int16
startTime int64
nextStartTime int64
idPrefix string
Expand All @@ -42,7 +43,7 @@ var (

//2 bytes for counter,
//4 bytes for timediff, //(time.Now().UnixNano() - startTime) / 1000000) divide 1000000 because use time millis
func GeneratorMessageClientId() (uniqMessageId string) {
func generatorMessageClientId() (uniqMessageId string) {
defer lock.Unlock()
lock.Lock()
if len(idPrefix) == 0 {
Expand All @@ -59,6 +60,7 @@ func GeneratorMessageClientId() (uniqMessageId string) {
return
}

//GeneratorMessageOffsetId generator message offsetId
func GeneratorMessageOffsetId(storeHost []byte, port int32, commitOffset int64) (messageOffsetId string) {
var buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, storeHost)
Expand All @@ -75,7 +77,7 @@ func generatorMessageClientIdPrefix() (messageClientIdPrefix string) {
pid int16
classloaderId int32 = -1 // golang don't have this
)
ip4Bytes = GetIp4Bytes()
ip4Bytes = util.GetIp4Bytes()
pid = int16(os.Getpid())
var buf = bytes.NewBuffer([]byte{})
binary.Write(buf, binary.BigEndian, ip4Bytes)
Expand Down
Loading

0 comments on commit 6306f14

Please sign in to comment.