### Publish and Subscribe with Message Passing (Go)

In the _Publish and Subscribe_ pattern, publishers publish items, and subscribers receive items. The design is related to the Observer pattern in that both distribute data. However:
- With the Observer pattern, there is a single subject; with the Publish and Subscribe pattern, there can be several publishers.
- With Publish and Subscribe, the publisher does not keep track of all the subscribers; instead, a _mediator_ is used for distribution.
- With the Observer pattern, all observers receive updates with (synchronous) calls to re-establish a global invariant; with the Publish and Subscribe pattern, new items are received asynchronously. New items may be published before all existing items have been received.

Below is an implementation with asynchronous message passing in Go.
- Each publisher takes between 0 and 0.6 seconds to produce a new item. Here, items are non-negative integers. After publishing a fixed number of items, a publisher signals the end of publication to the mediator by sending `-1`.
- The mediator shares one channel of capacity 1 with all publishers for receiving new items and one channel of capacity 1 with all subscribers for receiving new subscriptions. A new subscription consists of a channel over which new items are sent back. The mediator distributes items received from a publisher to all subscribers without delay. Once all publishers have stopped publishing, the channels to all subscribers are closed.
- Each subscriber creates a channel of capacity 3 for receiving new items; subscribers keep receiving from that channel until it is closed.
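
The subscription handshake described above, in which a subscriber sends its own channel over the mediator's subscription channel, can be sketched in isolation. This is a minimal sketch rather than the exercise's mediator: the helpers `subscribe` and `roundTrip` are hypothetical names introduced here, and the anonymous goroutine is only a stand-in that serves a single subscriber.

```go
package main

import "fmt"

// Item mirrors the struct used in this section.
type Item struct {
    Publisher string
    Message   int
}

// subscribe is a hypothetical helper: it creates the subscriber's own
// buffered channel and registers it by sending it to the mediator.
func subscribe(sch chan<- chan Item, capacity int) <-chan Item {
    ch := make(chan Item, capacity)
    sch <- ch
    return ch
}

// roundTrip registers one subscriber, lets a stand-in mediator deliver
// the given items and close the channel, and returns what was received.
func roundTrip(items []Item) []Item {
    sch := make(chan chan Item, 1)
    go func() {
        sub := <-sch // the stand-in mediator receives the subscription ...
        for _, it := range items {
            sub <- it // ... and sends items back over it
        }
        close(sub) // signals the end of publication to the subscriber
    }()
    var got []Item
    for it := range subscribe(sch, 3) {
        got = append(got, it)
    }
    return got
}

func main() {
    for _, it := range roundTrip([]Item{{"A", 0}, {"A", 1}}) {
        fmt.Println("received", it.Message, "from", it.Publisher)
    }
}
```

Because the subscriber ranges over its channel, closing the channel is all the mediator side needs to do to end the subscriber's loop.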

Your task is to complete the implementation of the mediator. If the channel of one subscriber is full, the mediator may block; you do not need to handle that case. A possible output is shown below; note that the printing of some log messages is delayed:

```
publisher A active
publisher B active
publisher A publishing 0
publisher C active
subscriber K subscribed
subscriber L subscribed
publisher C publishing 0
publisher B publishing 0
subscriber M subscribed
publisher A publishing 1
    subscriber M received 1 from A
    subscriber K received 1 from A
    subscriber L received 1 from A
publisher C publishing 1
    subscriber M received 1 from C
    subscriber K received 1 from C
    subscriber L received 1 from C
publisher B publishing 1
    subscriber K received 1 from B
    subscriber L received 1 from B
publisher B publishing 2
    subscriber L received 2 from B
    subscriber M received 1 from B
publisher C publishing 2
    subscriber L received 2 from C
    subscriber K received 2 from B
    subscriber K received 2 from C
publisher A publishing 2
    subscriber L received 2 from A
    subscriber K received 2 from A
    subscriber M received 2 from B
    subscriber M received 2 from C
    subscriber M received 2 from A
publisher A publishing 3
publisher A done
    subscriber L received 3 from A
    subscriber K received 3 from A
    subscriber M received 3 from A
publisher B publishing 3
publisher B done
    subscriber K received 3 from B
    subscriber L received 3 from B
publisher C publishing 3
publisher C done
    subscriber K received 3 from C
    subscriber L received 3 from C
    subscriber M received 3 from B
    subscriber M received 3 from C
subscriber M done
subscriber K done
subscriber L done
```
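
The exercise allows the mediator to block when a subscriber's channel is full. One possible alternative, which is not required by the task, is a non-blocking send with `select` and `default` that drops the item instead of waiting. The helper name `trySend` is an assumption made for this sketch.

```go
package main

import "fmt"

// Item mirrors the struct used in this section.
type Item struct {
    Publisher string
    Message   int
}

// trySend attempts to deliver item without blocking. It reports false
// if the subscriber's buffer is full and the item was dropped.
func trySend(sub chan<- Item, item Item) bool {
    select {
    case sub <- item:
        return true
    default:
        return false
    }
}

func main() {
    sub := make(chan Item, 3) // same capacity as a subscriber channel here
    for m := 0; m < 5; m++ {
        if !trySend(sub, Item{"A", m}) {
            fmt.Println("dropped", m)
        }
    }
    fmt.Println("buffered", len(sub))
}
```

With capacity 3 and nobody receiving, the first three items are buffered and items 3 and 4 are dropped. Dropping changes the semantics: a slow subscriber no longer sees every item, which may or may not be acceptable for a given application.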

_Hint:_ Sequences can be implemented in Go with slices: `s = []T{}` assigns to `s` an empty sequence, `s = append(s, x)` appends `x` of type `T` to `s`, and `for _, x := range s {...}` lets `x` iterate over all elements of `s`.
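
The three slice operations from the hint can be tried out on their own. In this small sketch the function `collect` is a hypothetical name and `T` is taken to be `int`.

```go
package main

import "fmt"

// collect builds a slice by appending and then iterates over it with range.
func collect(n int) ([]int, int) {
    s := []int{} // empty sequence
    for i := 0; i < n; i++ {
        s = append(s, i*i) // append returns the (possibly reallocated) slice
    }
    sum := 0
    for _, x := range s { // x takes each element in order
        sum += x
    }
    return s, sum
}

func main() {
    s, sum := collect(4)
    fmt.Println(s, sum) // prints: [0 1 4 9] 14
}
```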

In [1]:
%%writefile publishsubscribe.go
package main
import ("math/rand"; "time")

type Item struct {
    Publisher string
    Message int
}

const R = 4  // rounds of publishing

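// mediator forwards every item received on pch to all current subscribers.
// New subscriptions arrive on sch as channels over which items are sent back.
func mediator(pch chan Item, sch chan chan Item) {
    const publishers = 3 // number of publishers started in main
    subscribers := []chan Item{}
    done := 0 // publishers that have sent the end marker -1

    for {
        select {
        case sub := <-sch:
            subscribers = append(subscribers, sub)
        case item := <-pch:
            if item.Message == -1 {
                done++
            } else {
                for _, sub := range subscribers {
                    sub <- item // may block if this subscriber's buffer is full
                }
            }
            if done == publishers {
                // all publishers have finished: close every subscriber channel
                for _, sub := range subscribers {
                    close(sub)
                }
                return
            }
        }
    }
}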
func publisher(id string, ch chan Item) {
    println("publisher", id, "active")
    for m := 0; m < R; m++ {
        time.Sleep(time.Millisecond * time.Duration(rand.Int() % 600)) // sleep between 0 and .6 seconds
        println("publisher", id, "publishing", m)
        ch <- Item{id, m}
    }
    ch <- Item{id, -1} // -1 marks the end of publication
    println("publisher", id, "done")
}
func subscriber(id string, med chan chan Item) {
    println("subscriber", id, "subscribed")
    ch := make(chan Item, 3); med <- ch // asynchronous channel with capacity 3
    for item := range ch {
        println("    subscriber", id, "received", item.Message, "from", item.Publisher)
        time.Sleep(time.Millisecond * time.Duration(rand.Int() % 200)) // sleep between 0 and .2 seconds
    }
    println("subscriber", id, "done")
}
func main() {
    publisherChannel := make(chan Item, 1) // asynchronous channel with capacity 1
    subscriberChannel := make(chan chan Item, 1) // asynchronous channel with capacity 1
    go mediator(publisherChannel, subscriberChannel)
    for _, name := range []string{"A", "B", "C"} {
        go publisher(name, publisherChannel)
        time.Sleep(time.Millisecond * time.Duration(rand.Int() % 200)) // sleep between 0 and .2 seconds
    }
    for _, name := range []string{"K", "L", "M"} {
        go subscriber(name, subscriberChannel)
        time.Sleep(time.Millisecond * time.Duration(rand.Int() % 200)) // sleep between 0 and .2 seconds
    }
    time.Sleep(4 * time.Second)
}

Overwriting publishsubscribe.go
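
The program above waits for completion with a fixed `time.Sleep(4 * time.Second)` at the end of `main`. A common alternative in Go is `sync.WaitGroup`, which lets `main` return as soon as all goroutines have finished. The sketch below shows the idea on its own; `runWorkers` is a hypothetical name, and adapting the publishers and subscribers to it would mean passing the wait group to them, which the exercise does not do.

```go
package main

import (
    "fmt"
    "sync"
)

// runWorkers starts n goroutines and returns only after all have finished.
// It returns how many goroutines ran to completion.
func runWorkers(n int) int {
    var wg sync.WaitGroup
    var mu sync.Mutex // protects finished
    finished := 0
    for i := 0; i < n; i++ {
        wg.Add(1) // register the goroutine before starting it
        go func() {
            defer wg.Done() // deregister when the goroutine returns
            mu.Lock()
            finished++
            mu.Unlock()
        }()
    }
    wg.Wait() // blocks until every Done has been called
    return finished
}

func main() {
    fmt.Println("finished:", runWorkers(3))
}
```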


In [2]:
!go run publishsubscribe.go

publisher A active
publisher B active
publisher C active
subscriber K subscribed
subscriber L subscribed
publisher C publishing 0
    subscriber L received 0 from C
    subscriber K received 0 from C
publisher A publishing 0
    subscriber K received 0 from A
subscriber M subscribed
publisher B publishing 0
    subscriber M received 0 from B
    subscriber K received 0 from B
    subscriber L received 0 from A
publisher A publishing 1
    subscriber K received 1 from A
    subscriber M received 1 from A
    subscriber L received 0 from B
    subscriber L received 1 from A
publisher A publishing 2
    subscriber M received 2 from A
    subscriber L received 2 from A
    subscriber K received 2 from A
publisher C publishing 1
    subscriber M received 1 from C
publisher A publishing 3
publisher A done
    subscriber K received 1 from C
    subscriber L received 1 from C
    subscriber L received 3 from A
    subscriber K received 3 from A
publisher B publishing 1
    subscriber M receive