Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: asReadable now accepts any StoreInput #79

Merged
merged 1 commit into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,46 @@ describe('stores', () => {
unsubscribe();
});

it('asReadable should be compatible with rxjs (BehaviorSubject)', () => {
const behaviorSubject = new BehaviorSubject(0);
const store = asReadable(behaviorSubject);
expect(store.subscribe).toBeDefined();
expect(store[symbolObservable]).toBeDefined();
const values: number[] = [];
const unsubscribe = store.subscribe((value) => values.push(value));
expect(values).toEqual([0]);
expect(typeof unsubscribe).toBe('function');
expect(unsubscribe.unsubscribe).toBe(unsubscribe);
behaviorSubject.next(1);
expect(values).toEqual([0, 1]);
unsubscribe();
behaviorSubject.next(2);
expect(values).toEqual([0, 1]);
});

it('get should be compatible with rxjs (BehaviorSubject)', () => {
const store = new BehaviorSubject(0);
expect(get(store)).toBe(0);
});

it('asReadable should be compatible with rxjs (InteropObservable)', () => {
const behaviorSubject = new BehaviorSubject(0);
const interop = { [symbolObservable]: () => behaviorSubject };
const store = asReadable(interop);
expect(store.subscribe).toBeDefined();
expect(store[symbolObservable]).toBeDefined();
const values: number[] = [];
const unsubscribe = store.subscribe((value) => values.push(value));
expect(values).toEqual([0]);
expect(typeof unsubscribe).toBe('function');
expect(unsubscribe.unsubscribe).toBe(unsubscribe);
behaviorSubject.next(1);
expect(values).toEqual([0, 1]);
unsubscribe();
behaviorSubject.next(2);
expect(values).toEqual([0, 1]);
});

