Skip to content

Commit

Permalink
Add PublisherAddr and PublishedAt
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jun 9, 2020
1 parent cf8b40a commit 69d9341
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 30 deletions.
60 changes: 34 additions & 26 deletions dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import (
"math/rand"
"runtime"
"sync"
"time"

"github.com/buraksezer/olric/internal/discovery"
"github.com/buraksezer/olric/internal/protocol"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

type TopicMessage struct {
Message interface{}
// DTopicMessage denotes a distributed topic message.
type DTopicMessage struct {
Message interface{}
PublisherAddr string
PublishedAt int64
}

// DTopic implements a distributed topic to deliver messages between clients and Olric nodes.
Expand All @@ -40,7 +44,7 @@ type DTopic struct {
}

type listener struct {
f func(message TopicMessage)
f func(message DTopicMessage)
}

type listeners struct {
Expand All @@ -64,7 +68,7 @@ func newDTopic(ctx context.Context) *dtopic {
}
}

func (d *dtopic) addListener(topic string, f func(TopicMessage)) (uint64, error) {
func (d *dtopic) addListener(topic string, f func(DTopicMessage)) (uint64, error) {
d.topics.mtx.Lock()
defer d.topics.mtx.Unlock()

Expand Down Expand Up @@ -102,7 +106,7 @@ func (d *dtopic) removeListener(topic string, regID uint64) error {
return nil
}

func (d *dtopic) dispatch(topic string, msg interface{}) error {
func (d *dtopic) dispatch(topic string, msg *DTopicMessage) error {
d.topics.mtx.RLock()
defer d.topics.mtx.RUnlock()

Expand All @@ -112,10 +116,6 @@ func (d *dtopic) dispatch(topic string, msg interface{}) error {
return fmt.Errorf("topic not found: %s: %w", topic, ErrInvalidArgument)
}

tm := TopicMessage{
Message: msg,
}

var wg sync.WaitGroup
num := int64(runtime.NumCPU())
sem := semaphore.NewWeighted(num)
Expand All @@ -126,10 +126,12 @@ func (d *dtopic) dispatch(topic string, msg interface{}) error {
}

wg.Add(1)
go func(f func(message TopicMessage)) {
// Dereference the pointer and make a copy of DTopicMessage for every listener.
m := *msg
go func(f func(message DTopicMessage)) {
defer wg.Done()
defer sem.Release(1)
f(tm)
f(m)
}(ll.f)
}
wg.Wait()
Expand Down Expand Up @@ -160,28 +162,29 @@ func (db *Olric) NewDTopic(name string) (*DTopic, error) {
}

func (db *Olric) publishDTopicMessageOperation(req *protocol.Message) *protocol.Message {
msg, err := db.unmarshalValue(req.Value)
rawmsg, err := db.unmarshalValue(req.Value)
if err != nil {
req.Error(protocol.StatusInternalServerError, err)
}
msg, ok := rawmsg.(DTopicMessage)
if !ok {
req.Error(protocol.StatusInternalServerError, "invalid distributed topic message received")
}

// req.DMap is a wrong name here. We will refactor the protocol and use a generic name.
err = db.dtopic.dispatch(req.DMap, msg)
err = db.dtopic.dispatch(req.DMap, &msg)
if err != nil {
req.Error(protocol.StatusInternalServerError, err)
}
return req.Success()
}

func (db *Olric) publishDTopicMessageToAddr(member discovery.Member, topic string, data []byte, sem *semaphore.Weighted) error {
func (db *Olric) publishDTopicMessageToAddr(member discovery.Member, topic string, msg *DTopicMessage, sem *semaphore.Weighted) error {
defer db.wg.Done()
defer sem.Release(1)

if hostCmp(member, db.this) {
// Dispatch messages in this process.
var msg interface{}
if err := db.serializer.Unmarshal(data, &msg); err != nil {
return err
}
err := db.dtopic.dispatch(topic, msg)
if err != nil {
db.log.V(6).Printf("[ERROR] Failed to dispatch message on this node: %v", err)
Expand All @@ -192,20 +195,24 @@ func (db *Olric) publishDTopicMessageToAddr(member discovery.Member, topic strin
}
}

data, err := db.serializer.Marshal(*msg)
if err != nil {
return err
}
// TODO: We need to find a better name for DMap in this struct.
req := &protocol.Message{
DMap: topic,
Value: data,
}
_, err := db.requestTo(member.String(), protocol.OpPublishDTopicMessage, req)
_, err = db.requestTo(member.String(), protocol.OpPublishDTopicMessage, req)
if err != nil {
db.log.V(2).Printf("[ERROR] Failed to publish message to %s: %v", member, err)
return err
}
return nil
}

func (db *Olric) publishDTopicMessage(topic string, data []byte) error {
func (db *Olric) publishDTopicMessage(topic string, msg *DTopicMessage) error {
db.members.mtx.RLock()
defer db.members.mtx.RUnlock()

Expand All @@ -228,7 +235,7 @@ func (db *Olric) publishDTopicMessage(topic string, data []byte) error {
db.wg.Add(1)
member := member // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
return db.publishDTopicMessageToAddr(member, topic, data, sem)
return db.publishDTopicMessageToAddr(member, topic, msg, sem)
})
}
// Wait blocks until all function calls from the Go method have returned,
Expand All @@ -238,16 +245,17 @@ func (db *Olric) publishDTopicMessage(topic string, data []byte) error {

// Publish publishes the given message to listeners of the topic. Message order and delivery are not guaranteed.
func (dt *DTopic) Publish(msg interface{}) error {
data, err := dt.db.serializer.Marshal(msg)
if err != nil {
return err
tm := &DTopicMessage{
Message: msg,
PublisherAddr: dt.db.this.String(),
PublishedAt: time.Now().UnixNano(),
}
return dt.db.publishDTopicMessage(dt.name, data)
return dt.db.publishDTopicMessage(dt.name, tm)
}

// AddListener adds a new listener for the topic. Returns a registration ID or an non-nil error.
// Registered functions are run by parallel.
func (dt *DTopic) AddListener(f func(TopicMessage)) (uint64, error) {
func (dt *DTopic) AddListener(f func(DTopicMessage)) (uint64, error) {
return dt.db.dtopic.addListener(dt.name, f)
}

Expand Down
75 changes: 71 additions & 4 deletions dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestDTopic_PublishStandalone(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
onMessage := func(msg TopicMessage) {
onMessage := func(msg DTopicMessage) {
defer cancel()
if msg.Message.(string) != "message" {
t.Fatalf("Expected nil. Got: %v", err)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestDTopic_RemoveListener(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

onMessage := func(msg TopicMessage) {}
onMessage := func(msg DTopicMessage) {}
regID, err := dt.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestDTopic_PublishCluster(t *testing.T) {

var count int32
ctx, cancel := context.WithCancel(context.Background())
onMessage := func(msg TopicMessage) {
onMessage := func(msg DTopicMessage) {
defer cancel()
if msg.Message.(string) != "message" {
t.Fatalf("Expected nil. Got: %v", err)
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestDTopic_Destroy(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

onMessage := func(msg TopicMessage) {}
onMessage := func(msg DTopicMessage) {}
regID, err := dtOne.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
Expand All @@ -225,3 +225,70 @@ func TestDTopic_Destroy(t *testing.T) {
t.Fatalf("Expected ErrInvalidArgument. Got: %v", err)
}
}

func TestDTopic_DTopicMessage(t *testing.T) {
c := newTestCluster(nil)
defer c.teardown()

dbOne, err := c.newDB()
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

dbTwo, err := c.newDB()
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

// Add listener

dtOne, err := dbOne.NewDTopic("my-topic")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
onMessage := func(msg DTopicMessage) {
defer cancel()
if msg.Message.(string) != "message" {
t.Fatalf("Expected nil. Got: %v", err)
}

if msg.PublisherAddr != dbTwo.this.String() {
t.Fatalf("Expected %s. Got: %s", dbTwo.this.String(), msg.PublisherAddr)
}

if msg.PublishedAt <= 0 {
t.Fatalf("Invalid PublishedAt: %d", msg.PublishedAt)
}
}

regID, err := dtOne.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = dtOne.RemoveListener(regID)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
}()

// Publish

dtTwo, err := dbTwo.NewDTopic("my-topic")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

err = dtTwo.Publish("message")
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}

select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
t.Fatal("Failed to call onMessage function")
}
}

0 comments on commit 69d9341

Please sign in to comment.