In [1]:
import nbdev

In [2]:
#| default_exp store

In [16]:
#|export
from __future__ import annotations
from typing import List, Callable, TypeVar,  Generic, Sequence, Union, Optional, Any, Set

### Types

In [14]:
#| export

T = TypeVar("T")

TSubscriber = Callable[[T], None]
TUnsubscriber = Callable[[], None]
TUpdater = Callable[[T], T]
TStartStop = Callable[[TSubscriber], Union[TUnsubscriber, None]]

    
class Readable(Generic[T]):
    def __init__(self, value: T, start: Optional[TStartStop]=None) -> None:
        self.value: T = value
        self.start: Optional[TStartStop] = start
        self.subscribers: set[TSubscriber] = set() # callback list

    def __getattr__(self, name):
        return getattr(self.value, name)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.value!r})"

    def __str__(self) -> str: return str(self.value)

    def get(self) -> T: return self.value

    def _set(self, new_value: T) -> None:
        if new_value != self.value: self.value = new_value
        for subscriber in self.subscribers: subscriber(self.value)

    def subscribe(self, subscriber: TSubscriber) -> TUnsubscriber:
        self.subscribers.add(subscriber)
        subscriber(self.value)
        if (len(self.subscribers) == 1):
             if self.start is not None: 
                self.stop: Union[TUnsubscriber, None] =  self.start(self._set) 
        def unsubscribe()-> None:
            self.subscribers.remove(subscriber)
        if isinstance(subscriber, Subscriber): subscriber.add_subscription(unsubscribe)
        return unsubscribe

class Writable(Readable[T]):

    def set(self, new_value: T) -> None:
        self._set(new_value)
    
    def update(self, fn: TUpdater) -> None: self.set(fn(self.value))


class Subscriber(Generic[T]):
    """ Represents a subscriber (a callback) to a store (an observable)."""
    def __init__(self, fn: Callable[[T], None], observable: Optional[Union[Readable[T], Writable[T]]]) -> None:
        self.fn = fn
        self.subscriptions: Set[Callable[[], None]] = set()

    def __call__(self, value: T) -> None:
        self.fn(value)

    def __eq__(self, other: Callable[[T], None]) -> bool:
        return self.fn == other.fn

    def __hash__(self) -> int:
        return hash(self.fn)
    
    def __del__(self)-> None:
        for unsubscribe in self.subscriptions: unsubscribe()
        
    def add_subscription(self, unsubscribe: Callable[[], None])-> None:
        self.subscriptions.add(unsubscribe)
    


In [24]:
a = set()
a.add(1), a.add('foo')
a, a[0]

TypeError: 'set' object is not subscriptable

In [31]:
a.add(2)
a.add(object())
a.add(1.0)

In [35]:
[x for x in a]

[2, <object at 0x7facbc0cae20>, 'foo']

In [12]:
Stores = Union[Readable[T], Sequence[Readable[T]]]
Observable = Union[Readable[T], Writable[T]]

In [None]:
def derived (stores: Stores. fn: Callable) -> Readable:
"""Derived value store by synchronizing one or more readable stores and
applying an aggregation function over its input values."""

In [None]:
export function writable<T>(value?: T, start: StartStopNotifier<T> = noop): Writable<T> {
	let stop: Unsubscriber;
	const subscribers: Set<SubscribeInvalidateTuple<T>> = new Set();

	function set(new_value: T): void {
		if (safe_not_equal(value, new_value)) {
			value = new_value;
			if (stop) { // store is ready
				const run_queue = !subscriber_queue.length;
				for (const subscriber of subscribers) {
					subscriber[1]();
					subscriber_queue.push(subscriber, value);
				}
				if (run_queue) {
					for (let i = 0; i < subscriber_queue.length; i += 2) {
						subscriber_queue[i][0](subscriber_queue[i + 1]);
					}
					subscriber_queue.length = 0;
				}
			}
		}
	}

	function update(fn: Updater<T>): void {
		set(fn(value));
	}

	function subscribe(run: Subscriber<T>, invalidate: Invalidator<T> = noop): Unsubscriber {
		const subscriber: SubscribeInvalidateTuple<T> = [run, invalidate];
		subscribers.add(subscriber);
		if (subscribers.size === 1) {
			stop = start(set) || noop;
		}
		run(value);

		return () => {
			subscribers.delete(subscriber);
			if (subscribers.size === 0) {
				stop();
				stop = null;
			}
		};
	}

	return { set, update, subscribe };
}

In [13]:
len(one)


NameError: name 'one' is not defined

In [None]:


