In [None]:
from typing import List, Any

import reactivex as rx 
import reactivex.operators as ops

# Multicast vs Unicast

- **Unicast**: a single producer is observed by a single observer
  - Every observer gets different instances of data
  - Observation chain is run multiple times
- **Multicast**: a single producer is observed by multiple observers
  - Every observer gets the same instance of data (from the producer)
  - Observation chain is run only once

# `share`

`share` convert an unicast observable to a multicast observable

- Helpful for reducing heavy computation
- `share` does not guarantee replaying previous values on subscription; **even when the origin observable has replay properties**

In [None]:
def cast():
    def subscriber(name: str):
        return lambda v: print(f"{name}: {v}")

    def to_upper(v: str) -> str:
        print(f"to_upper {v}")
        return v.upper()

    disposables = []  # type: List[Any]

    subject = rx.subject.ReplaySubject()
    unicast = subject.pipe(ops.map(to_upper))

    disposables.append(unicast.subscribe(subscriber("unicast a")))
    disposables.append(unicast.subscribe(subscriber("unicast b")))

    multicast = unicast.pipe(ops.share())

    # to_upper will only be called once for both subscriber
    disposables.append(multicast.subscribe(subscriber("multicast a")))
    disposables.append(multicast.subscribe(subscriber("multicast b")))
    
    subject.on_next("v")

    # since share does not entail replay, multicast c will not be called
    disposables.append(multicast.subscribe(subscriber("multicast c")))

    for disposable in disposables:
        disposable.dispose()


cast()
    

to_upper v
unicast a: V
to_upper v
unicast b: V
to_upper v
multicast a: V
multicast b: V


To gain replay feature, chain `replay` after `share`

In [None]:
def multicast_replay():
    def subscriber(name: str):
        return lambda v: print(f"{name}: {v}")

    def to_upper(v: str) -> str:
        print(f"to_upper {v}")
        return v.upper()

    def id(v):
        return v

    disposables = []  # type: List[Any]

    subject = rx.subject.Subject()
    observable = subject.pipe(ops.map(to_upper), ops.share(), ops.replay()) 

    # connect() is only available if ops.replay is last in pipe
    observable.connect()

    disposables.append(observable.subscribe(subscriber("a")))
    disposables.append(observable.subscribe(subscriber("b")))

    subject.on_next("a")

    
multicast_replay()

to_upper a
a: A
b: A
