Skip to content

Commit fbeee46

Browse files
committed
feat: add createSubject util
1 parent e3db37a commit fbeee46

File tree

1 file changed

+115
-0
lines changed

1 file changed

+115
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Represents a cleanup function that can be called to unsubscribe from a subscription
3+
*/
4+
export type Teardown = { unsubscribe: () => void };
5+
6+
/**
7+
* Basic observer interface that can receive values
8+
*/
9+
export interface Observer<T> {
10+
next: (value: T) => void;
11+
}
12+
13+
/**
14+
* Subject interface that extends Observer with additional functionality
15+
* A Subject is both an Observable (can be subscribed to) and an Observer (can emit values)
16+
*/
17+
export interface Subject<T> extends Observer<T> {
18+
/** Indicates whether the subject is closed for new subscriptions */
19+
readonly closed: boolean;
20+
/** Completes the subject and clears all observers */
21+
complete: () => void;
22+
/** Checks if there are any active observers */
23+
hasObservers: () => boolean;
24+
/** Number of current observers */
25+
readonly size: number;
26+
/** Subscribe to value updates with either an Observer object or a next function */
27+
subscribe: (obs: Observer<T> | ((v: T) => void)) => Teardown;
28+
/** Unsubscribe all observers */
29+
unsubscribe: () => void;
30+
}
31+
32+
/**
33+
* - Creates a new Subject that can be used to emit values to multiple observers
34+
* Implements a simplified version of RxJS Subject pattern
35+
* @example
36+
* const bus = createSubject<string>();
37+
* const a = bus.subscribe(v => console.log('A:', v));
38+
* const b = bus.subscribe({ next: v => console.log('B:', v) });
39+
* bus.next('hello'); // A: hello / B: hello
40+
* a.unsubscribe();
41+
* bus.next('world'); // B: world
42+
* bus.complete();
43+
*/
44+
export function createSubject<T>(): Subject<T> {
45+
// Set to store all active observers
46+
const observers = new Set<Observer<T>>();
47+
// Flag to track if the subject is closed
48+
let _closed = false;
49+
50+
/**
51+
* Subscribes to the subject with either an Observer object or a next function
52+
* Returns a teardown object that can be used to unsubscribe
53+
*/
54+
const subscribe = (obsOrFn: Observer<T> | ((v: T) => void)): Teardown => {
55+
// If subject is closed, return a no-op unsubscribe
56+
if (_closed) return { unsubscribe: () => {} };
57+
// Convert function to Observer object if needed
58+
const obs: Observer<T> = typeof obsOrFn === 'function' ? { next: obsOrFn } : obsOrFn;
59+
observers.add(obs);
60+
let done = false;
61+
return {
62+
unsubscribe: () => {
63+
if (done) return;
64+
done = true;
65+
observers.delete(obs);
66+
}
67+
};
68+
};
69+
70+
/**
71+
* Emits a new value to all observers
72+
* Takes a snapshot of observers to avoid issues if collection is modified during iteration
73+
*/
74+
const next = (value: T) => {
75+
if (_closed || observers.size === 0) return;
76+
// Take a snapshot to avoid collection modification during iteration
77+
const snapshot = Array.from(observers);
78+
for (const ob of snapshot) {
79+
try {
80+
ob.next?.(value);
81+
} catch (e) {
82+
/* Report or ignore errors */ console.error(e);
83+
}
84+
}
85+
};
86+
87+
/**
88+
* Removes all observers without closing the subject
89+
*/
90+
const unsubscribe = () => {
91+
observers.clear();
92+
};
93+
94+
/**
95+
* Marks the subject as completed and removes all observers
96+
*/
97+
const complete = () => {
98+
_closed = true;
99+
observers.clear();
100+
};
101+
102+
return {
103+
get closed() {
104+
return _closed;
105+
},
106+
complete,
107+
hasObservers: () => observers.size > 0,
108+
next,
109+
get size() {
110+
return observers.size;
111+
},
112+
subscribe,
113+
unsubscribe
114+
};
115+
}

0 commit comments

Comments
 (0)