class Derived:
    def __init__(self, stores: Stores, fn: Callable[[Any], Any], initial_value: Any):
        self.stores = stores
        self.fn = fn
        self.initial_value = initial_value
        self.value = initial_value
        self.subscribers: List[Callable[[Any], None]] = []
        self._unsubscribe: List[Callable[[], None]] = []
        self._set = self.set
        self.set = self.update

    def set(self, value: Any) -> None:
        self.value = value
        for subscriber in self.subscribers: subscriber(self.value)

    def update(self, fn: Callable[[Any], Any]) -> None:
        self.set(fn(self.value))

    def subscribe(self, subscriber: Callable[[Any], None]) -> Callable[[], None]:
        self.subscribers.append(subscriber)
        subscriber(self.value)
        return lambda: self.subscribers.remove(subscriber)

    def unsubscribe(self) -> None:
        for unsub in self._unsubscribe: unsub()

    def __enter__(self) -> Derived:
        self._unsubscribe = [store.subscribe(self._set) for store in self.stores]
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.unsubscribe()

    def __call__(self) -> Any:
        return self.value

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.stores!r}, {self.fn!r}, {self.initial_value!r})"

    def __str__(self) -> str: return str(self.value)

export function derived<S extends Stores, T>(
	stores: S,
	fn: (values: StoresValues<S>, set: (value: T) => void) => Unsubscriber | void,
	initial_value?: T
): Readable<T>;

/**
 * Derived value store by synchronizing one or more readable stores and
 * applying an aggregation function over its input values.
 *
 * @param stores - input stores
 * @param fn - function callback that aggregates the values
 * @param initial_value - initial value
 */
export function derived<S extends Stores, T>(
	stores: S,
	fn: (values: StoresValues<S>) => T,
	initial_value?: T
): Readable<T>;

/**
 * Derived value store by synchronizing one or more readable stores and
 * applying an aggregation function over its input values.
 *
 * @param stores - input stores
 * @param fn - function callback that aggregates the values
 */
export function derived<S extends Stores, T>(
	stores: S,
	fn: (values: StoresValues<S>) => T
): Readable<T>;

export function derived<T>(stores: Stores, fn: Function, initial_value?: T): Readable<T> {
	const single = !Array.isArray(stores);
	const stores_array: Array<Readable<any>> = single
		? [stores as Readable<any>]
		: stores as Array<Readable<any>>;

	const auto = fn.length < 2;

	return readable(initial_value, (set) => {
		let inited = false;
		const values = [];

		let pending = 0;
		let cleanup = noop;

		const sync = () => {
			if (pending) {
				return;
			}
			cleanup();
			const result = fn(single ? values[0] : values, set);
			if (auto) {
				set(result as T);
			} else {
				cleanup = is_function(result) ? result as Unsubscriber : noop;
			}
		};

		const unsubscribers = stores_array.map((store, i) => subscribe(
			store,
			(value) => {
				values[i] = value;
				pending &= ~(1 << i);
				if (inited) {
					sync();
				}
			},
			() => {
				pending |= (1 << i);
			})
		);

		inited = true;
		sync();

		return function stop() {
			run_all(unsubscribers);
			cleanup();
		};
	});
}

/**
 * Get the current value from a store by subscribing and immediately unsubscribing.
 * @param store readable
 */
export { get_store_value as get };

### Export

In [15]:
nbdev.nbdev_export()

In [5]:
# #| export

# def writable(value: T) -> Writable[T]:
#     """ Create a writable store with a given value that allows both updating and reading by subscription."""

    
#     def set(new_value: T) -> None:
#         nonlocal value
#         if new_value != value: value = new_value
#         for subscriber in subscriber_queue: subscriber(value)

#     def update(fn: Updater[T]) -> None: set(fn(value))

#     def subscribe(subscriber: Subscriber[T]) -> Unsubscriber:
#         subscriber_queue.append(subscriber)
#         subscriber(value)

#         def unsubscribe() -> None:
#             subscriber_queue.remove(subscriber)

#         return unsubscribe

#     ret = Writable() # type: ignore
#     ret.set = set # type: ignore
#     ret.update = update # type: ignore
#     ret.subscribe = subscribe

#     return ret

In [6]:
# #| export

# def readable(value: T) -> Readable[T]:
#     ret = Readable()
#     ret.subscribe = writable(value).subscribe
#     return ret

In [8]:
# class Store:
#     def __init__(self, value):
#         self.value = value
#         self.subscribers = []
#     def subscribe(self, subscriber):
#         self.subscribers.append(subscriber)
#         subscriber(self.value)
#         return lambda: self.subscribers.remove(subscriber)
#     def set(self, value):
#         if value != self.value:
#             self.value = value
#             for subscriber in self.subscribers:
#                 subscriber(value)
#     def update(self, updater):
#         self.set(updater(self.value))