Skip to content

Commit

Permalink
Concurrency settings for DTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Aug 1, 2020
1 parent 4fdd7c3 commit c160cdb
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 38 deletions.
4 changes: 2 additions & 2 deletions client/dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestDTopic_DeliveryOrder(t *testing.T) {
<-done
}()

_, err = db.NewDTopic("my-topic", 0)
_, err = db.NewDTopic("my-topic", 0, 0)
if !errors.Is(err, olric.ErrInvalidArgument) {
t.Errorf("Expected ErrInvalidArgument. Got: %v", err)
}
Expand All @@ -299,7 +299,7 @@ func TestDTopic_OrderedDelivery(t *testing.T) {
}
<-done
}()
_, err = db.NewDTopic("my-topic", olric.OrderedDelivery)
_, err = db.NewDTopic("my-topic", 0, olric.OrderedDelivery)
if err != olric.ErrNotImplemented {
t.Errorf("Expected ErrNotImplemented. Got: %v", err)
}
Expand Down
62 changes: 39 additions & 23 deletions dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,44 @@ import (
)

const (
// Messages are delivered in random order. It's good to distribute independent events in a distributed system.
UnorderedDelivery = int16(1) << iota
// Messages are delivered in some order. Not implemented yet.
OrderedDelivery
)

var errListenerIDCollision = errors.New("given listenerID already exists")

// DTopicMessage denotes a distributed topic message.
// DTopicMessage is a message type for DTopic data structure.
type DTopicMessage struct {
Message interface{}
PublisherAddr string
PublishedAt int64
}

// DTopic implements a distributed topic to deliver messages between clients and Olric nodes.
// DTopic implements a distributed topic to deliver messages between clients and Olric nodes. You should know that:
//
// * Communication between parties is one-to-many (fan-out).
// * All data is in-memory, and the published messages are not stored in the cluster.
// * Fire&Forget: message delivery is not guaranteed.
type DTopic struct {
name string
flag int16
db *Olric
name string
flag int16
concurrency int
db *Olric
}

type listener struct {
f func(message DTopicMessage)
}

// Listeners with id.
type listeners struct {
m map[uint64]*listener
m map[uint64]*listener
concurrency int
}

// Registered listeners for topics. A global map for an Olric instance.
type topics struct {
mtx sync.RWMutex
m map[string]*listeners
Expand All @@ -76,7 +86,7 @@ func newDTopic(ctx context.Context) *dtopic {
}
}

