Skip to content

Commit

Permalink
feat: Add OperatorSubscriber.
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Nov 5, 2020
1 parent 22e69f1 commit 1fb14a7
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions source/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* @license Use of this source code is governed by an MIT-style license that
* can be found in the LICENSE file at https://github.com/cartant/rxjs-etc
*/

import { Subscriber } from "rxjs";

export class OperatorSubscriber<
TSource,
TDestination = TSource
> extends Subscriber<TSource> {
constructor(
destination: Subscriber<TSource>,
handlers: {
next?: undefined;
error?: (error: unknown) => void;
complete?: () => void;
}
);
constructor(
destination: Subscriber<TDestination>,
handlers: {
next: (value: TSource) => void;
error?: (error: unknown) => void;
complete?: () => void;
}
);
constructor(
destination: Subscriber<TSource | TDestination>,
handlers: {
next?: (value: TSource) => void;
error?: (error: unknown) => void;
complete?: () => void;
}
) {
super(destination);
const { complete, error, next } = handlers;
if (complete) {
this._complete = () => {
try {
complete();
} catch (caught: unknown) {
destination.error(caught);
}
this.unsubscribe();
};
}
if (error) {
this._error = (received) => {
try {
error(received);
} catch (caught: unknown) {
destination.error(caught);
}
this.unsubscribe();
};
}
if (next) {
this._next = (value: TSource) => {
try {
next(value);
} catch (caught: unknown) {
destination.error(caught);
}
};
}
}
}

0 comments on commit 1fb14a7

Please sign in to comment.