Skip to content

Commit

Permalink
Add concurrency parameter to limit parallel callback functions
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Aug 1, 2020
1 parent e4fd637 commit 4fdd7c3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
23 changes: 13 additions & 10 deletions client/dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,28 @@ import (
// DTopic denotes a distributed topic instance in the cluster.
type DTopic struct {
*Client
name string
flag int16
name string
flag int16
concurrency int

mu sync.Mutex
listeners map[uint64]struct{}
}

// NewDTopic creates and returns a new DTopic instance. It doesn't create any underlying TCP socket and goroutine.
func (c *Client) NewDTopic(name string, flag int16) (*DTopic, error) {
func (c *Client) NewDTopic(name string, concurrency int, flag int16) (*DTopic, error) {
if flag&olric.UnorderedDelivery == 0 && flag&olric.OrderedDelivery == 0 {
return nil, fmt.Errorf("invalid delivery mode: %w", olric.ErrInvalidArgument)
}
if flag&olric.OrderedDelivery != 0 {
return nil, olric.ErrNotImplemented
}
return &DTopic{
Client: c,
name: name,
flag: flag,
listeners: make(map[uint64]struct{}),
Client: c,
name: name,
flag: flag,
concurrency: concurrency,
listeners: make(map[uint64]struct{}),
}, nil
}

Expand All @@ -71,9 +73,10 @@ func (dt *DTopic) listen(l *listener, f func(olric.DTopicMessage)) {
defer dt.wg.Done()

// Limit concurrency to prevent CPU or network I/O starvation.
// TODO: Configure this
num := int64(runtime.NumCPU())
sem := semaphore.NewWeighted(num)
if dt.concurrency == 0 {
dt.concurrency = runtime.NumCPU()
}
sem := semaphore.NewWeighted(int64(dt.concurrency))
for {
select {
case <-l.ctx.Done():
Expand Down
12 changes: 6 additions & 6 deletions client/dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestClient_DTopicPublish(t *testing.T) {
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand All @@ -68,7 +68,7 @@ func TestClient_DTopicPublishMessages(t *testing.T) {
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestClient_DTopicAddListener(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}
onMessage := func(message olric.DTopicMessage) {}
dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestClient_DTopicOnMessage(t *testing.T) {
}
}

dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestClient_DTopicRemoveListener(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}
onMessage := func(message olric.DTopicMessage) {}
dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestClient_DTopicDestroy(t *testing.T) {
t.Fatalf("Expected nil. Got: %v", err)
}
onMessage := func(message olric.DTopicMessage) {}
dt, err := c.NewDTopic("my-dtopic", olric.UnorderedDelivery)
dt, err := c.NewDTopic("my-dtopic", 0, olric.UnorderedDelivery)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
Expand Down

0 comments on commit 4fdd7c3

Please sign in to comment.