Skip to content

Commit

Permalink
RemoveListener properly works in Go client
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jun 20, 2020
1 parent 508dc3f commit 462300a
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 63 deletions.
15 changes: 4 additions & 11 deletions client/dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,10 @@ func (dt *DTopic) AddListener(f func(olric.DTopicMessage)) (uint64, error) {
}

func (dt *DTopic) RemoveListener(listenerID uint64) error {
streamID, err := dt.findStreamIDByListenerID(listenerID)
if err != nil {
return err
}
m := &protocol.Message{
DMap: dt.name,
Extra: protocol.DTopicRemoveListenerExtra{
ListenerID: listenerID,
StreamID: streamID,
},
}
resp, err := dt.client.Request(protocol.OpDTopicRemoveListener, m)
Expand All @@ -116,15 +111,13 @@ func (dt *DTopic) RemoveListener(listenerID uint64) error {
return dt.removeStreamListener(listenerID)
}

/*
func (d *DTopic) Destroy() error {
func (dt *DTopic) Destroy() error {
m := &protocol.Message{
DMap: d.name,
DMap: dt.name,
}
resp, err := d.client.Request(protocol.OpDTopicDestroy, m)
resp, err := dt.client.Request(protocol.OpDTopicDestroy, m)
if err != nil {
return err
}
return checkStatusCode(resp)
}
*/
}
39 changes: 39 additions & 0 deletions client/dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,42 @@ func TestClient_DTopicRemoveListener(t *testing.T) {
}
}
}

func TestClient_DTopicDestroy(t *testing.T) {
db, done, err := newDB()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
defer func() {
serr := db.Shutdown(context.Background())
if serr != nil {
t.Errorf("Expected nil. Got %v", serr)
}
<-done
}()

c, err := New(testConfig)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
onMessage := func(message olric.DTopicMessage) {}
dt := c.NewDTopic("my-dtopic")
listenerID, err := dt.AddListener(onMessage)
if err != nil {
t.Errorf("Expected nil. Got: %s", err)
}
err = dt.RemoveListener(listenerID)
if err != nil {
t.Errorf("Expected nil. Got: %s", err)
}
dt.streams.mu.RLock()
defer dt.streams.mu.RUnlock()

for _, s := range dt.streams.m {
for id, _ := range s.listeners {
if id == listenerID {
t.Fatalf("ListenerID: %d is still exist", id)
}
}
}
}
15 changes: 0 additions & 15 deletions client/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,3 @@ func (c *Client) removeStreamListener(listenerID uint64) error {
}
return fmt.Errorf("no listener found with given ID")
}

func (c *Client) findStreamIDByListenerID(listenerID uint64) (uint64, error) {
c.streams.mu.RLock()
defer c.streams.mu.RUnlock()

for streamID, s := range c.streams.m {
for id, _ := range s.listeners {
if id == listenerID {
return streamID, nil
}
}
}
// TODO: Create an error type for this
return 0, fmt.Errorf("no listener found with given ID")
}
84 changes: 59 additions & 25 deletions dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"golang.org/x/sync/semaphore"
)

var errListenerIDCollision = errors.New("given listenerID already exists")

// DTopicMessage denotes a distributed topic message.
type DTopicMessage struct {
Message interface{}
Expand Down Expand Up @@ -68,28 +70,44 @@ func newDTopic(ctx context.Context) *dtopic {
}
}

