Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Gogs committed Dec 8, 2015
0 parents commit 09604f1
Show file tree
Hide file tree
Showing 16 changed files with 1,787 additions and 0 deletions.
Binary file added .DS_Store
Binary file not shown.
11 changes: 11 additions & 0 deletions admin.go
@@ -0,0 +1,11 @@
package rocketmq

type Admin interface {
createTopic(key string, newTopic string, queueNum int)
createTopic1(key string, newTopic string, queueNum int, topicSysFlag int)
searchOffset(mq MessageQueue, timestamp int64) error
maxOffset(mq MessageQueue) error
minOffset(mq MessageQueue) error
earliestMsgStoreTime(mq MessageQueue) error
queryMessage(topic string, key, string, maxNum int, begin int64, end int64) error
}
107 changes: 107 additions & 0 deletions client_test.go
@@ -0,0 +1,107 @@
package rocketmq

import (
"encoding/json"
"io/ioutil"
"log"
"strings"
"testing"
"time"
)

var ch = make(chan *RemotingCommand)
var client = NewDefaultRemotingClient()

func TestConnect(t *testing.T) {
log.SetFlags(log.Lshortfile | log.LstdFlags)

broker := "192.168.1.197:10911"
namesrv := "192.168.1.234:9876"

data, err := ioutil.ReadFile("request.txt")
if err != nil {
log.Print(err)
}

lines := strings.Split(string(data), "\n")

var lastHeader, lastBody []byte
for _, line := range lines {
if strings.HasPrefix(line, "header=") {
if lastHeader != nil {
cmd := new(RemotingCommand)
cmd.Body = lastBody

err = json.Unmarshal(lastHeader, cmd)
if err != nil {
log.Print(err)
return
}
callback := func(responseFuture *ResponseFuture) {
}
switch cmd.Code {
case 101:
getKvCallback := func(responseFuture *ResponseFuture) {
jsonCmd, _ := json.Marshal(responseFuture.responseCommand)
log.Printf("resp=%s", string(jsonCmd))
}
err := client.invokeAsync(namesrv, cmd, 5000, getKvCallback)
if err != nil {
log.Print(err)
}
case 105:
getRouteInfoCallback := func(responseFuture *ResponseFuture) {
jsonCmd, _ := json.Marshal(responseFuture.responseCommand)

log.Printf("resp=%s", string(jsonCmd))
log.Print(string(responseFuture.responseCommand.Body))
}
err := client.invokeAsync(namesrv, cmd, 5000, getRouteInfoCallback)
if err != nil {
log.Print(err)
}
case 34:
err := client.invokeAsync(broker, cmd, 5000, callback)
if err != nil {
log.Print(err)
}
case 38:
log.Print("getConsumerListCallback")
getConsumerListCallback := func(responseFuture *ResponseFuture) {
jsonCmd, _ := json.Marshal(responseFuture.responseCommand)

log.Printf("getConsumerListCallback=%s", string(jsonCmd))
log.Print(string(responseFuture.responseCommand.Body))
}
log.Print(cmd)
err := client.invokeAsync(broker, cmd, 5000, getConsumerListCallback)
if err != nil {
log.Print(err)
}
case 11:
pullCallback := func(responseFuture *ResponseFuture) {
//if responseFuture.responseCommand.Code == 0 && len(responseFuture.responseCommand.Body) > 0 {
//msgs := decodeMessage(responseFuture.responseCommand.Body)
//for _, msg := range msgs {
//log.Print(string(msg.Body))
//}
//}
}
err := client.invokeAsync(broker, cmd, 5000, pullCallback)
if err != nil {
log.Print(err)
}
}
}
}

if strings.HasPrefix(line, "header=") {
lastHeader = []byte(strings.TrimLeft(line, "header="))
}
if strings.HasPrefix(line, "body=") {
lastBody = []byte(strings.TrimLeft(line, "body="))
}
}

time.Sleep(1000 * time.Second)
}
222 changes: 222 additions & 0 deletions consumer.go
@@ -0,0 +1,222 @@
package rocketmq

import "log"
import "net"
import "os"
import "strconv"
import "sync/atomic"

const (
BrokerSuspendMaxTimeMillis = 1000 * 15
)

type MessageListener func(msgs []*MessageExt)

type Config struct {
Nameserver string
ClientIp string
InstanceName string
}

type Consumer interface {
//Admin
Start() error
Shutdown()
RegisterMessageListener(listener MessageListener)
Subscribe(topic string, subExpression string)
UnSubcribe(topic string)
SendMessageBack(msg MessageExt, delayLevel int) error
SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error
fetchSubscribeMessageQueues(topic string) error
}

type DefaultConsumer struct {
conf *Config
consumerGroup string
consumeFromWhere string
consumerType string
messageModel string
unitMode bool

subscription map[string]string
messageListener MessageListener
offsetStore OffsetStore
brokers map[string]net.Conn

rebalance *Rebalance
remotingClient RemotingClient
mqClient *MqClient
}

