Skip to content

Commit

Permalink
fix chan directions for correct semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
aslrousta committed Jun 24, 2022
1 parent 1f25820 commit 7e36e7c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 9 deletions.
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
# Generic PubSub
# PubSub

This library provides a generic implementation of pub-sub pattern using channels in Go.
This library provides a generic implementation of pub-sub pattern using channels in Go. An example usage would be like:

```go
ps := pubsub.New[string, int]()

// Produces a random integer each second and publishes
// it on the "random" topic.
go func() {
for {
ps.Publish("random", rand.Int())
time.Sleep(time.Second)
}
}()

// Consumes produced random integers by subscribing on
// the "random" topic.
go func() {
ch := make(chan int)
ps.Subscribe("random", ch)
for r := range ch {
fmt.Println(r)
}
}()
```
2 changes: 1 addition & 1 deletion package.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Package pubsub provides a generic implementation of pub-sub pattern using
// channels in Go.
// channels.
package pubsub
12 changes: 6 additions & 6 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import "sync"
// PubSub implements pub-sub pattern using channels.
type PubSub[T comparable, M any] struct {
mutex sync.RWMutex
subscriptions map[T][]chan M
subscriptions map[T][]chan<- M
}

// New instantiates a new pub-sub.
func New[T comparable, M any]() *PubSub[T, M] {
return &PubSub[T, M]{
subscriptions: make(map[T][]chan M),
subscriptions: make(map[T][]chan<- M),
}
}

Expand All @@ -25,26 +25,26 @@ func (ps *PubSub[T, M]) Publish(topic T, msg M) {
}

for _, ch := range ps.subscriptions[topic] {
go func(ch chan M) {
go func(ch chan<- M) {
ch <- msg
}(ch)
}
}

// Subscribe subscribes to receive messages in the topic on ch.
func (ps *PubSub[T, M]) Subscribe(topic T, ch chan M) {
func (ps *PubSub[T, M]) Subscribe(topic T, ch chan<- M) {
ps.mutex.Lock()
defer ps.mutex.Unlock()

if subs, ok := ps.subscriptions[topic]; ok {
ps.subscriptions[topic] = append(subs, ch)
} else {
ps.subscriptions[topic] = []chan M{ch}
ps.subscriptions[topic] = []chan<- M{ch}
}
}

// Unsubscribe removes the subscription on the topic.
func (ps *PubSub[T, M]) Unubscribe(topic T, ch chan M) {
func (ps *PubSub[T, M]) Unubscribe(topic T, ch chan<- M) {
ps.mutex.Lock()
defer ps.mutex.Unlock()

Expand Down

0 comments on commit 7e36e7c

Please sign in to comment.