# Simple implementation of reactive patterns in Python

In [1]:
from typing import Callable, List, TypeVar, Generic, Optional, Any, Iterable, overload
import threading
from time import sleep

## Base classes

### Some needed types

In [2]:
T = TypeVar("T")
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
T6 = TypeVar("T6")

U = TypeVar("U")
V = TypeVar("V")

NextCallback = Optional[Callable[[T], None]]
ErrorCallback = Optional[Callable[[Exception], None]]
CompleteCallback = Optional[Callable[[], None]]

### Observer

In [3]:
class Observer(Generic[T]):

    def __init__(
            self,
            next: NextCallback[T] = None,
            error: ErrorCallback = None,
            complete: CompleteCallback = None):
        super().__init__()
        self._next = next
        self._error = error
        self._complete = complete
    
    def next(self, value: T) -> None:
        if self._next is not None:
            self._next(value)

    def error(self, error: Exception) -> None:
        if self._error is not None:
            self._error(error)

    def complete(self) -> None:
        if self._complete is not None:
            self._complete()

### Subscription

In [4]:
class Subscription:

    def __init__(self, dispose: Callable[[], None]):
        self._dispose = dispose

    def dispose(self) -> None:
        self._dispose()

### Observable

In [5]:
class Observable(Generic[T]):

    def __init__(self, subscribe: Callable[[Observer[T]], Subscription]):
        super().__init__()
        self._subscribe = subscribe

    def subscribe(
            self,
            next: NextCallback[T] = None,
            error: ErrorCallback = None,
            complete: CompleteCallback = None) -> Subscription:
        
        return self._subscribe(
            Observer(next=next, error=error, complete=complete)
        )
    
    @overload
    def pipe(self,
            __op1: Callable[['Observable[T]'], 'Observable[T1]']) -> 'Observable[T1]': ...
    
    @overload
    def pipe(self, 
            __op1: Callable[['Observable[T]'], 'Observable[T1]'], 
            __op2: Callable[['Observable[T1]'], 'Observable[T2]']) -> 'Observable[T2]': ...
    
    @overload
    def pipe(self, 
            __op1: Callable[['Observable[T]'], 'Observable[T1]'], 
            __op2: Callable[['Observable[T1]'], 'Observable[T2]'], 
            __op3: Callable[['Observable[T2]'], 'Observable[T3]']) -> 'Observable[T3]': ...

    @overload
    def pipe(self, 
            __op1: Callable[['Observable[T]'], 'Observable[T1]'], 
            __op2: Callable[['Observable[T1]'], 'Observable[T2]'], 
            __op3: Callable[['Observable[T2]'], 'Observable[T3]'], 
            __op4: Callable[['Observable[T3]'], 'Observable[T4]']) -> 'Observable[T4]': ...

    @overload
    def pipe(self, 
            __op1: Callable[['Observable[T]'], 'Observable[T1]'], 
            __op2: Callable[['Observable[T1]'], 'Observable[T2]'], 
            __op3: Callable[['Observable[T2]'], 'Observable[T3]'], 
            __op4: Callable[['Observable[T3]'], 'Observable[T4]'], 
            __op5: Callable[['Observable[T4]'], 'Observable[T5]']) -> 'Observable[T5]': ...

    @overload
    def pipe(self, 
            __op1: Callable[['Observable[T]'], 'Observable[T1]'], 
            __op2: Callable[['Observable[T1]'], 'Observable[T2]'], 
            __op3: Callable[['Observable[T2]'], 'Observable[T3]'], 
            __op4: Callable[['Observable[T3]'], 'Observable[T4]'], 
            __op5: Callable[['Observable[T4]'], 'Observable[T5]'], 
            __op6: Callable[['Observable[T5]'], 'Observable[T6]']) -> 'Observable[T6]': ...

    def pipe(self, *operators: Callable[["Observable"], "Observable"]) -> "Observable":
        result = self
        for operator in operators:
            result = operator(result)
        return result

### Subject

In [6]:
class Subject(Generic[T]):

    def __init__(self):
        super().__init__()
        self._observers: List[Observer[T]] = []

    def next(self, value: T) -> None:
        for observer in self._observers:
            observer.next(value=value)

    def error(self, error: Exception) -> None:
        for observer in self._observers:
            observer.error(error=error)

    def complete(self) -> None:
        for observer in self._observers:
            observer.complete()

    def asObservable(self) -> 'Observable[T]':

        def _subscribe(observer: Observer[T]) -> Subscription:
            return self.subscribe(
                next=observer.next,
                error=observer.error,
                complete=observer.complete
            )

        return Observable(subscribe=_subscribe)
    
    def subscribe(
            self,
            next: NextCallback[T] = None,
            error: ErrorCallback = None,
            complete: CompleteCallback = None) -> Subscription:
        
        observer = Observer(next=next, error=error, complete=complete)
        self._observers.append(observer)

        def _remove() -> None:
            if observer in self._observers:
                self._observers.remove(observer)
        
        return Subscription(dispose=_remove)