func (dt *dtopic) _addListener(listenerID uint64, topic string, f func(DTopicMessage)) error {
func (dt *dtopic) _addListener(listenerID uint64, topic string, concurrency int, f func(DTopicMessage)) error {
dt.topics.mtx.Lock()
defer dt.topics.mtx.Unlock()

Expand All @@ -88,24 +98,26 @@ func (dt *dtopic) _addListener(listenerID uint64, topic string, f func(DTopicMes
l.m[listenerID] = &listener{f: f}
} else {
dt.topics.m[topic] = &listeners{
m: make(map[uint64]*listener),
m: make(map[uint64]*listener),
concurrency: concurrency,
}
dt.topics.m[topic].m[listenerID] = &listener{f: f}
}
return nil
}

func (dt *dtopic) addListener(topic string, f func(DTopicMessage)) (uint64, error) {
func (dt *dtopic) addListener(topic string, concurrency int, f func(DTopicMessage)) (uint64, error) {
listenerID := rand.Uint64()
err := dt._addListener(listenerID, topic, f)
err := dt._addListener(listenerID, topic, concurrency, f)
if err != nil {
return 0, err
}
return listenerID, nil
}

func (dt *dtopic) addRemoteListener(listenerID uint64, topic string, f func(DTopicMessage)) error {
return dt._addListener(listenerID, topic, f)
// addRemoveListener registers clients listeners.
func (dt *dtopic) addRemoteListener(listenerID uint64, topic string, concurrency int, f func(DTopicMessage)) error {
return dt._addListener(listenerID, topic, concurrency, f)
}

func (dt *dtopic) removeListener(topic string, listenerID uint64) error {
Expand All @@ -129,6 +141,8 @@ func (dt *dtopic) removeListener(topic string, listenerID uint64) error {
return nil
}

// dispatch sends a received message to all registered listeners. It doesn't store
// any message. It just dispatch the message.
func (dt *dtopic) dispatch(topic string, msg *DTopicMessage) error {
dt.topics.mtx.RLock()
defer dt.topics.mtx.RUnlock()
Expand All @@ -140,10 +154,10 @@ func (dt *dtopic) dispatch(topic string, msg *DTopicMessage) error {
}

var wg sync.WaitGroup
// TODO: Configure this
num := int64(runtime.NumCPU())
sem := semaphore.NewWeighted(num)

if l.concurrency == 0 {
l.concurrency = runtime.NumCPU()
}
sem := semaphore.NewWeighted(int64(l.concurrency))
for _, ll := range l.m {
if err := sem.Acquire(dt.ctx, 1); err != nil {
return err
Expand All @@ -168,7 +182,7 @@ func (dt *dtopic) destroy(topic string) {
}

// NewDTopic returns a new distributed topic instance.
func (db *Olric) NewDTopic(name string, flag int16) (*DTopic, error) {
func (db *Olric) NewDTopic(name string, concurrency int, flag int16) (*DTopic, error) {
if flag&UnorderedDelivery == 0 && flag&OrderedDelivery == 0 {
return nil, fmt.Errorf("invalid delivery mode: %w", ErrInvalidArgument)
}
Expand All @@ -185,9 +199,10 @@ func (db *Olric) NewDTopic(name string, flag int16) (*DTopic, error) {
return nil, err
}
return &DTopic{
name: name,
db: db,
flag: flag,
name: name,
db: db,
flag: flag,
concurrency: concurrency,
}, nil
}

Expand Down Expand Up @@ -327,7 +342,8 @@ func (db *Olric) exDTopicAddListenerOperation(w, r protocol.EncodeDecoder) {
})
s.write <- m
}
err := db.dtopic.addRemoteListener(listenerID, name, f)
// set concurrency parameter as 0. the registered listener will only make network i/o. NumCPU is good for this.
err := db.dtopic.addRemoteListener(listenerID, name, 0, f)
if err != nil {
db.errorResponse(w, err)
return
Expand Down Expand Up @@ -365,10 +381,10 @@ func (dt *DTopic) Publish(msg interface{}) error {
return dt.db.publishDTopicMessage(dt.name, tm)
}

// AddListener adds a new listener for the topic. Returns a registration ID or an non-nil error.
// AddListener adds a new listener for the topic. Returns a registration ID or a non-nil error.
// Registered functions are run by parallel.
func (dt *DTopic) AddListener(f func(DTopicMessage)) (uint64, error) {
return dt.db.dtopic.addListener(dt.name, f)
return dt.db.dtopic.addListener(dt.name, dt.concurrency, f)
}

func (db *Olric) exDTopicRemoveListenerOperation(w, r protocol.EncodeDecoder) {
Expand Down
26 changes: 13 additions & 13 deletions dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDTopic_PublishStandalone(t *testing.T) {
}
}()

dt, err := db.NewDTopic("my-topic", UnorderedDelivery)
dt, err := db.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestDTopic_RemoveListener(t *testing.T) {
}
}()

dt, err := db.NewDTopic("my-topic", UnorderedDelivery)
dt, err := db.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestDTopic_PublishCluster(t *testing.T) {

// Add listener

dt, err := db1.NewDTopic("my-topic", UnorderedDelivery)
dt, err := db1.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestDTopic_PublishCluster(t *testing.T) {

// Publish

dt2, err := db2.NewDTopic("my-topic", UnorderedDelivery)
dt2, err := db2.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestDTopic_RemoveListenerNotFound(t *testing.T) {
}
}()

dt, err := db.NewDTopic("my-topic", UnorderedDelivery)
dt, err := db.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -199,7 +199,7 @@ func TestDTopic_Destroy(t *testing.T) {
}

// Add listener
dtOne, err := dbOne.NewDTopic("my-topic", UnorderedDelivery)
dtOne, err := dbOne.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -210,7 +210,7 @@ func TestDTopic_Destroy(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}

dtTwo, err := dbTwo.NewDTopic("my-topic", UnorderedDelivery)
dtTwo, err := dbTwo.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestDTopic_DTopicMessage(t *testing.T) {

// Add listener

dtOne, err := dbOne.NewDTopic("my-topic", UnorderedDelivery)
dtOne, err := dbOne.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestDTopic_DTopicMessage(t *testing.T) {

// Publish

dtTwo, err := dbTwo.NewDTopic("my-topic", UnorderedDelivery)
dtTwo, err := dbTwo.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestDTopic_PublishMessagesCluster(t *testing.T) {

// Add listener

dt, err := db1.NewDTopic("my-topic", UnorderedDelivery)
dt, err := db1.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestDTopic_PublishMessagesCluster(t *testing.T) {

// Publish

dt2, err := db2.NewDTopic("my-topic", UnorderedDelivery)
dt2, err := db2.NewDTopic("my-topic", 0, UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -365,7 +365,7 @@ func TestDTopic_DeliveryOrder(t *testing.T) {
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
_, err = db.NewDTopic("my-topic", 0)
_, err = db.NewDTopic("my-topic", 0, 0)
if !errors.Is(err, ErrInvalidArgument) {
t.Errorf("Expected ErrInvalidArgument. Got: %v", err)
}
Expand All @@ -376,7 +376,7 @@ func TestDTopic_OrderedDelivery(t *testing.T) {
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
_, err = db.NewDTopic("my-topic", OrderedDelivery)
_, err = db.NewDTopic("my-topic", 0, OrderedDelivery)
if err != ErrNotImplemented {
t.Errorf("Expected ErrNotImplemented. Got: %v", err)
}
Expand Down

0 comments on commit c160cdb

Please sign in to comment.