Skip to content

Commit

Permalink
manage->kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
StyleTang committed Aug 20, 2017
1 parent cf3cd1a commit 4b14070
Show file tree
Hide file tree
Showing 20 changed files with 122 additions and 928 deletions.
2 changes: 1 addition & 1 deletion rocketmq-go/api/model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Message interface {
SetBody([]byte)
}

//create a message instance
/*create a message instance*/
func NewMessage() (msg Message) {
msg = message.NewMessageImpl()
return
Expand Down
10 changes: 5 additions & 5 deletions rocketmq-go/api/rocketmq_clent_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package rocketmq

import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/manage"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"
)

type MQClientInstance interface {
Expand All @@ -32,24 +32,24 @@ type MQClientInstance interface {
}

type ClientInstanceImpl struct {
rocketMqManager *manage.MqClientManager
rocketMqManager *kernel.MqClientManager
}

func InitRocketMQClientInstance(nameServerAddress string) (rocketMQClientInstance MQClientInstance) {
mqClientConfig := rocketmqm.NewMqClientConfig(nameServerAddress)
return InitRocketMQClientInstanceWithCustomClientConfig(mqClientConfig)
}
func InitRocketMQClientInstanceWithCustomClientConfig(mqClientConfig *rocketmqm.MqClientConfig) (rocketMQClientInstance MQClientInstance) {
rocketMQClientInstance = &ClientInstanceImpl{rocketMqManager: manage.MqClientManagerInit(mqClientConfig)}
rocketMQClientInstance = &ClientInstanceImpl{rocketMqManager: kernel.MqClientManagerInit(mqClientConfig)}
return
}

func (r *ClientInstanceImpl) RegisterProducer(producer MQProducer) {
r.rocketMqManager.RegisterProducer(producer.(*manage.DefaultMQProducer))
r.rocketMqManager.RegisterProducer(producer.(*kernel.DefaultMQProducer))
}

func (r *ClientInstanceImpl) RegisterConsumer(consumer MQConsumer) {
r.rocketMqManager.RegisterConsumer(consumer.(*manage.DefaultMQPushConsumer))
r.rocketMqManager.RegisterConsumer(consumer.(*kernel.DefaultMQPushConsumer))
}
func (r *ClientInstanceImpl) Start() {
r.rocketMqManager.Start()
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-go/api/rocketmq_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package rocketmq

import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/manage"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"
)

type MQConsumer interface {
Expand All @@ -43,5 +43,5 @@ func NewDefaultMQPushConsumer(producerGroup string) (r MQConsumer) {

// Concurrently(no order) CLUSTERING mq consumer with custom config
func NewDefaultMQPushConsumerWithCustomConfig(producerGroup string, consumerConfig *rocketmqm.MqConsumerConfig) (r MQConsumer) {
return manage.NewDefaultMQPushConsumer(producerGroup, consumerConfig)
return kernel.NewDefaultMQPushConsumer(producerGroup, consumerConfig)
}
4 changes: 2 additions & 2 deletions rocketmq-go/api/rocketmq_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package rocketmq

import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/manage"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

Expand All @@ -44,5 +44,5 @@ func NewDefaultMQProducer(producerGroup string) (r MQProducer) {

//mq producer with custom config
func NewDefaultMQProducerWithCustomConfig(producerGroup string, producerConfig *rocketmqm.MqProducerConfig) (r MQProducer) {
return manage.NewDefaultMQProducer(producerGroup, producerConfig)
return kernel.NewDefaultMQProducer(producerGroup, producerConfig)
}
28 changes: 14 additions & 14 deletions rocketmq-go/docs/package.puml
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ class api.ClientInstanceImpl{
}

namespace api{
MQClientInstance o-- manage.MqClientManager
MQProducer o-- manage.DefaultMQProducer
MQConsumer o-- manage.DefaultMQPushConsumer
MQConsumer o-- manage.DefaultMQPullConsumer
MQClientInstance o-- kernel.MqClientManager
MQProducer o-- kernel.DefaultMQProducer
MQConsumer o-- kernel.DefaultMQPushConsumer
MQConsumer o-- kernel.DefaultMQPullConsumer
}

class manage.MqClientManager{
class kernel.MqClientManager{
kernelState
}
class manage.PullMessageController{
class kernel.PullMessageController{

}
class kernel.AllocateMessageQueueStrategy{
}
class manage.ClientFactory{
class kernel.ClientFactory{
mqConsumerTable
mqProducerTable
}
class manage.DefaultMQPushConsumer{
class kernel.DefaultMQPushConsumer{
}
class manage.DefaultMQPullConsumer{
class kernel.DefaultMQPullConsumer{
to be done
}
class manage.DefaultMQProducer{
class kernel.DefaultMQProducer{
}
class kernel.MqClient{

Expand Down Expand Up @@ -73,7 +73,7 @@ class manage.DefaultMQProducer{
namespace kernel{


manage.PullMessageController *-- manage.ClientFactory:contains
kernel.PullMessageController *-- kernel.ClientFactory:contains



Expand Down Expand Up @@ -105,13 +105,13 @@ namespace remoting {
namespace manage{
MqClientManager o-- PullMessageController
MqClientManager o-- ClientFactory
manage.ClientFactory *-- DefaultMQPushConsumer:contains
manage.ClientFactory *-- DefaultMQPullConsumer:contains
kernel.ClientFactory *-- DefaultMQPushConsumer:contains
kernel.ClientFactory *-- DefaultMQPullConsumer:contains
DefaultMQPushConsumer *-- kernel.PullAPIWrapper : contains
DefaultMQPushConsumer *-- kernel.OffsetStore : contains
DefaultMQPushConsumer *-- kernel.Rebalance : contains
DefaultMQPushConsumer *-- kernel.ConsumeMessageService : contains
manage.ClientFactory *-- DefaultMQProducer:contains
kernel.ClientFactory *-- DefaultMQProducer:contains

DefaultMQProducer *-- kernel.MqClient :contains
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,26 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package manage
package kernel

import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"
"time"
)

type cleanExpireMsgController struct {
mqClient kernel.RocketMqClient
mqClient RocketMqClient
clientFactory *clientFactory
}

func newCleanExpireMsgController(mqClient kernel.RocketMqClient, clientFactory *clientFactory) *cleanExpireMsgController {
func newCleanExpireMsgController(mqClient RocketMqClient, clientFactory *clientFactory) *cleanExpireMsgController {
return &cleanExpireMsgController{
mqClient: mqClient,
clientFactory: clientFactory,
}
}

func (self *cleanExpireMsgController) start() {
for _, consumer := range self.clientFactory.consumerTable {
func (c *cleanExpireMsgController) start() {
for _, consumer := range c.clientFactory.consumerTable {
go func() {
cleanExpireMsgTimer := time.NewTimer(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * 1000 * 60 * time.Millisecond)
//cleanExpireMsgTimer := time.NewTimer(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * time.Millisecond)
Expand Down
32 changes: 16 additions & 16 deletions rocketmq-go/kernel/consume_message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,35 @@ import (
"github.com/golang/glog"
)

type ConsumeMessageService interface {
Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *rocketmqm.MqConsumerConfig)
SubmitConsumeRequest(msgs []message.MessageExtImpl, processQueue *model.ProcessQueue,
type consumeMessageService interface {
init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *rocketmqm.MqConsumerConfig)
submitConsumeRequest(msgs []message.MessageExtImpl, processQueue *model.ProcessQueue,
messageQueue *model.MessageQueue, dispathToConsume bool)
SendMessageBack(messageExt *message.MessageExtImpl, delayLayLevel int, brokerName string) (err error)
ConsumeMessageDirectly(messageExt *message.MessageExtImpl, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
sendMessageBack(messageExt *message.MessageExtImpl, delayLayLevel int, brokerName string) (err error)
consumeMessageDirectly(messageExt *message.MessageExtImpl, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error)
}

type ConsumeMessageConcurrentlyServiceImpl struct {
type consumeMessageConcurrentlyServiceImpl struct {
consumerGroup string
messageListener rocketmqm.MessageListener
sendMessageBackProducerService sendMessageBackProducerService //for send retry MessageImpl
offsetStore OffsetStore
consumerConfig *rocketmqm.MqConsumerConfig
}

func NewConsumeMessageConcurrentlyServiceImpl(messageListener rocketmqm.MessageListener) (consumeService ConsumeMessageService) {
consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener: messageListener, sendMessageBackProducerService: &SendMessageBackProducerServiceImpl{}}
func NewConsumeMessageConcurrentlyServiceImpl(messageListener rocketmqm.MessageListener) (consumeService consumeMessageService) {
consumeService = &consumeMessageConcurrentlyServiceImpl{messageListener: messageListener, sendMessageBackProducerService: &sendMessageBackProducerServiceImpl{}}
return
}

func (c *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *rocketmqm.MqConsumerConfig) {
func (c *consumeMessageConcurrentlyServiceImpl) init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *rocketmqm.MqConsumerConfig) {
c.consumerGroup = consumerGroup
c.offsetStore = offsetStore
c.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient, defaultProducerService, consumerConfig)
c.consumerConfig = consumerConfig
}

func (c *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []message.MessageExtImpl, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) {
func (c *consumeMessageConcurrentlyServiceImpl) submitConsumeRequest(msgs []message.MessageExtImpl, processQueue *model.ProcessQueue, messageQueue *model.MessageQueue, dispathToConsume bool) {
msgsLen := len(msgs)
for i := 0; i < msgsLen; {
begin := i
Expand All @@ -73,7 +73,7 @@ func (c *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []mess
}
return
}
func (c *ConsumeMessageConcurrentlyServiceImpl) convert2ConsumeType(msgs []message.MessageExtImpl) (ret []rocketmqm.MessageExt) {
func (c *consumeMessageConcurrentlyServiceImpl) convert2ConsumeType(msgs []message.MessageExtImpl) (ret []rocketmqm.MessageExt) {
msgLen := len(msgs)
ret = make([]rocketmqm.MessageExt, msgLen)

Expand All @@ -83,12 +83,12 @@ func (c *ConsumeMessageConcurrentlyServiceImpl) convert2ConsumeType(msgs []messa
return
}

func (c *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *message.MessageExtImpl, delayLayLevel int, brokerName string) (err error) {
func (c *consumeMessageConcurrentlyServiceImpl) sendMessageBack(messageExt *message.MessageExtImpl, delayLayLevel int, brokerName string) (err error) {
err = c.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
return
}

func (c *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageExt *message.MessageExtImpl, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
func (c *consumeMessageConcurrentlyServiceImpl) consumeMessageDirectly(messageExt *message.MessageExtImpl, brokerName string) (consumeMessageDirectlyResult model.ConsumeMessageDirectlyResult, err error) {
start := util.CurrentTimeMillisInt64()
consumeResult := c.messageListener(c.convert2ConsumeType([]message.MessageExtImpl{*messageExt}))
consumeMessageDirectlyResult.AutoCommit = true
Expand All @@ -102,7 +102,7 @@ func (c *ConsumeMessageConcurrentlyServiceImpl) ConsumeMessageDirectly(messageEx
return
}

func (c *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result rocketmqm.ConsumeConcurrentlyResult, msgs []message.MessageExtImpl, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) {
func (c *consumeMessageConcurrentlyServiceImpl) processConsumeResult(result rocketmqm.ConsumeConcurrentlyResult, msgs []message.MessageExtImpl, messageQueue *model.MessageQueue, processQueue *model.ProcessQueue) {
if processQueue.IsDropped() {
glog.Warning("processQueue is dropped without process consume result. ", msgs)
return
Expand All @@ -126,7 +126,7 @@ func (c *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result rock
successMessages = msgs[:ackIndex+1]
}
for i := ackIndex + 1; i < len(msgs); i++ {
err := c.SendMessageBack(&msgs[i], 0, messageQueue.BrokerName)
err := c.sendMessageBack(&msgs[i], 0, messageQueue.BrokerName)
if err != nil {
msgs[i].ReconsumeTimes = msgs[i].ReconsumeTimes + 1
failedMessages = append(failedMessages, msgs[i])
Expand All @@ -135,7 +135,7 @@ func (c *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result rock
}
}
if len(failedMessages) > 0 {
c.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
c.submitConsumeRequest(failedMessages, processQueue, messageQueue, true)
}
commitOffset := processQueue.RemoveMessage(successMessages)
if commitOffset > 0 && !processQueue.IsDropped() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package manage
package kernel

import (
"encoding/json"
"errors"
"fmt"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"

"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel/header"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
Expand All @@ -42,18 +42,18 @@ type MqClientManager struct {
rocketMqManagerLock sync.Mutex
BootTimestamp int64
clientFactory *clientFactory
mqClient kernel.RocketMqClient
mqClient RocketMqClient
pullMessageController *PullMessageController
cleanExpireMsgController *cleanExpireMsgController
rebalanceControllr *RebalanceController
defaultProducerService *kernel.DefaultProducerService
defaultProducerService *DefaultProducerService
}

func MqClientManagerInit(clientConfig *rocketmqm.MqClientConfig) (rocketMqManager *MqClientManager) {
rocketMqManager = &MqClientManager{}
rocketMqManager.BootTimestamp = time.Now().Unix()
rocketMqManager.clientFactory = ClientFactoryInit()
rocketMqManager.mqClient = kernel.MqClientInit(clientConfig, rocketMqManager.initClientRequestProcessor()) // todo todo todo
rocketMqManager.mqClient = MqClientInit(clientConfig, rocketMqManager.initClientRequestProcessor()) // todo todo todo
rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.cleanExpireMsgController = newCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
Expand All @@ -66,23 +66,23 @@ func (m *MqClientManager) Start() {
}

func (m *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
producer.producerService = kernel.NewDefaultProducerService(producer.producerGroup, producer.ProducerConfig, m.mqClient)
producer.producerService = NewDefaultProducerService(producer.producerGroup, producer.ProducerConfig, m.mqClient)
m.clientFactory.producerTable[producer.producerGroup] = producer
return
}

func (m *MqClientManager) RegisterConsumer(consumer *DefaultMQPushConsumer) {
if m.defaultProducerService == nil {
m.defaultProducerService = kernel.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, rocketmqm.NewProducerConfig(), m.mqClient)
m.defaultProducerService = NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, rocketmqm.NewProducerConfig(), m.mqClient)
}
consumer.mqClient = m.mqClient
consumer.offsetStore = kernel.RemoteOffsetStoreInit(consumer.consumerGroup, m.mqClient)
consumer.offsetStore = RemoteOffsetStoreInit(consumer.consumerGroup, m.mqClient)
m.clientFactory.consumerTable[consumer.consumerGroup] = consumer
consumer.rebalance = kernel.NewRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)
consumer.rebalance = NewRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)

fmt.Println(consumer.consumeMessageService)

consumer.consumeMessageService.Init(consumer.consumerGroup, m.mqClient, consumer.offsetStore, m.defaultProducerService, consumer.ConsumerConfig)
consumer.consumeMessageService.init(consumer.consumerGroup, m.mqClient, consumer.offsetStore, m.defaultProducerService, consumer.ConsumerConfig)
return
}
func (m *MqClientManager) initClientRequestProcessor() (clientRequestProcessor remoting.ClientRequestProcessor) {
Expand Down Expand Up @@ -128,7 +128,7 @@ func (m *MqClientManager) initClientRequestProcessor() (clientRequestProcessor r
}

consumerRunningInfo.Properties["PROP_NAMESERVER_ADDR"] = strings.Join(defaultMQPushConsumer.mqClient.GetRemotingClient().GetNamesrvAddrList(), ";")
consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.GetMqTableInfo()
consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.getMqTableInfo()

glog.V(2).Info("op=look consumerRunningInfo", consumerRunningInfo)
jsonByte, err := consumerRunningInfo.Encode()
Expand All @@ -149,7 +149,7 @@ func (m *MqClientManager) initClientRequestProcessor() (clientRequestProcessor r
messageExt := &DecodeMessage(cmd.Body)[0]
glog.V(2).Info("op=look", messageExt)
defaultMQPushConsumer := m.clientFactory.consumerTable[consumeMessageDirectlyResultRequestHeader.ConsumerGroup]
consumeResult, err := defaultMQPushConsumer.consumeMessageService.ConsumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
consumeResult, err := defaultMQPushConsumer.consumeMessageService.consumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-go/kernel/mq_fault_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
)

type MQFaultStrategy struct {
type mqFaultStrategy struct {
}

//if first select : random one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package manage
package kernel

import (
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/kernel"

"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
Expand All @@ -29,7 +29,7 @@ type DefaultMQProducer struct {
producerGroup string
ProducerConfig *rocketmqm.MqProducerConfig

producerService kernel.ProducerService
producerService ProducerService
}

func NewDefaultMQProducer(producerGroup string, producerConfig *rocketmqm.MqProducerConfig) (rocketMQProducer *DefaultMQProducer) {
Expand Down
Loading

0 comments on commit 4b14070

Please sign in to comment.