## Operators

### Take

Takes n first values then disposes the source observable

0-1-2-3-4-5--->
 
  take(3)
 
0-1-2-|------->

In [7]:
def take(n: int) -> Callable[[Observable[T]], Observable[T]]:

    def _take(source: Observable[T]) -> Observable[T]:
        
        def _subscribe(observer: Observer[T]) -> Subscription:
            
            i = 0

            def _next(value: T) -> None:
                nonlocal i
                
                if i < n:
                    observer.next(value=value)
                    i += 1

                    if i == n:
                        subscription.dispose()

            def _error(error: Exception) -> None:
                observer.error(error=error)

            def _complete() -> None:
                observer.complete()

            subscription = source.subscribe(
                next=_next,
                error=_error,
                complete=_complete
            )

            return Subscription(dispose=subscription.dispose)

        return Observable(subscribe=_subscribe)

    return _take

### Map

Maps each resulting value from stream

0-1-2-3-4-5--->
 
map(x -> x * 2)
 
0-2-4-6-8-10--->

In [8]:
def map(func: Callable[[T], U]) -> Callable[[Observable[T]], Observable[U]]:

    def _map(source: Observable[T]) -> Observable[U]:

        def _subscribe(observer: Observer[U]) -> Subscription:
            
            def _next(value: T) -> None:
                observer.next(value=func(value))

            def _error(error: Exception) -> None:
                observer.error(error=error)

            def _complete() -> None:
                observer.complete()

            subscription = source.subscribe(
                next=_next,
                error=_error,
                complete=_complete
            )

            return Subscription(dispose=subscription.dispose)

        return Observable(subscribe=_subscribe)
    
    return _map

### Filter

Filters value from source stream

-5-9-2-7-3-4---->

filter(x -> x > 4)
 
-5-9---7-------->

In [9]:
def filter(test: Callable[[T], bool]) -> Callable[[Observable[T]], Observable[T]]:

    def _filter(source: Observable[T]) -> Observable[T]:

        def _subscribe(observer: Observer[T]) -> Subscription:
            
            def _next(value: T) -> None:
                if test(value):
                    observer.next(value=value)

            def _error(error: Exception) -> None:
                observer.error(error=error)

            def _complete() -> None:
                observer.complete()

            subscription = source.subscribe(
                next=_next,
                error=_error,
                complete=_complete
            )

            return Subscription(dispose=subscription.dispose)

        return Observable(subscribe=_subscribe)
    
    return _filter

### Do

Do something on each value from stream without modifying the stream

0-1-2-3-4-5--->

do(x -> console.log(x))

0-1-2-3-4-5--->

In [10]:
def do(next: NextCallback[T] = None,
        error: ErrorCallback = None,
        complete: CompleteCallback = None) -> Callable[[Observable[T]], Observable[T]]:

    def _do(source: Observable[T]) -> Observable[T]:

        to_do = Observer(
            next=next,
            error=error,
            complete=complete
        )

        def _subscribe(observer: Observer[T]) -> Subscription:
            
            def _next(value: T) -> None:
                to_do.next(value=value)
                observer.next(value=value)

            def _error(error: Exception) -> None:
                to_do.error(error=error)
                observer.error(error=error)

            def _complete() -> None:
                to_do.complete()
                observer.complete()

            subscription = source.subscribe(
                next=_next,
                error=_error,
                complete=_complete
            )

            return Subscription(dispose=subscription.dispose)

        return Observable(subscribe=_subscribe)
    
    return _do

### Take Last

Takes last value of a stream

0-1-2-3-4-5-|->

takeLast()

------------5-|->

In [11]:
def take_last() -> Callable[[Observable[T]], Observable[T]]:

  def _take_last(source: Observable[T]) -> Observable[T]:

        last_value: Optional[T] = None 

        def _subscribe(observer: Observer[T]) -> Subscription:
            
            def _next(value: T) -> None:
                nonlocal last_value
                last_value = value

            def _error(error: Exception) -> None:
                observer.error(error=error)

            def _complete() -> None:
                if last_value is not None:
                  observer.next(value=last_value)
                observer.complete()

            subscription = source.subscribe(
                next=_next,
                error=_error,
                complete=_complete
            )

            return Subscription(dispose=subscription.dispose)

        return Observable(subscribe=_subscribe)
    
  return _take_last