func NewDefaultConsumer(name string, conf *Config) (Consumer, error) {
if conf == nil {
conf = &Config{
Nameserver: os.Getenv("ROCKETMQ_NAMESVR"),
InstanceName: "DEFAULT",
}
}

remotingClient := NewDefaultRemotingClient()
mqClient := NewMqClient()

rebalance := NewRebalance()
rebalance.groupName = name
rebalance.mqClient = mqClient

offsetStore := new(RemoteOffsetStore)
offsetStore.mqClient = mqClient
offsetStore.groupName = name

pullMessageService := NewPullMessageService()

consumer := &DefaultConsumer{
conf: conf,
consumerGroup: name,
consumeFromWhere: "CONSUME_FROM_LAST_OFFSET",
subscription: make(map[string]string),
offsetStore: offsetStore,
brokers: make(map[string]net.Conn),
rebalance: rebalance,
remotingClient: remotingClient,
mqClient: mqClient,
}

mqClient.consumerTable[name] = consumer
mqClient.remotingClient = remotingClient
mqClient.conf = conf
mqClient.clientId = conf.ClientIp + "@" + strconv.Itoa(os.Getpid())
mqClient.pullMessageService = pullMessageService

rebalance.consumer = consumer
pullMessageService.consumer = consumer

return consumer, nil
}

func (self *DefaultConsumer) Start() error {
self.mqClient.start()
return nil
}

func (self *DefaultConsumer) Shutdown() {
}

func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener) {
self.messageListener = messageListener
}

func (self *DefaultConsumer) Subscribe(topic string, subExpression string) {
self.subscription[topic] = subExpression

subData := &SubscriptionData{
Topic: topic,
SubString: subExpression,
}
self.rebalance.subscriptionInner[topic] = subData
}

func (self *DefaultConsumer) UnSubcribe(topic string) {
delete(self.subscription, topic)
}

func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error {
return nil
}

func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error {
return nil
}

func (self *DefaultConsumer) fetchSubscribeMessageQueues(topic string) error {
return nil
}

func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) {

requestHeader := new(PullMessageRequestHeader)
requestHeader.ConsumerGroup = pullRequest.consumerGroup
requestHeader.Topic = pullRequest.messageQueue.topic
requestHeader.QueueId = pullRequest.messageQueue.queueId
requestHeader.QueueOffset = pullRequest.nextOffset

requestHeader.SysFlag = 2
requestHeader.CommitOffset = 0
requestHeader.SuspendTimeoutMillis = BrokerSuspendMaxTimeMillis
requestHeader.Subscription = "*"

subscriptionData, ok := self.rebalance.subscriptionInner[pullRequest.messageQueue.topic]

if ok {
requestHeader.SubVersion = subscriptionData.SubVersion
}

currOpaque := atomic.AddInt32(&opaque, 1)
remotingCommand := new(RemotingCommand)
remotingCommand.Code = PULL_MESSAGE
remotingCommand.Opaque = currOpaque
remotingCommand.Flag = 0
remotingCommand.Language = "JAVA"
remotingCommand.Version = 79

remotingCommand.ExtFields = requestHeader

brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(pullRequest.messageQueue.brokerName, 0, false)

pullCallback := func(responseFuture *ResponseFuture) {
if responseFuture.responseCommand.Code == 0 && len(responseFuture.responseCommand.Body) > 0 {
var nextBeginOffset int64
var err error
pullResult, ok := responseFuture.responseCommand.ExtFields.(map[string]interface{})
if ok {
if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
if err != nil {
log.Print(err)
return
}

}

}

}

nextPullRequest := &PullRequest{
consumerGroup: pullRequest.consumerGroup,
nextOffset: nextBeginOffset,
messageQueue: pullRequest.messageQueue,
}

self.mqClient.pullMessageService.pullRequestQueue <- nextPullRequest

msgs := decodeMessage(responseFuture.responseCommand.Body)
self.messageListener(msgs)
}
}

if found {
self.remotingClient.invokeAsync(brokerAddr, remotingCommand, 1000, pullCallback)
}
}

func (self *DefaultConsumer) updateTopicSubscribeInfo(topic string, info []*MessageQueue) {
if self.rebalance.subscriptionInner != nil {
_, ok := self.rebalance.subscriptionInner[topic]
if ok {
self.rebalance.topicSubscribeInfoTable[topic] = info
}
}
}

func (self *DefaultConsumer) subscriptions() []*SubscriptionData {
subscriptions := make([]*SubscriptionData, 0)
for _, subscription := range self.rebalance.subscriptionInner {
subscriptions = append(subscriptions, subscription)
}
return subscriptions
}

func (self *DefaultConsumer) doRebalance() {
self.rebalance.doRebalance()
}
30 changes: 30 additions & 0 deletions example/main.go
@@ -0,0 +1,30 @@
package main

import (
rocketmq "didapinche.com/go_rocket_mq"
"log"
"runtime"
"time"
)

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
log.SetFlags(log.Lshortfile | log.LstdFlags)
conf := &rocketmq.Config{
Nameserver: "192.168.1.234:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf)
if err != nil {
log.Panic(err)
}
consumer.Subscribe("test3", "*")
consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) {
for i, msg := range msgs {
log.Print(i, string(msg.Body))
}
})
consumer.Start()
time.Sleep(1000 * time.Second)
}

0 comments on commit 09604f1

Please sign in to comment.