Skip to content

Commit

Permalink
pubsub implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
aslrousta committed Jun 23, 2022
1 parent 5212178 commit 1f25820
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 2 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# pubsub
Generic PubSub Pattern in Go
# Generic PubSub

This library provides a generic implementation of pub-sub pattern using channels in Go.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/aslrousta/pubsub

go 1.18
3 changes: 3 additions & 0 deletions package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package pubsub provides a generic implementation of pub-sub pattern using
// channels in Go.
package pubsub
86 changes: 86 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package pubsub

import "sync"

// PubSub implements pub-sub pattern using channels.
type PubSub[T comparable, M any] struct {
mutex sync.RWMutex
subscriptions map[T][]chan M
}

// New instantiates a new pub-sub.
func New[T comparable, M any]() *PubSub[T, M] {
return &PubSub[T, M]{
subscriptions: make(map[T][]chan M),
}
}

// Publish sends msg to the topic.
func (ps *PubSub[T, M]) Publish(topic T, msg M) {
ps.mutex.RLock()
defer ps.mutex.RUnlock()

if _, ok := ps.subscriptions[topic]; !ok {
return
}

for _, ch := range ps.subscriptions[topic] {
go func(ch chan M) {
ch <- msg
}(ch)
}
}

// Subscribe subscribes to receive messages in the topic on ch.
func (ps *PubSub[T, M]) Subscribe(topic T, ch chan M) {
ps.mutex.Lock()
defer ps.mutex.Unlock()

if subs, ok := ps.subscriptions[topic]; ok {
ps.subscriptions[topic] = append(subs, ch)
} else {
ps.subscriptions[topic] = []chan M{ch}
}
}

// Unsubscribe removes the subscription on the topic.
func (ps *PubSub[T, M]) Unubscribe(topic T, ch chan M) {
ps.mutex.Lock()
defer ps.mutex.Unlock()

if _, ok := ps.subscriptions[topic]; !ok {
return
}

index := -1
for i, c := range ps.subscriptions[topic] {
if c == ch {
index = i
break
}
}
if index >= 0 {
if subs := ps.subscriptions[topic]; len(subs) == 1 {
delete(ps.subscriptions, topic)
} else if index == 0 {
ps.subscriptions[topic] = subs[1:]
} else if index == len(subs)-1 {
ps.subscriptions[topic] = subs[:len(subs)-1]
} else {
ps.subscriptions[topic] = append(subs[:index], subs[index+1:]...)
}
}
}

// Close terminates all topics and closes the subscribed channels.
func (ps *PubSub[T, M]) Close() {
ps.mutex.Lock()
defer ps.mutex.Unlock()

for _, subs := range ps.subscriptions {
for _, ch := range subs {
close(ch)
}
}
ps.subscriptions = nil
}
41 changes: 41 additions & 0 deletions pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package pubsub_test

import (
"sync"
"sync/atomic"
"testing"

"github.com/aslrousta/pubsub"
)

func TestPubSub(t *testing.T) {
ps := pubsub.New[string, string]()
wg := sync.WaitGroup{}

totalCount := int32(0)
for i := 0; i < 100; i++ {
ch := make(chan string)
ps.Subscribe("test", ch)

wg.Add(1)
go func() {
count := int32(0)
for range ch {
if count++; count == 10 {
break
}
}
atomic.AddInt32(&totalCount, count)
wg.Done()
}()
}

for i := 0; i < 10; i++ {
ps.Publish("test", "message")
}
wg.Wait()

if totalCount != 1000 {
t.FailNow()
}
}

0 comments on commit 1f25820

Please sign in to comment.