it('get should be compatible with rxjs (InteropObservable)', () => {
const b = new BehaviorSubject(1);
const i = { [symbolObservable]: () => b };
Expand Down
59 changes: 37 additions & 22 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ export interface InteropObservable<T> {
*/
export type StoreInput<T> = SubscribableStore<T> | InteropObservable<T>;

const getStore = <T>(store: StoreInput<T>): SubscribableStore<T> =>
'subscribe' in store ? store : store[symbolObservable]();

/**
* This interface augments the base {@link SubscribableStore} interface by requiring the return value of the subscribe method to be both a function and an object with the `unsubscribe` method.
* For {@link https://rxjs.dev/api/index/interface/InteropObservable | interoperability with rxjs}, it also implements the `[Symbol.observable]` method.
Expand Down Expand Up @@ -153,6 +150,9 @@ export interface Writable<T, U = T> extends Readable<T> {

const noop = () => {};

const noopUnsubscribe = () => {};
noopUnsubscribe.unsubscribe = noopUnsubscribe;

const bind = <T>(object: T | null | undefined, fnName: keyof T) => {
const fn = object ? object[fnName] : null;
return typeof fn === 'function' ? fn.bind(object) : noop;
Expand All @@ -179,16 +179,32 @@ const returnThis = function <T>(this: T): T {
return this;
};

const normalizeUnsubscribe = (
unsubscribe: Unsubscriber | void | null | undefined
): UnsubscribeFunction & UnsubscribeObject => {
if (!unsubscribe) {
return noopUnsubscribe;
}
if ((unsubscribe as any).unsubscribe === unsubscribe) {
return unsubscribe as any;
}
const res: any =
typeof unsubscribe === 'function' ? () => unsubscribe() : () => unsubscribe.unsubscribe();
res.unsubscribe = res;
return res;
};

/**
* Returns a wrapper (for the given store) which only exposes the {@link Readable} interface.
* This allows to easily expose any store as read-only.
* This converts any {@link StoreInput} to a {@link Readable} and exposes the store as read-only.
*
* @param store - store to wrap
* @returns A wrapper which only exposes the {@link Readable} interface.
*/
export function asReadable<T>(store: Readable<T>): Readable<T> {
export function asReadable<T>(input: StoreInput<T>): Readable<T> {
const store = 'subscribe' in input ? input : input[symbolObservable]();
return {
subscribe: store.subscribe.bind(store),
subscribe: (...args: [Subscriber<T>]) => normalizeUnsubscribe(store.subscribe(...args)),
[symbolObservable]: returnThis,
};
}
Expand All @@ -198,9 +214,6 @@ const queueProcess = Symbol();
let willProcessQueue = false;
const queue = new Set<{ [queueProcess](): void }>();

const callUnsubscribe = (unsubscribe: Unsubscriber) =>
typeof unsubscribe === 'function' ? unsubscribe() : unsubscribe.unsubscribe();

/**
* Batches multiple changes to stores while calling the provided function,
* preventing derived stores from updating until the function returns,
Expand Down Expand Up @@ -273,7 +286,7 @@ export const batch = <T>(fn: () => T): T => {
*/
export function get<T>(store: StoreInput<T>): T {
let value: T;
callUnsubscribe(getStore(store).subscribe((v) => (value = v)));
asReadable(store).subscribe((v) => (value = v))();
return value!;
}

Expand Down Expand Up @@ -317,7 +330,7 @@ const createNotEqualCache = (valueIndex: number): Record<number, boolean> => ({
*/
export abstract class Store<T> implements Readable<T> {
#subscribers = new Set<PrivateSubscriberObject<T>>();
#cleanupFn: null | Unsubscriber = null;
#cleanupFn: null | UnsubscribeFunction = null;
#subscribersPaused = false;
#valueIndex = 1;
#value: T;
Expand All @@ -332,14 +345,14 @@ export abstract class Store<T> implements Readable<T> {
}

#start() {
this.#cleanupFn = this.onUse() || noop;
this.#cleanupFn = normalizeUnsubscribe(this.onUse());
}

#stop() {
const cleanupFn = this.#cleanupFn;
if (cleanupFn) {
this.#cleanupFn = null;
callUnsubscribe(cleanupFn);
cleanupFn();
}
}

Expand Down Expand Up @@ -569,9 +582,6 @@ export interface StoreOptions<T> {
notEqual?: (a: T, b: T) => boolean;
}

const noopUnsubscribe = () => {};
noopUnsubscribe.unsubscribe = noopUnsubscribe;

/**
* A convenience function to create an optimized constant store (i.e. which never changes
* its value). It does not keep track of its subscribers.
Expand Down Expand Up @@ -716,15 +726,17 @@ function isSyncDeriveFn<T, S>(fn: DeriveFn<T, S>): fn is SyncDeriveFn<T, S> {
return fn.length <= 1;
}

const callFn = (fn: () => void) => fn();

export abstract class DerivedStore<T, S extends StoresInput = StoresInput> extends Store<T> {
readonly #isArray: boolean;
readonly #stores: SubscribableStore<any>[];
readonly #stores: Readable<any>[];

constructor(stores: S, initialValue: T) {
super(initialValue);
const isArray = Array.isArray(stores);
this.#isArray = isArray;
this.#stores = (isArray ? [...stores] : [stores]).map(getStore);
this.#stores = (isArray ? [...stores] : [stores]).map(asReadable);
}

protected onUse(): Unsubscriber | void {
Expand All @@ -736,13 +748,13 @@ export abstract class DerivedStore<T, S extends StoresInput = StoresInput> exten
const storesArr = this.#stores;
const dependantValues = new Array(storesArr.length);

let cleanupFn: null | Unsubscriber = null;
let cleanupFn: null | UnsubscribeFunction = null;

const callCleanup = () => {
const fn = cleanupFn;
if (fn) {
cleanupFn = null;
callUnsubscribe(fn);
fn();
}
};

Expand All @@ -754,7 +766,9 @@ export abstract class DerivedStore<T, S extends StoresInput = StoresInput> exten
if (changed) {
changed = 0;
callCleanup();
cleanupFn = this.derive(isArray ? dependantValues : dependantValues[0]) || noop;
cleanupFn = normalizeUnsubscribe(
this.derive(isArray ? dependantValues : dependantValues[0])
);
}
this.resumeSubscribers();
}
Expand Down Expand Up @@ -782,7 +796,7 @@ export abstract class DerivedStore<T, S extends StoresInput = StoresInput> exten
callDerive(true);
const clean = () => {
callCleanup();
unsubscribers.forEach(callUnsubscribe);
unsubscribers.forEach(callFn);
};
(clean as any)[triggerUpdate] = () => {
initDone = false;
Expand All @@ -791,6 +805,7 @@ export abstract class DerivedStore<T, S extends StoresInput = StoresInput> exten
}
callDerive(true);
};
clean.unsubscribe = clean;
return clean;
}

Expand Down