## Generators

### Empty

Returns an observable that completes imediately (empty stream)

-|---------------->

In [12]:
def empty() -> Observable[Any]:

    def _subscribe(observer: Observer[Any]) -> Subscription:
        
        def _dispose() -> None:
            nonlocal is_disposed
            is_disposed = True

        is_disposed = False

        try:
            observer.complete()
        except Exception as e:
            observer.error(e)

        return Subscription(dispose=_dispose)

    return Observable(subscribe=_subscribe)

### Of

Creates an one valued stream

of(5)

-5-|------------------>

In [13]:
def of(value: T) -> Observable[T]:

    def _subscribe(observer: Observer[T]) -> Subscription:
        
        def _dispose() -> None:
            nonlocal is_disposed
            is_disposed = True

        is_disposed = False

        try:
            observer.next(value=value)
            observer.complete()
        except Exception as e:
            observer.error(e)

        return Subscription(dispose=_dispose)

    return Observable(subscribe=_subscribe)

### From Iterable

Creates a stream from an iterable

from_iterable([3,8,5,1])

-3-8-5-1-|-------->

In [14]:
def from_iterable(iterable: Iterable[T]) -> Observable[T]:
    
    def _subscribe(observer: Observer[T]) -> Subscription:
        
        def _dispose() -> None:
            nonlocal is_disposed
            is_disposed = True
        
        def run() -> None:
            try:
                for value in iterable:
                    observer.next(value=value)
                observer.complete()
            except Exception as e:
                observer.error(e)

        is_disposed = False

        threading.Thread(target=run).start()

        return Subscription(dispose=_dispose)

    return Observable(subscribe=_subscribe)

### From Range

Creates a stream from a range

range(4,8)

-4-5-6-7-8-|--->

In [15]:
def from_range(start: int, stop: int, step: int = 1) -> Observable[int]:
    return from_iterable(range(start, stop, step))

### Creates a stream from interval

-0-1-2-3-4-5-6----->

In [16]:
def from_interval(period: int) -> Observable[int]:
    
    def _subscribe(observer: Observer[int]) -> Subscription:
        
        def _dispose() -> None:
            nonlocal is_disposed
            is_disposed = True
        
        def run() -> None:
            nonlocal i

            try:
                while is_disposed == False:
                    observer.next(value=i)
                    i += 1
                    sleep(period)
                observer.complete()
            except Exception as e:
                observer.error(e)

        i = 0
        is_disposed = False

        threading.Thread(target=run).start()

        return Subscription(dispose=_dispose)

    return Observable(subscribe=_subscribe)

## Examples

### Empty stream

In [17]:
empty().subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

completed


<__main__.Subscription at 0x1058f18e0>

### Unique valued stream

In [18]:
of('coucou').subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

coucou
completed


<__main__.Subscription at 0x1058f0e30>

### Map values

In [19]:
of(5).pipe(map(func=lambda x: 2*x)).subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

10
completed


<__main__.Subscription at 0x1058f05f0>

### Takes from range

In [20]:
from_range(1, 10).pipe(take(5)).subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)


1
2
3
4
5


<__main__.Subscription at 0x1047b2060>

### Filter from range

In [21]:
from_range(1, 10).pipe(filter(lambda o: o%2==0)).subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

2
4
6
8
completed


<__main__.Subscription at 0x10589b2c0>

### Takes from interval

In [22]:
from_interval(period=1).pipe(take(3), map(lambda o: o+1)).subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

sleep(3)

1
2
3


completed


### Do from range

In [23]:
from_range(1, 10).pipe(
    do(
        next=lambda o: print(o),
        complete=lambda: print('completed')
    ),
    filter(test=lambda o: o%2==0)
).subscribe(
        next=lambda o: print(o),
        complete=lambda: print('completed'))

1
2
2
3
4
4
5
6
6
7
8
8
9
completed
completed


<__main__.Subscription at 0x1058f2ed0>

### Takes last from range

In [24]:
from_range(1, 10).pipe(
    filter(lambda o: o%2==0),
    take_last()
).subscribe(
    next=lambda o: print(o),
    complete=lambda: print('completed')
)

8
completed


<__main__.Subscription at 0x1058f24e0>