# Observer or Pub-Sub Pattern
---

## What is it?

- Observer is a `behavioral design pattern` that allows some objects to notify other objects about changes in their state.
- It provides a way to `subscribe` to and `unsubscribe` from these notifications.
- When an object changes its state, all the objects that depend on it are `notified` and `updated` automatically.
- This way the workload can be distributed across the subscribers, which can improve the throughput and efficiency of the system.

## Explanation

**`ELI5 Example`:** Imagine you are watching a cricket match. Instead of directly checking the live score repeatedly, you subscribe to a cricket score app on your phone. Now, whenever the score changes, the app sends you a notification. This way, you get updates automatically without constantly checking the score.

This is what the Observer pattern does: it lets multiple objects (observers) subscribe to another object (subject) so that whenever the subject's state changes, all the observers are notified automatically.

**`Technical Explanation`:** You have a `Subject` or `Publisher` class that maintains a list of observers, or a queue of events that the observers read from. The `Subject` class has methods to `subscribe` and `unsubscribe` observers. It also has a method to `notify` all the observers when its state changes. Once an observer is notified, it can `pull` the updated data from the subject and process it.
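
The description above can be sketched in a few lines of plain Python. This is a minimal illustration, not a definitive implementation: the names `Subject`, `set_state`, `make_observer` and the `received` list are hypothetical and chosen only for this example. Observers are plain callables that pull the new state from the subject when notified.

```python
from typing import Any, Callable


class Subject:
    """Keeps a list of observers and notifies each one when the state changes."""

    def __init__(self) -> None:
        self._observers: list[Callable[["Subject"], None]] = []
        self._state: Any = None

    def subscribe(self, observer: Callable[["Subject"], None]) -> None:
        if observer not in self._observers:
            self._observers.append(observer)

    def unsubscribe(self, observer: Callable[["Subject"], None]) -> None:
        if observer in self._observers:
            self._observers.remove(observer)

    @property
    def state(self) -> Any:
        return self._state

    def set_state(self, value: Any) -> None:
        self._state = value
        self._notify()

    def _notify(self) -> None:
        # Iterate over a copy so observers may unsubscribe while being notified.
        for observer in list(self._observers):
            observer(self)


# Hypothetical log used only to make the behaviour visible.
received: list[tuple[str, Any]] = []


def make_observer(name: str) -> Callable[[Subject], None]:
    def on_change(subject: Subject) -> None:
        # The observer pulls the updated data from the subject.
        received.append((name, subject.state))
    return on_change


subject = Subject()
alice = make_observer("alice")
bob = make_observer("bob")

subject.subscribe(alice)
subject.subscribe(bob)
subject.set_state("score: 10")   # both observers are notified

subject.unsubscribe(bob)
subject.set_state("score: 14")   # only alice is notified

print(received)
```

Here every subscribed observer sees every state change, which is the behaviour the cricket score example describes.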

In [2]:
# Queue-based pub-sub implementation using asyncio in Python

from asyncio import Queue, Lock, sleep, create_task
from typing import Any


class Publisher:

    def __init__(self):
        self._queue = Queue()  # events waiting to be picked up by a subscriber
        self._lock = Lock()    # only one subscriber processes at a time, so the printed output stays in order

    async def publish(self, data: Any):
        await self._queue.put(data)

    async def subscribe(self, sub_id: int):
        # Each event goes to exactly one subscriber: whichever one gets it from the queue first.
        while True:
            data = await self._queue.get()
            async with self._lock:
                print(f"Subscriber {sub_id} received: {data}")
                await sleep(1)  # simulate the work of processing the event
                print(f"Subscriber {sub_id} processed: {data}")
                self._queue.task_done()  # lets queue.join() return once every event is handled


In [None]:
pub = Publisher()

# Keep references to the tasks so they are not garbage-collected while running
tasks = [create_task(pub.subscribe(sub_id)) for sub_id in range(1, 5)]

Subscriber 1 received: Event 0
Subscriber 1 processed: Event 0
Subscriber 2 received: Event 1
Subscriber 2 processed: Event 1
Subscriber 3 received: Event 2
Subscriber 3 processed: Event 2
Subscriber 4 received: Event 3
Subscriber 4 processed: Event 3
Subscriber 1 received: Event 4
Subscriber 1 processed: Event 4
Subscriber 2 received: Event 0
Subscriber 2 processed: Event 0
Subscriber 3 received: Event 1
Subscriber 3 processed: Event 1
Subscriber 4 received: Event 2
Subscriber 4 processed: Event 2
Subscriber 1 received: Event 3
Subscriber 1 processed: Event 3
Subscriber 2 received: Event 4
Subscriber 2 processed: Event 4


In [6]:
# Publish events
for i in range(5):
    await pub.publish(f"Event {i}")
    await sleep(0.5)

# Wait for all events to be processed
await pub._queue.join()
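
In the implementation above, each event is consumed by exactly one subscriber, because all subscribers read from one shared queue. If every subscriber should receive every event, as in the cricket score example, one possible variant gives each subscriber its own queue. The sketch below is only an illustration under that assumption; `BroadcastPublisher`, `consume`, and `demo` are hypothetical names, not part of the code above.

```python
import asyncio
from typing import Any


class BroadcastPublisher:
    """Hypothetical variant: one queue per subscriber, so every event reaches everyone."""

    def __init__(self) -> None:
        self._queues: dict[int, asyncio.Queue] = {}

    def subscribe(self, sub_id: int) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[sub_id] = queue
        return queue

    def unsubscribe(self, sub_id: int) -> None:
        self._queues.pop(sub_id, None)

    async def publish(self, data: Any) -> None:
        # Put a copy of the event reference on every subscriber's queue.
        for queue in list(self._queues.values()):
            await queue.put(data)


async def consume(sub_id: int, queue: asyncio.Queue, log: list) -> None:
    while True:
        data = await queue.get()
        log.append((sub_id, data))
        queue.task_done()


async def demo() -> list:
    pub = BroadcastPublisher()
    log: list = []
    sub_ids = (1, 2)
    queues = [pub.subscribe(sub_id) for sub_id in sub_ids]
    tasks = [
        asyncio.create_task(consume(sub_id, queue, log))
        for sub_id, queue in zip(sub_ids, queues)
    ]

    for i in range(2):
        await pub.publish(f"Event {i}")

    # Wait until every subscriber has handled every event, then stop the consumers.
    await asyncio.gather(*(queue.join() for queue in queues))
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return log


log = asyncio.run(demo())
print(sorted(log))
```

Inside a notebook, where an event loop is already running, `await demo()` would be used instead of `asyncio.run(demo())`. The trade-off is that broadcasting does not split the work: every subscriber handles every event, whereas the shared-queue version divides events among subscribers.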

## Key Takeaways:

- This design pattern is well suited for `event handling systems`, `distributed systems`, `message brokers`, etc.
- It helps in `reducing coupling` between objects.
- It can improve efficiency, since subscribers compete with each other to process the data.
- It can be combined with event-driven autoscaling, where the autoscaler scales the number of subscribers based on the number of events. [`KEDA`](https://keda.sh) is a good example of this.