Skip to content

Commit

Permalink
resolve #89
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed Jun 19, 2018
1 parent 8ee7958 commit 076c737
Show file tree
Hide file tree
Showing 37 changed files with 3,578 additions and 65 deletions.
2 changes: 1 addition & 1 deletion broker/broker.yaml
Expand Up @@ -13,7 +13,7 @@ broker:
serverid: 1

store:
engine: fdb
engine: memory
fdb:
namespace: meq-store
threads: 5
Expand Down
14 changes: 5 additions & 9 deletions broker/service/client.go
Expand Up @@ -60,7 +60,6 @@ func initClient(cid uint64, conn net.Conn, bk *Broker) *client {
}
}
func (c *client) readLoop(isWs bool) error {
c.bk.wg.Add(1)
defer func() {
c.closed = true
c.closech <- struct{}{}
Expand All @@ -85,7 +84,6 @@ func (c *client) readLoop(isWs bool) error {
submsg := SubMessage{CLUSTER_UNSUB, []byte(topic), c.cid, []byte("")}
c.bk.cluster.peer.send.GossipBroadcast(submsg)
}
c.bk.wg.Done()
if err := recover(); err != nil {
L.Info("read loop panic:", zap.Error(err.(error)), zap.Stack("stack"))
return
Expand All @@ -95,7 +93,6 @@ func (c *client) readLoop(isWs bool) error {
reader := bufio.NewReaderSize(c.conn, 65536)
for !c.closed {
c.conn.SetDeadline(time.Now().Add(time.Second * proto.MAX_IDLE_TIME))

msg, err := mqtt.DecodePacket(reader)
if err != nil {
return err
Expand Down Expand Up @@ -151,7 +148,6 @@ func (c *client) readLoop(isWs bool) error {
packet := msg.(*mqtt.Publish)
if len(packet.Payload) > 0 {
cmd := packet.Payload[0]
fmt.Println(cmd)
switch cmd {
case proto.MSG_PULL:
count, offset := proto.UnPackPullMsg(packet.Payload[1:])
Expand Down Expand Up @@ -242,14 +238,14 @@ func (c *client) readLoop(isWs bool) error {
case proto.MSG_BROADCAST:
// clients publish messges to a broadcast topic
// broadcast will not store the messages
ms, err := proto.UnpackPubBatch(packet.Payload[1:])
m, err := proto.UnpackMsg(packet.Payload[1:])
if err != nil {
return err
}
for _, m := range ms {
// gen msg id
m.ID = c.bk.idgen.Generate().Bytes()
}
// gen msg id
m.ID = c.bk.idgen.Generate().Bytes()

ms := []*proto.PubMsg{m}
publishOnline(c.cid, c.bk, ms, true)

case proto.MSG_REDUCE_COUNT:
Expand Down
6 changes: 4 additions & 2 deletions broker/service/fdb_store.go
Expand Up @@ -356,8 +356,10 @@ func (f *FdbStore) Del(topic []byte, msgid []byte) error {
func put(d *database, msgs []*proto.PubMsg) {
_, err := d.db.Transact(func(tr fdb.Transaction) (ret interface{}, err error) {
for _, msg := range msgs {
key := d.msgsp.Pack(tuple.Tuple{msg.Topic, msg.ID})
tr.Set(key, proto.PackMsg(msg))
if msg.QoS != proto.QOS0 {
key := d.msgsp.Pack(tuple.Tuple{msg.Topic, msg.ID})
tr.Set(key, proto.PackMsg(msg))
}
}
return
})
Expand Down
101 changes: 90 additions & 11 deletions broker/service/mem_store.go
Expand Up @@ -30,6 +30,9 @@ type MemStore struct {
DBIndex map[string][]string
DBIDIndex map[string]string

chatroom map[string]map[string]int
topicCount map[string]int

timerDB []*proto.TimerMsg

bk *Broker
Expand Down Expand Up @@ -69,7 +72,8 @@ func (ms *MemStore) Init() {
ms.cache = make([]*proto.PubMsg, 0, MaxCacheLength)
ms.msgSyncCache = make([]*proto.PubMsg, 0, MaxSyncMsgLen)
ms.readSyncCache = make([]proto.Ack, 0, MaxSyncAckLen)

ms.chatroom = make(map[string]map[string]int)
ms.topicCount = make(map[string]int)
go func() {
ms.bk.wg.Add(1)
defer ms.bk.wg.Done()
Expand Down Expand Up @@ -168,7 +172,44 @@ func (ms *MemStore) MarkRead(topic []byte, msgids [][]byte) {
}

func (ms *MemStore) UpdateUnreadCount(topic []byte, user []byte, isAdd bool, count int) {

tp := proto.GetTopicType(topic)
if tp == proto.TopicTypeNormal {
c, ok := ms.topicCount[talent.Bytes2String(topic)]
if !isAdd {
if ok {
if count == proto.REDUCE_ALL_COUNT {
ms.topicCount[talent.Bytes2String(topic)] = 0
} else {
if c-count > 0 {
ms.topicCount[talent.Bytes2String(topic)] = c - count
} else {
ms.topicCount[talent.Bytes2String(topic)] = 0
}
}
}
} else {
if !ok {
ms.topicCount[talent.Bytes2String(topic)] = count
} else {
ms.topicCount[talent.Bytes2String(topic)] = c + count
}
}
} else {
t, ok := ms.chatroom[talent.Bytes2String(topic)]
if !ok {
return
}
if !isAdd {
_, ok := t[talent.Bytes2String(user)]
if ok {
t[talent.Bytes2String(user)] = 0
}
} else {
for u, c := range t {
t[u] = c + count
}
}
}
}

func (ms *MemStore) Query(t []byte, count int, offset []byte, acked bool) []*proto.PubMsg {
Expand Down Expand Up @@ -242,17 +283,20 @@ func (ms *MemStore) Query(t []byte, count int, offset []byte, acked bool) []*pro
}

func (ms *MemStore) UnreadCount(topic []byte, user []byte) int {
t := string(topic)
ms.Lock()
defer ms.Unlock()
ms.RLock()
defer ms.RUnlock()

var count int
for _, m := range ms.DB[t] {
if !m.Acked {
count++
tp := proto.GetTopicType(topic)
if tp == proto.TopicTypeNormal {
return ms.topicCount[talent.Bytes2String(topic)]
} else {
t, ok := ms.chatroom[talent.Bytes2String(topic)]
if !ok {
return 0
}

return t[talent.Bytes2String(user)]
}
return count
}

func (ms *MemStore) StoreTM(m *proto.TimerMsg) {
Expand Down Expand Up @@ -283,15 +327,49 @@ func (ms *MemStore) QueryTM() []*proto.PubMsg {
}

func (ms *MemStore) JoinChat(topic []byte, user []byte) error {
ms.Lock()
defer ms.Unlock()

t, ok := ms.chatroom[talent.Bytes2String(topic)]
if !ok {
ms.chatroom[talent.Bytes2String(topic)] = map[string]int{
talent.Bytes2String(user): 0,
}
} else {
_, ok := t[talent.Bytes2String(user)]
if !ok {
t[talent.Bytes2String(user)] = 0
}
}

return nil
}

func (ms *MemStore) LeaveChat(topic []byte, user []byte) error {
ms.Lock()
defer ms.Unlock()

t, ok := ms.chatroom[talent.Bytes2String(topic)]
if ok {
delete(t, talent.Bytes2String(user))
}

return nil
}

func (ms *MemStore) GetChatUsers(topic []byte) [][]byte {
return nil
ms.Lock()
defer ms.Unlock()

users := make([][]byte, 0)
t, ok := ms.chatroom[talent.Bytes2String(topic)]
if ok {
for u := range t {
users = append(users, talent.String2Bytes(u))
}
}

return users
}

func (ms *MemStore) Del(topic []byte, msgid []byte) error {
Expand Down Expand Up @@ -321,6 +399,7 @@ func (ms *MemStore) flush() {
ms.DB[t][talent.Bytes2String(msg.ID)] = msg
ms.DBIndex[t] = append(ms.DBIndex[t], talent.Bytes2String(msg.ID))
ms.DBIDIndex[talent.Bytes2String(msg.ID)] = t
ms.topicCount[t]++
ms.Unlock()
}
ms.Lock()
Expand Down
2 changes: 1 addition & 1 deletion broker/service/message_id_test.go
Expand Up @@ -14,7 +14,7 @@ func TestGenMessageID(t *testing.T) {
b2.conf.Broker.ServerID = 2
StartIDGenerator(b2)
id1 := b1.idgen.Generate().Int64()
id2 := b1.idgen.Generate().Int64()
id2 := b1.idgen.Generate().Time()

assert.NotEqual(t, id1, id2)

Expand Down
6 changes: 1 addition & 5 deletions broker/service/router.go
Expand Up @@ -15,7 +15,6 @@ package service

import (
"encoding/binary"
"fmt"
"sync"

"github.com/cosmos-gg/meq/proto"
Expand Down Expand Up @@ -48,14 +47,13 @@ func (r *Router) recvRoute(src mesh.PeerName, buf []byte) {
cid := uint64(binary.LittleEndian.Uint32(buf[:4]))
cmd := buf[4]

fmt.Println(cmd, cid)
r.RLock()
c, ok := r.bk.clients[cid]
r.RUnlock()
if !ok {
return
}
fmt.Println(cmd, cid)

switch cmd {
case proto.MSG_PUB_BATCH:
msgs, err := proto.UnpackPubBatch(buf[5:])
Expand All @@ -65,8 +63,6 @@ func (r *Router) recvRoute(src mesh.PeerName, buf []byte) {
}
c.msgSender <- msgs
case proto.MSG_JOIN_CHAT: // notify someone has join the chat
fmt.Println("recv join chat msgs:")
fmt.Println(proto.UnpackJoinChatNotify(buf[5:]))
notifyOne(c.conn, buf[5:])
}
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 076c737

Please sign in to comment.