# Observer Pattern - Single and Multi-threaded Implementation in Python

In this article we will look at one of the best-known behavioral design patterns, the **observer pattern**.

In this pattern a group of objects called `Observer`s is decoupled from the `Observable` (also known as the `Subject`).

The `Observer`s are interested in state changes of the `Observable`.
The role of the `Observable` is to notify all registered observers about each state update (similar to subscribing to
notifications on our phones).

We will first implement a single-threaded version of this pattern.
Then we will discuss the potential issues of a multi-threaded approach, and finally progress to a thread-safe version
that is as performant as we can make it.

### Creating Auxiliary Classes
We will define the auxiliary classes that will be needed.

We require an `Event` object definition that will be used to pass information into the `Observer` from the `Observable`.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from threading import Lock, Thread, currentThread
from time import sleep
from typing import DefaultDict, List


class EventKeys:
    EXPENSES = 'expenses'


@dataclass
class Event:
    name: str
    data: dict
```

### Defining Interfaces for the Observer Pattern
Below we define the `Observer` and `Observable` as abstract classes with their methods.

```python
from abc import ABC, abstractmethod


class IObserver(ABC):
    @abstractmethod
    def update(self, event: Event, key: str):
        raise NotImplementedError


class IObservable(ABC):
    @abstractmethod
    def add_observer(self, observer: IObserver, key: str):
        raise NotImplementedError

    @abstractmethod
    def remove_observer(self, observer: IObserver, key: str):
        raise NotImplementedError

    @abstractmethod
    def update_observers(self, event: Event, key: str):
        raise NotImplementedError
```

### Implementing Concrete Versions for the Single-Threaded Use Case
Now that we have the necessary building blocks, we can implement a single-threaded version of the pattern and profile
its performance.

```python
UPDATE_DELAY = 0.5


class Observer(IObserver):
    def __init__(self, name: str):
        self.name = name
        self.totals = defaultdict(float)

    def update(self, event: Event, key: str):
        sleep(UPDATE_DELAY)
        print(f'{self.name} received event "{event.name}" for {key} in thread: {currentThread().getName()}')
        print(f'Event data: {event.data}')
        self.totals[key] += event.data.get(key, 0)
        print(f'Total for {key}: {self.totals[key]}')
```

Our concrete `Observer` has a simple `update` method that reports the received event and keeps a running total of the
values received for each key. We add a delay to `update` to simulate a computationally expensive operation, so that we
can later compare how the different implementations perform.

```python
class Observable(IObservable):
    def __init__(self):
        self.observers_map: DefaultDict[str, List[Observer]] = defaultdict(list)

    def add_observer(self, observer: Observer, key: str):
        self.observers_map[key].append(observer)

    def remove_observer(self, observer: Observer, key: str):
        if observer in self.observers_map[key]:
            self.observers_map[key].remove(observer)

    def update_observers(self, event: Event, key: str):
        for observer in self.observers_map[key]:
            observer.update(event=event, key=key)
```

The concrete `Observable` keeps a map from each key (category) to the list of observers registered under it, and
implements the basic methods for adding, removing and updating observers.

### Testing performance of the single-threaded implementation of the Observer Pattern

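Adding and removing observers are cheap operations that do not need performance testing: adding is O(1) (an average-case
dictionary lookup plus a list append), and removing is O(n) in the number of observers registered under that key
(the list has to be searched), which is still cheap compared with notifying the observers.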

The most important test for us is updating a large number of observers. As we can guess, the time grows linearly with
the number of observers added. Here we should point out that `update` may take an unknown amount of time to execute,
since each observer has its own implementation of it. To keep things simple, every `update` sleeps for the same constant
`UPDATE_DELAY`, so that later we can compare single- and multi-threaded performance.

```python
# Build a list of observers
NUMBER_OF_OBSERVERS = 10
observers = []
for i in range(NUMBER_OF_OBSERVERS):
    observer = Observer(name=f'Observer-{i}')
    observers.append(observer)

# Create an instance of Observable
observable = Observable()

# Add observers to the observable instance.
for observer in observers:
    observable.add_observer(observer=observer, key=EventKeys.EXPENSES)

# Generate an event
expenses_event = Event(name='Adding Expenses', data={EventKeys.EXPENSES: 1})

# Measure time it takes to update all observers in the single threaded implementation
single_threaded_update_start = datetime.now()
observable.update_observers(event=expenses_event, key=EventKeys.EXPENSES)
single_threaded_update_stop = datetime.now()

single_threaded_update_duration = single_threaded_update_stop - single_threaded_update_start
```

```
Observer-0 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-1 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-2 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-3 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-4 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-5 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-6 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-7 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-8 received event "Adding Expenses" for expenses
Event data: {'expenses': 1}
Total for expenses: 1.0
Observer-9 received
```

```python
print(f'Single threaded update takes: {single_threaded_update_duration}')
```

```
Single threaded update takes: 0:00:05.012051
```

As expected, the single-threaded `update_observers` call took roughly `NUMBER_OF_OBSERVERS * UPDATE_DELAY` seconds
(10 × 0.5 s = 5 s) to complete.
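To check the linear relationship without waiting several seconds per run, the sketch below times the same sequential
notification loop for a few observer counts. It uses `time.perf_counter`, which is intended for measuring durations,
instead of `datetime.now`. `SleepyObserver` and the 0.01 s `DEMO_DELAY` are illustrative assumptions, not part of the
code above.

```python
from time import perf_counter, sleep

DEMO_DELAY = 0.01  # illustrative per-observer cost, much smaller than UPDATE_DELAY


class SleepyObserver:
    """Hypothetical stand-in for Observer: update() only sleeps for DEMO_DELAY seconds."""

    def update(self, event, key):
        sleep(DEMO_DELAY)


def time_sequential_update(number_of_observers):
    observers = [SleepyObserver() for _ in range(number_of_observers)]
    start = perf_counter()
    for observer in observers:  # same loop shape as Observable.update_observers
        observer.update(event=None, key='expenses')
    return perf_counter() - start


durations = {n: time_sequential_update(n) for n in (5, 10, 20)}
for n, seconds in durations.items():
    # sleep() can overshoot, so measurements usually land a little above n * DEMO_DELAY
    print(f'{n:>2} observers: {seconds:.3f} s (n * DEMO_DELAY = {n * DEMO_DELAY:.3f} s)')
```

Doubling the number of observers should roughly double the measured time.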

### Implementing multi-threaded version of the Observer Pattern

As we have seen, the slowest part of the pattern is updating the observers.
The single-threaded version simply loops over the list of observers, so if any individual `update` method takes a long
time to execute, the entire `update_observers` call blocks the thread in which it is running for that whole time.

It would be nice to be able to add and remove observers safely from any thread, as well as to make `update_observers`
faster.
Unfortunately, as soon as we step into the shared-memory land of multi-threaded design, we hit a bunch of problems.

In the `Observable` class we are modifying the dictionary `observers_map` in the `add_observer` and
`remove_observer` methods. We are also relying on it not changing while we iterate over its contents in
`update_observers`.

The problem is that in multi-threaded execution the operating system (OS) switches between threads in a
non-deterministic way (preemptive switching). Unless thread safety is implemented correctly, this interleaving of
execution can corrupt shared state (as explained below) because of **race conditions**.

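A race condition occurs when two or more threads access the same memory concurrently, at least one of them writes to it,
and the outcome depends on the order in which their operations happen to interleave.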

Lastly, if our observers can be updated from multiple threads, they also have to be implemented in a thread-safe way,
since the `Observable` cannot know the details of every possible `Observer` and its `update` method
(and should not be concerned with them).
Thus we put the onus of making sure that they can be updated from multiple threads on the observers themselves.
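As an illustration of what a thread-safe observer could look like: `self.totals[key] += ...` in `Observer.update` is a
read-modify-write on shared state, so one option is for each observer to guard it with its own lock. This is a minimal
sketch; the `ThreadSafeObserver` name and the `_totals_lock` attribute are hypothetical, the `Event` dataclass is
repeated only to keep the block self-contained, and the original `sleep`/`print` calls are left out for brevity.

```python
from collections import defaultdict
from dataclasses import dataclass
from threading import Lock, Thread


@dataclass
class Event:
    name: str
    data: dict


class ThreadSafeObserver:
    """Hypothetical observer whose update() may be called from many threads at once."""

    def __init__(self, name: str):
        self.name = name
        self.totals = defaultdict(float)
        self._totals_lock = Lock()  # protects self.totals only

    def update(self, event: Event, key: str):
        value = event.data.get(key, 0)  # reading the immutable-for-us event needs no lock
        with self._totals_lock:  # the read-modify-write below must not interleave
            self.totals[key] += value


observer = ThreadSafeObserver(name='Observer-0')
event = Event(name='Adding Expenses', data={'expenses': 1})
threads = [Thread(target=observer.update, kwargs={'event': event, 'key': 'expenses'})
           for _ in range(100)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(observer.totals['expenses'])  # 100.0
```

The `Observable` never sees this lock; each observer decides for itself how to protect its own state.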

What will happen when we allow any combination of threads to call those methods in any order?

### Example of possible complications in multi-threading

Let's discuss one of several issues that can occur when `add_observer` is called from multiple threads.

We have `Thread A` and `Thread B`, and there are no registered observers on our `Observable` instance yet
(`observers_map` is empty).

Each thread adds one observer.

Keep in mind that the OS uses preemptive switching: it switches between threads at intervals of its own choosing and
may interrupt a thread in the middle of any statement. Also note that even the simplest Python statement, which looks
like a single operation, is in reality almost always several steps (several bytecode instructions, each executed as
many CPU instructions) **which may be interrupted at any time** before completion (it is not atomic).

Here is what can potentially go wrong:

1. `Thread A` calls `add_observer(observer_1, key_1)`; the `observers_map` is still empty.
2. Since `key_1` is missing, the `defaultdict(list)` creates a new empty list for it.
3. Before that list has been stored in `observers_map`, the OS switches threads.
4. `Thread B` calls `add_observer(observer_2, key_1)`. `key_1` is still missing, so `Thread B` creates its own empty
list, stores it under `key_1` and appends `observer_2` to it.
5. The OS switches back to `Thread A`.
6. `Thread A` resumes exactly where it was interrupted: it stores *its* empty list under `key_1`, replacing the list
that `Thread B` has just filled.
7. `observer_1` gets appended to that list. We have lost `observer_2` forever...

We ended up with a corrupted `observers_map`.
This is an incredibly rare occurrence, which is exactly what makes errors of this kind so hard to reproduce and track
down.

This is only one of the potential problems. Similar issues can occur when removing observers or calling
`update_observers`, and our observers have the same kind of problem if their `update` methods are called from multiple
threads.
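Because the interleaving above is so rare, it is hard to observe by chance. The sketch below is a hand-written,
deliberately racy version of the check-then-act steps for a missing key, and it uses a `threading.Barrier` to force the
unlucky interleaving on every run: both threads finish the "is the key missing?" check before either stores its list.
`racy_add_observer` and `after_check` are hypothetical names used only for this demonstration.

```python
from threading import Barrier, Thread

observers_map = {}  # shared state with no lock
after_check = Barrier(2)  # releases both threads only once both have done the check


def racy_add_observer(observer, key):
    if key not in observers_map:  # check: key_1 is missing
        after_check.wait()  # both threads have now seen key_1 as missing
        bucket = []  # each thread creates its own empty list...
        observers_map[key] = bucket  # ...and stores it, replacing any list stored before
    else:
        bucket = observers_map[key]
    bucket.append(observer)  # act: append to the list this thread believes is current


threads = [
    Thread(target=racy_add_observer, args=('observer_1', 'key_1')),
    Thread(target=racy_add_observer, args=('observer_2', 'key_1')),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(observers_map)  # only one observer survives, e.g. {'key_1': ['observer_2']}
```

Whichever thread stores its list last wins, and the other observer is appended to a list that is no longer reachable.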

### Goals when implementing multi-threaded Observer Pattern

* `add_observer`, `remove_observer` and `update_observers` can be safely called from any thread.
* `add_observer` and `remove_observer` require **write access** to `observers_map`. This should be **exclusive to one
thread** and **prohibit read access** to any other thread.
* Since `update_observers` is a read-only method from the point of view of the `Observable` - we should allow for it to
be executed by any number of threads, **as long as no write access** to the `observers_map` is granted at this time.
* We would prefer for the events to arrive at the observers in the order that they were fired.
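The second and third goals describe a readers-writer lock. Python's standard library does not provide one, so here is a
minimal sketch built on `threading.Condition`. The `ReadWriteLock` class and its `read()`/`write()` context managers are
hypothetical names, and this simple variant prefers readers, so a continuous stream of readers can starve a writer.

```python
from contextlib import contextmanager
from threading import Barrier, Condition, Thread


class ReadWriteLock:
    """Hypothetical readers-writer lock: many concurrent readers OR one writer."""

    def __init__(self):
        self._condition = Condition()  # its internal lock guards the two fields below
        self._active_readers = 0
        self._writer_active = False

    @contextmanager
    def read(self):
        with self._condition:
            while self._writer_active:  # readers only wait for an active writer
                self._condition.wait()
            self._active_readers += 1
        try:
            yield
        finally:
            with self._condition:
                self._active_readers -= 1
                if self._active_readers == 0:
                    self._condition.notify_all()  # a waiting writer may now proceed

    @contextmanager
    def write(self):
        with self._condition:
            while self._writer_active or self._active_readers > 0:
                self._condition.wait()
            self._writer_active = True
        try:
            yield
        finally:
            with self._condition:
                self._writer_active = False
                self._condition.notify_all()  # wake up waiting readers and writers


# Usage: both readers must be inside read() at the same time to get past the barrier.
rw_lock = ReadWriteLock()
both_reading = Barrier(2, timeout=5)  # breaks (raises) if the readers were serialised
finished = []


def reader(name):
    with rw_lock.read():
        both_reading.wait()
        finished.append(name)


readers = [Thread(target=reader, args=(f'reader-{i}',)) for i in range(2)]
for thread in readers:
    thread.start()
for thread in readers:
    thread.join()

with rw_lock.write():  # granted only once both readers have left
    finished.append('writer')

print(finished)
```

In an `Observable`, `add_observer` and `remove_observer` would use `write()`, while `update_observers` would use `read()`.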

### Simple thread-safe Observer Pattern implementation and core concepts of synchronisation

Our first attempt will only address the thread-safety part of our goals. It will not bring any performance gains or
consider other aspects of the design, but it will let us explain the concepts involved on a concrete example.

This implementation will use the **Monitor Pattern**: the body of each method runs while holding a **mutex** (lock),
which protects the shared data from corruption.

A mutex (mutually exclusive lock, or simply lock) is one of the basic but very effective **synchronisation primitives**.
To avoid corruption, we must not allow more than one thread at a time to access a given part of shared memory (at least
in this simple version), and a mutex is a mechanism that enforces exactly that.

The `Lock` (Mutex) object is a simple flag with two possible states - 0 or 1 and two methods:
* `acquire` - which will block the thread's execution until the lock's state is 0. Once acquired it will change the
lock's state to 1 (lock closed state)
* `release` - which will "open" the lock, changing its state to 0. This allows threads to obtain the lock.
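`threading.Lock` exposes this two-state behaviour directly: `locked()` reports the current state, and
`acquire(blocking=False)` tries to close the lock without waiting, returning whether it succeeded. A short sketch
(single-threaded, so the blocking case is only described in a comment):

```python
from threading import Lock

lock = Lock()
print(lock.locked())  # False: a new lock starts open
lock.acquire()  # closes the lock; another thread calling acquire() now would block
print(lock.locked())  # True
print(lock.acquire(blocking=False))  # False: a non-blocking attempt fails while the lock is held
lock.release()  # opens the lock again
print(lock.locked())  # False
```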

**Note:**

If you are a careful reader (and you looked at the examples of complications of multi-threading) you may question the
actual thread safety of the `Lock` object itself.

Since when is a simple two-state flag guaranteed to be thread-safe? Shouldn't it be prone to exactly the same
issues as described above?

Well, the lock object is not just a simple flag: its implementation ultimately relies on **atomic instructions**
provided by the CPU, such as **Compare and Swap** (also called Compare and Set).

Such an instruction is implemented at the **hardware** level and is guaranteed to execute **atomically** (without
interruption).

What does it do?

It compares the current value of a memory location with an expected value and, only if the two match, writes a new value
into that location, all as a single indivisible step. It also reports whether the write happened, which is exactly what
we need to make the `Lock` work, right?

We check whether the lock is 0 (open) or 1 (closed) and accordingly let a given thread proceed or make it wait. While
checking, we also need to make sure that **nothing else can modify the lock** between the check and the update; this
is what the atomicity of the compare and swap instruction achieves.

```python
class SimpleMultiThreadedObservableV1(IObservable):
    def __init__(self):
        self.observers_map: DefaultDict[str, List[Observer]] = defaultdict(list)
        self.observers_map_lock: Lock = Lock()

    def add_observer(self, observer: Observer, key: str):
        self.observers_map_lock.acquire()
        self.observers_map[key].append(observer)
        self.observers_map_lock.release()

    def remove_observer(self, observer: Observer, key: str):
        self.observers_map_lock.acquire()
        if observer in self.observers_map[key]:
            self.observers_map[key].remove(observer)
        self.observers_map_lock.release()

    def update_observers(self, event: Event, key: str):
        self.observers_map_lock.acquire()
        for observer in self.observers_map[key]:
            observer.update(event=event, key=key)
        self.observers_map_lock.release()
```

`SimpleMultiThreadedObservableV1` implements all the methods of `IObservable` in a thread-safe manner using the monitor
pattern: each method wraps its actual code between acquiring and releasing `observers_map_lock`.

Since we are using Python, we can simplify the lock handling further with the `with` statement, which acquires the
lock before the `with` block is executed and releases it when the block exits, whether normally or because of an
exception.
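The exception case is where `with` really pays off: in V1, if an observer's `update` raised, `release()` would never be
called and every later call would block forever on the still-closed lock. The sketch below raises inside a `with`
block and then checks the lock state; `failing_update` is a hypothetical stand-in for an observer whose `update` fails.

```python
from threading import Lock

observers_map_lock = Lock()


def failing_update():
    raise RuntimeError('observer update failed')


try:
    with observers_map_lock:  # acquired here...
        failing_update()
except RuntimeError as error:
    print(f'Caught: {error}')

print(observers_map_lock.locked())  # False: ...and released on the way out despite the exception
```

With a bare `acquire()`/`release()` pair, the same guarantee would need an explicit `try`/`finally`.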

```python
class SimpleMultiThreadedObservableV2(IObservable):
    def __init__(self):
        self.observers_map: DefaultDict[str, List[Observer]] = defaultdict(list)
        self.observers_map_lock: Lock = Lock()

    def add_observer(self, observer: Observer, key: str):
        with self.observers_map_lock:
            self.observers_map[key].append(observer)

    def remove_observer(self, observer: Observer, key: str):
        with self.observers_map_lock:
            if observer in self.observers_map[key]:
                self.observers_map[key].remove(observer)

    def update_observers(self, event: Event, key: str):
        with self.observers_map_lock:
            for observer in self.observers_map[key]:
                observer.update(event=event, key=key)
```

### Arguing thread safety of SimpleMultiThreadedObservable

Since access to `observers_map` is always protected by the mutex `observers_map_lock`, we can guarantee that at most
one thread at a time reads or writes this data. Therefore this class can be used in a multi-threaded environment. Note
that we are not considering the thread safety of the individual `Observer`s' `update` methods here.

### Performance tests of SimpleMultiThreadedObservable

```python
# Create an instance of Observable
observable = SimpleMultiThreadedObservableV2()

# Build a list of observers and add them to the observable using multiple threads to confirm it works properly.
NUMBER_OF_OBSERVERS = 10
threads = []
for i in range(NUMBER_OF_OBSERVERS):
    observer = Observer(name=f'Observer-{i}')
    thread = Thread(target=observable.add_observer, kwargs={'observer': observer, 'key': EventKeys.EXPENSES})
    thread.start()
    threads.append(thread)  # keep a reference so we can join it below

# Make sure all the observers are added and threads completed before progressing further.
for thread in threads:
    thread.join()

# Generate an event.
expenses_event = Event(name='Adding Expenses', data={EventKeys.EXPENSES: 1})

# Measure time it takes to update all observers in the simple multi threaded implementation.
simple_multi_threaded_update_start = datetime.now()

# We will execute update_observers in a separate thread to confirm it works properly.
update_thread = Thread(
    target=observable.update_observers,
    kwargs={'event': expenses_event, 'key': EventKeys.EXPENSES},
    name='Update Thread'
)
update_thread.start()
update_thread.join()

simple_multi_threaded_update_stop = datetime.now()

simple_multi_threaded_update_duration = simple_multi_threaded_update_stop - simple_multi_threaded_update_start
```


```python
print(f'Simple multi threaded update takes: {simple_multi_threaded_update_duration}')
```

We can execute the methods of `SimpleMultiThreadedObservableV2` from different threads, but we did not gain any
performance when updating the observers: `update_observers` still notifies them one after another while holding the
lock, so it takes roughly as long as the single-threaded version. In fact, because creating and switching threads costs
extra CPU time, we can expect to lose a little performance.
The class, though, is now thread-safe.
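Holding the lock for the whole loop also means that concurrent calls of `update_observers` are serialised. The sketch
below (hypothetical names, and a 0.05 s `DEMO_UPDATE_DELAY` instead of `UPDATE_DELAY`) mimics that loop and runs two
calls at once: the second caller only starts notifying after the first has finished, so the total is at least the sum of
both.

```python
from threading import Lock, Thread
from time import perf_counter, sleep

DEMO_UPDATE_DELAY = 0.05  # illustrative cost of one observer update
DEMO_OBSERVER_COUNT = 4

demo_lock = Lock()


def locked_update_observers():
    """Mimics SimpleMultiThreadedObservableV2.update_observers: notify everyone while holding the lock."""
    with demo_lock:
        for _ in range(DEMO_OBSERVER_COUNT):
            sleep(DEMO_UPDATE_DELAY)  # stand-in for observer.update(...)


start = perf_counter()
callers = [Thread(target=locked_update_observers) for _ in range(2)]
for caller in callers:
    caller.start()
for caller in callers:
    caller.join()
elapsed = perf_counter() - start

serialised = 2 * DEMO_OBSERVER_COUNT * DEMO_UPDATE_DELAY  # 0.4 s if the two calls run back to back
print(f'{elapsed:.2f} s (fully serialised: {serialised:.2f} s)')
```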

### Improving performance of `update_observers` method

So far the main loop of `update_observers` has been executed in the thread it was called from, but there is another
approach. `update_observers` can use a **pool of threads** that calls the observers' `update` methods concurrently,
allowing us to gain the benefits of multi-threading for real.

Why use a pool of threads instead of creating one thread per observer? Because creating, switching and, most
importantly, synchronising threads is expensive, and an unrestrained number of threads can cost us performance instead of
improving it (due to memory consumption and having to save and restore state every time the threads are switched).

We could try to build such a pool ourselves, but in the spirit of "do not reinvent the wheel" we will use the built-in,
tried and tested `ThreadPoolExecutor` class and its `map` method. `map` returns the results of the `update` calls in the
same order in which the observers are stored in `observers_map`, even though the calls themselves run concurrently and
may finish in any order. Currently `update` does not return anything, but this may become important in the future, so we
had better keep it in mind now, as it does not cost us any extra implementation time.
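A minimal sketch of this approach, assuming one pool shared by all calls and the same coarse lock as before.
`ThreadPoolObservable`, `SlowObserver`, the 0.1 s delay and `max_workers=8` are illustrative assumptions, and returning
the list of results is an addition that the original `update_observers` does not have.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from threading import Lock
from time import perf_counter, sleep


@dataclass
class Event:
    name: str
    data: dict


class SlowObserver:
    """Hypothetical observer whose update takes a fixed time and returns its name."""

    def __init__(self, name, delay=0.1):
        self.name = name
        self.delay = delay

    def update(self, event, key):
        sleep(self.delay)  # sleep releases the GIL, so other workers can run meanwhile
        return self.name


class ThreadPoolObservable:
    """Sketch: notify observers concurrently while still holding the lock."""

    def __init__(self, max_workers=8):
        self.observers_map = defaultdict(list)
        self.observers_map_lock = Lock()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)  # bounded pool reused across calls

    def add_observer(self, observer, key):
        with self.observers_map_lock:
            self.observers_map[key].append(observer)

    def update_observers(self, event, key):
        with self.observers_map_lock:
            observers = self.observers_map[key]
            # map() runs the updates concurrently; list() waits for all of them, keeps the
            # observers' order and re-raises the first exception in that order, if any.
            return list(self.executor.map(lambda observer: observer.update(event=event, key=key), observers))


observable = ThreadPoolObservable(max_workers=8)
for i in range(8):
    observable.add_observer(SlowObserver(name=f'Observer-{i}'), key='expenses')

start = perf_counter()
results = observable.update_observers(Event(name='Adding Expenses', data={'expenses': 1}), key='expenses')
elapsed = perf_counter() - start
observable.executor.shutdown()

print(results)  # ['Observer-0', ..., 'Observer-7'] in registration order
print(f'{elapsed:.2f} s')  # typically close to one delay rather than 8 * 0.1 s
```

The speed-up depends on `update` spending its time in something that releases the GIL, such as `sleep` or I/O. In
CPython, CPU-bound `update` methods would still run one at a time even inside a thread pool.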

After carefully reading the `ThreadPoolExecutor` documentation, we should be aware that if any of the notified observers
depends on the result of another observer's `update` call, using `ThreadPoolExecutor` could **deadlock**: for example,
when every worker thread is busy waiting for a task that is still queued behind them and therefore can never start.

A deadlock occurs when two or more threads wait for each other in a cycle to release some resource(s). The simplest
deadlock happens when Thread A holds a lock that Thread B needs, while Thread B holds a lock that Thread A needs.

Since our observers are extremely simple, we can ignore this potential issue (although it shows how tricky it is to get
multi-threading right).

1. Use a `ThreadPoolExecutor` inside `update_observers`.
2. Still hold the lock, which allows only one execution of `update_observers` at any time.
3. Improvement: store events/keys in a queue.
4. Take items from the queue and, if the read lock allows it, update the observers.
5. A read lock allows multiple readers at the same time, so we can allow multiple calls of `update_observers` from
multiple threads and build that event/key queue for later use in the thread pool.
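Points 3 and 4 above could look roughly like the sketch below. It departs from the plan in one respect: instead of a
read lock, a single dispatcher thread copies the observer list under the ordinary lock and notifies the observers outside
it, so a slow `update` never blocks `add_observer` or `remove_observer`. Because one dispatcher delivers events in
first-in, first-out order, events reach the observers in the order they were queued. `QueuedObservable`,
`RecordingObserver` and the `close()` method are hypothetical, and `update_observers` here only enqueues the event and
returns immediately.

```python
from collections import defaultdict
from dataclasses import dataclass
from queue import Queue
from threading import Lock, Thread


@dataclass
class Event:
    name: str
    data: dict


class RecordingObserver:
    """Hypothetical observer that just records the names of the events it receives."""

    def __init__(self):
        self.received = []

    def update(self, event, key):
        self.received.append(event.name)


class QueuedObservable:
    """Sketch: any thread may fire events; one dispatcher thread delivers them in FIFO order."""

    _STOP = object()  # sentinel telling the dispatcher to exit

    def __init__(self):
        self.observers_map = defaultdict(list)
        self.observers_map_lock = Lock()
        self.events = Queue()  # thread-safe FIFO of (event, key) pairs
        self.dispatcher = Thread(target=self._dispatch, daemon=True)
        self.dispatcher.start()

    def add_observer(self, observer, key):
        with self.observers_map_lock:
            self.observers_map[key].append(observer)

    def remove_observer(self, observer, key):
        with self.observers_map_lock:
            if observer in self.observers_map[key]:
                self.observers_map[key].remove(observer)

    def update_observers(self, event, key):
        self.events.put((event, key))  # returns immediately; delivery happens on the dispatcher

    def close(self):
        self.events.put(self._STOP)  # queued after every event fired so far
        self.dispatcher.join()

    def _dispatch(self):
        while True:
            item = self.events.get()
            if item is self._STOP:
                break
            event, key = item
            with self.observers_map_lock:  # hold the lock only long enough to copy the list
                observers = list(self.observers_map[key])
            for observer in observers:  # notify outside the lock
                observer.update(event=event, key=key)


observable = QueuedObservable()
observer = RecordingObserver()
observable.add_observer(observer, key='expenses')
for i in range(5):
    observable.update_observers(Event(name=f'event-{i}', data={}), key='expenses')
observable.close()  # waits until every queued event has been delivered

print(observer.received)  # ['event-0', 'event-1', 'event-2', 'event-3', 'event-4']
```

Handing the copied observer list to a thread pool instead of notifying it in the dispatcher would bring back the speed-up,
at the cost of observers possibly processing consecutive events out of order.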
