Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(SafeSubscriber): optimize perf for ordinary observers #6815

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
113 changes: 71 additions & 42 deletions src/internal/Subscriber.ts
Expand Up @@ -147,6 +147,49 @@ function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn {
return _bind.call(fn, thisArg);
}

/**
* Internal optimization only, DO NOT EXPOSE.
* @internal
*/
class ConsumerObserver<T> implements Observer<T> {
constructor(private partialObserver: Partial<Observer<T>>) {}

next(value: T): void {
const { partialObserver } = this;
if (partialObserver.next) {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could access the partialObserver via this only once here: const { partialObserver } = this or whatever. There's no need to access it twice. Same for the other methods.

partialObserver.next(value);
} catch (error) {
handleUnhandledError(error);
}
}
}

error(err: any): void {
const { partialObserver } = this;
if (partialObserver.error) {
try {
partialObserver.error(err);
} catch (error) {
handleUnhandledError(error);
}
} else {
handleUnhandledError(err);
}
}

complete(): void {
const { partialObserver } = this;
if (partialObserver.complete) {
try {
partialObserver.complete();
} catch (error) {
handleUnhandledError(error);
}
}
}
}

export class SafeSubscriber<T> extends Subscriber<T> {
constructor(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
Expand All @@ -155,65 +198,51 @@ export class SafeSubscriber<T> extends Subscriber<T> {
) {
super();

let next: ((value: T) => void) | undefined;
if (isFunction(observerOrNext)) {
let partialObserver: Partial<Observer<T>>;
if (isFunction(observerOrNext) || !observerOrNext) {
// The first argument is a function, not an observer. The next
// two arguments *could* be observers, or they could be empty.
next = observerOrNext;
} else if (observerOrNext) {
// The first argument is an observer object, we have to pull the handlers
// off and capture the owner object as the context. That is because we're
// going to put them all in a new destination with ensured methods
// for `next`, `error`, and `complete`. That's part of what makes this
// the "Safe" Subscriber.
({ next, error, complete } = observerOrNext);
partialObserver = {
next: observerOrNext ?? undefined,
error: error ?? undefined,
complete: complete ?? undefined,
};
} else {
// The first argument is a partial observer.
let context: any;
if (this && config.useDeprecatedNextContext) {
// This is a deprecated path that made `this.unsubscribe()` available in
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = () => this.unsubscribe();
partialObserver = {
next: observerOrNext.next && bind(observerOrNext.next, context),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we only bind in this case now.

error: observerOrNext.error && bind(observerOrNext.error, context),
complete: observerOrNext.complete && bind(observerOrNext.complete, context),
};
} else {
context = observerOrNext;
// The "normal" path. Just use the partial observer directly.
partialObserver = observerOrNext;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any other partial observer passed is used as-is.

}
next = next && bind(next, context);
error = error && bind(error, context);
complete = complete && bind(complete, context);
}

// Once we set the destination, the superclass `Subscriber` will
// do it's magic in the `_next`, `_error`, and `_complete` methods.
this.destination = {
next: next ? wrapForErrorHandling(next, this) : noop,
error: wrapForErrorHandling(error ?? defaultErrorHandler, this),
complete: complete ? wrapForErrorHandling(complete, this) : noop,
};
// Wrap the partial observer to ensure it's a full observer, and
// make sure proper error handling is accounted for.
this.destination = new ConsumerObserver(partialObserver);
}
}

/**
* Wraps a user-provided handler (or our {@link defaultErrorHandler} in one case) to
* ensure that any thrown errors are caught and handled appropriately.
*
* @param handler The handler to wrap
* @param instance The SafeSubscriber instance we're going to mark if there's an error.
*/
function wrapForErrorHandling(handler: (arg?: any) => void, instance: SafeSubscriber<any>) {
return (...args: any[]) => {
try {
handler(...args);
} catch (err) {
if (config.useDeprecatedSynchronousErrorHandling) {
captureError(err);
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
reportUnhandledError(err);
}
}
};
function handleUnhandledError(error: any) {
if (config.useDeprecatedSynchronousErrorHandling) {
captureError(error);
} else {
// Ideal path, we report this as an unhandled error,
// which is thrown on a new call stack.
reportUnhandledError(error);
}
}

/**
* An error handler used when no error handler was supplied
* to the SafeSubscriber -- meaning no error handler was supplied
Expand Down