Skip to content

Commit

Permalink
Make inner containers in stream as Maps.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kapelianovych committed Aug 7, 2021
1 parent 523dbc1 commit 06c33df
Showing 1 changed file with 24 additions and 38 deletions.
62 changes: 24 additions & 38 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,32 @@ interface StreamCreationOptions<T> {
* will never receive values from destroyed stream.
*/
_isActive: boolean;
_valueListeners: Array<StreamListener<T>>;
_freezeListeners: Array<VoidFunction>;
_resumeListeners: Array<VoidFunction>;
_destroyListeners: Array<VoidFunction>;
_valueListeners: Map<StreamListener<T>, StreamListener<T>>;
_freezeListeners: Map<VoidFunction, VoidFunction>;
_resumeListeners: Map<VoidFunction, VoidFunction>;
_destroyListeners: Map<VoidFunction, VoidFunction>;
}

const createStream = <T>(
options: StreamCreationOptions<T> = {
_isActive: true,
_valueListeners: [],
_destroyListeners: [],
_freezeListeners: [],
_resumeListeners: [],
}
_valueListeners: new Map(),
_destroyListeners: new Map(),
_freezeListeners: new Map(),
_resumeListeners: new Map(),
},
): Stream<T> => {
const listen = (listener: StreamListener<T>): VoidFunction => {
if (options._isActive) {
options._valueListeners.push(listener);
return () => {
options._valueListeners = options._valueListeners.filter(
(fn) => fn !== listener
);
};
options._valueListeners.set(listener, listener);
return () => options._valueListeners.delete(listener);
} else {
return () => {};
}
};

const derive = <R>(
fn: (derived: Stream<R>) => StreamListener<T>
fn: (derived: Stream<R>) => StreamListener<T>,
): Stream<R> => {
const derived = createStream<R>();
derived.on(StreamEvent.DESTROY, listen(fn(derived)));
Expand All @@ -116,26 +112,16 @@ const createStream = <T>(
if (options._isActive) {
switch (event) {
case StreamEvent.FREEZE:
options._freezeListeners.push(listener);
return () => {
options._freezeListeners = options._freezeListeners.filter(
(fn) => fn !== listener
);
};
options._freezeListeners.set(listener, listener);
return () => options._freezeListeners.delete(listener);
case StreamEvent.RESUME:
options._resumeListeners.push(listener);
return () => {
options._resumeListeners = options._resumeListeners.filter(
(fn) => fn !== listener
);
};
options._resumeListeners.set(listener, listener);
return () => options._resumeListeners.delete(listener);

case StreamEvent.DESTROY:
options._destroyListeners.push(listener);
return () => {
options._destroyListeners = options._destroyListeners.filter(
(fn) => fn !== listener
);
};
options._destroyListeners.set(listener, listener);
return () => options._destroyListeners.delete(listener);

default:
return () => {};
}
Expand Down Expand Up @@ -196,10 +182,10 @@ const createStream = <T>(
options._destroyListeners.forEach((fn) => fn());

options._isActive = false;
options._valueListeners = [];
options._freezeListeners = [];
options._resumeListeners = [];
options._destroyListeners = [];
options._valueListeners = new Map();
options._freezeListeners = new Map();
options._resumeListeners = new Map();
options._destroyListeners = new Map();

return createStream(options);
},
Expand Down

0 comments on commit 06c33df

Please sign in to comment.