func (d *dtopic) addListener(topic string, f func(DTopicMessage)) (uint64, error) {
d.topics.mtx.Lock()
defer d.topics.mtx.Unlock()
func (dt *dtopic) _addListener(listenerID uint64, topic string, f func(DTopicMessage)) error {
dt.topics.mtx.Lock()
defer dt.topics.mtx.Unlock()

listenerID := rand.Uint64()
l, ok := d.topics.m[topic]
// TODO: Check listenerID for collisions
l, ok := dt.topics.m[topic]
if ok {
if _, ok = l.m[listenerID]; ok {
return errListenerIDCollision
}
l.m[listenerID] = &listener{f: f}
} else {
d.topics.m[topic] = &listeners{
dt.topics.m[topic] = &listeners{
m: make(map[uint64]*listener),
}
d.topics.m[topic].m[listenerID] = &listener{f: f}
dt.topics.m[topic].m[listenerID] = &listener{f: f}
}
return nil
}

func (dt *dtopic) addListener(topic string, f func(DTopicMessage)) (uint64, error) {
listenerID := rand.Uint64()
err := dt._addListener(listenerID, topic, f)
if err != nil {
return 0, err
}
return listenerID, nil
}

func (d *dtopic) removeListener(topic string, listenerID uint64) error {
d.topics.mtx.Lock()
defer d.topics.mtx.Unlock()
func (dt *dtopic) addRemoteListener(listenerID uint64, topic string, f func(DTopicMessage)) error {
return dt._addListener(listenerID, topic, f)
}

func (dt *dtopic) removeListener(topic string, listenerID uint64) error {
dt.topics.mtx.Lock()
defer dt.topics.mtx.Unlock()

l, ok := d.topics.m[topic]
l, ok := dt.topics.m[topic]
if !ok {
return fmt.Errorf("topic not found: %s: %w", topic, ErrInvalidArgument)
}
Expand All @@ -101,16 +119,16 @@ func (d *dtopic) removeListener(topic string, listenerID uint64) error {

delete(l.m, listenerID)
if len(l.m) == 0 {
delete(d.topics.m, topic)
delete(dt.topics.m, topic)
}
return nil
}

func (d *dtopic) dispatch(topic string, msg *DTopicMessage) error {
d.topics.mtx.RLock()
defer d.topics.mtx.RUnlock()
func (dt *dtopic) dispatch(topic string, msg *DTopicMessage) error {
dt.topics.mtx.RLock()
defer dt.topics.mtx.RUnlock()

l, ok := d.topics.m[topic]
l, ok := dt.topics.m[topic]
if !ok {
// there is no listener for this topic on this node.
return fmt.Errorf("topic not found: %s: %w", topic, ErrInvalidArgument)
Expand All @@ -121,7 +139,7 @@ func (d *dtopic) dispatch(topic string, msg *DTopicMessage) error {
sem := semaphore.NewWeighted(num)

for _, ll := range l.m {
if err := sem.Acquire(d.ctx, 1); err != nil {
if err := sem.Acquire(dt.ctx, 1); err != nil {
return err
}

Expand All @@ -138,10 +156,10 @@ func (d *dtopic) dispatch(topic string, msg *DTopicMessage) error {
return nil
}

func (d *dtopic) destroy(topic string) {
d.topics.mtx.Lock()
defer d.topics.mtx.Unlock()
delete(d.topics.m, topic)
func (dt *dtopic) destroy(topic string) {
dt.topics.mtx.Lock()
defer dt.topics.mtx.Unlock()
delete(dt.topics.m, topic)
}

// NewDTopic returns a new distributed topic instance.
Expand Down Expand Up @@ -262,21 +280,21 @@ func (db *Olric) exDTopicAddListenerOperation(req *protocol.Message) *protocol.M
s, ok := db.streams.m[streamID]
db.streams.mu.RUnlock()
if !ok {
db.log.V(2).Println("[ERROR] Stream could not be found with the given ID: %d", streamID)
db.log.V(2).Printf("[ERROR] Stream could not be found with the given ID: %d", streamID)
// TODO: Deregister this listener
return
}
value, err := db.serializer.Marshal(msg)
if err != nil {
db.log.V(2).Println("[ERROR] Failed to serialize DTopicMessage: %v", err)
db.log.V(2).Printf("[ERROR] Failed to serialize DTopicMessage: %v", err)
return
}
m := protocol.NewStreamMessage(listenerID)
m.DMap = name
m.Value = value
s.write <- m
}
_, err := db.dtopic.addListener(name, f)
err := db.dtopic.addRemoteListener(listenerID, name, f)
if err != nil {
return req.Error(protocol.StatusInternalServerError, err)
}
Expand Down Expand Up @@ -317,7 +335,15 @@ func (dt *DTopic) AddListener(f func(DTopicMessage)) (uint64, error) {
}

func (db *Olric) exDTopicRemoveListenerOperation(req *protocol.Message) *protocol.Message {
db.dtopic.removeListener(req.DMap, )
extra, ok := req.Extra.(protocol.DTopicRemoveListenerExtra)
if !ok {
return req.Error(protocol.StatusBadRequest, "wrong Extra type")
}
err := db.dtopic.removeListener(req.DMap, extra.ListenerID)
if err != nil {
return req.Error(protocol.StatusInternalServerError, err)
}
return req.Success()
}

// RemoveListener removes a listener with the given listenerID.
Expand Down Expand Up @@ -358,6 +384,14 @@ func (db *Olric) destroyDTopicOnCluster(topic string) error {
return g.Wait()
}

func (db *Olric) exDTopicDestroyOperation(req *protocol.Message) *protocol.Message {
err := db.destroyDTopicOnCluster(req.DMap)
if err != nil {
return req.Error(protocol.StatusInternalServerError, err)
}
return req.Success()
}

// Destroy removes all listeners for this topic on the cluster. If Publish function is called again after Destroy, the topic will be
// recreated.
func (dt *DTopic) Destroy() error {
Expand Down
20 changes: 10 additions & 10 deletions dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func TestDTopic_PublishStandalone(t *testing.T) {
}
}

regID, err := dt.AddListener(onMessage)
listenerID, err := dt.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = dt.RemoveListener(regID)
err = dt.RemoveListener(listenerID)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -88,11 +88,11 @@ func TestDTopic_RemoveListener(t *testing.T) {
}

onMessage := func(msg DTopicMessage) {}
regID, err := dt.AddListener(onMessage)
listenerID, err := dt.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
err = dt.RemoveListener(regID)
err = dt.RemoveListener(listenerID)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -129,12 +129,12 @@ func TestDTopic_PublishCluster(t *testing.T) {
atomic.AddInt32(&count, 1)
}

regID, err := dt.AddListener(onMessage)
listenerID, err := dt.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = dt.RemoveListener(regID)
err = dt.RemoveListener(listenerID)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestDTopic_Destroy(t *testing.T) {
}

onMessage := func(msg DTopicMessage) {}
regID, err := dtOne.AddListener(onMessage)
listenerID, err := dtOne.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -220,7 +220,7 @@ func TestDTopic_Destroy(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

err = dtOne.RemoveListener(regID)
err = dtOne.RemoveListener(listenerID)
if !errors.Is(err, ErrInvalidArgument) {
t.Fatalf("Expected ErrInvalidArgument. Got: %v", err)
}
Expand Down Expand Up @@ -263,12 +263,12 @@ func TestDTopic_DTopicMessage(t *testing.T) {
}
}

regID, err := dtOne.AddListener(onMessage)
listenerID, err := dtOne.AddListener(onMessage)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
defer func() {
err = dtOne.RemoveListener(regID)
err = dtOne.RemoveListener(listenerID)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
OpStreamCreated
OpStreamClosed
OpStreamMessage
OpDTopicDestroy
)

type StatusCode uint8
Expand Down Expand Up @@ -236,7 +237,6 @@ type DTopicAddListenerExtra struct {
}

type DTopicRemoveListenerExtra struct {
StreamID uint64
ListenerID uint64
}

Expand Down
3 changes: 2 additions & 1 deletion olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
)

// ReleaseVersion is the current stable version of Olric
const ReleaseVersion string = "0.2.0"
const ReleaseVersion string = "0.3.0"

const (
nilTimeout = 0 * time.Second
Expand Down Expand Up @@ -455,6 +455,7 @@ func (db *Olric) registerOperations() {
db.operations[protocol.OpDTopicPublish] = db.exDTopicPublishOperation
db.operations[protocol.OpDTopicAddListener] = db.exDTopicAddListenerOperation
db.operations[protocol.OpDTopicRemoveListener] = db.exDTopicRemoveListenerOperation
db.operations[protocol.OpDTopicDestroy] = db.exDTopicDestroyOperation

// Bidirectional communication channel for clients and cluster members.
db.operations[protocol.OpCreateStream] = db.createStreamOperation
Expand Down

0 comments on commit 462300a

Please sign in to comment.