
Operator Creation

There are many ways to create an operator for RxJS. In this version of RxJS, performance was the primary consideration; as a result, creating an operator in a way that adheres to the existing structures in this library may not be straightforward. This document is an attempt to describe how to create an operator, either for yourself or for this library.

For how to develop a custom operator for this library, see below.

DIY Custom Operators for End Users

Guidelines

In the most common case, users might like to create an operator to be used only by their app. These can be developed in any way the developer sees fit, but here are some guidelines:

  1. Operators should always return an Observable. You're performing operations on unknown sets of things to create new sets. It only makes sense to return a new set. If you create a method that returns something other than an Observable, it's not an operator, and that's fine.
  2. Be sure to manage subscriptions created inside of the Observable your operator returns. Your operator is going to have to subscribe to the source (or this) inside of the returned Observable, so be sure that the resulting subscription is returned as part of the unsubscribe handler or subscription.
  3. Be sure to handle exceptions from passed functions. If you're implementing an operator that takes a function as an argument, when you call it, you'll want to wrap it in a try/catch and send the error down the error() path on the observable.
  4. Be sure to tear down scarce resources in the unsubscribe handler of your returned Observable. If you're setting up event handlers or a web socket, or something like that, the unsubscribe handler is a great place to remove that event handler or close that socket.

Example

function mySimpleOperator(someCallback) {
   // We *could* do a `const self = this;` here to close over, but see next comment
   return Observable.create(subscriber => {
     // because we're in an arrow function `this` is from the outer scope.
     const source = this;

     // save our inner subscription
     const subscription = source.subscribe(value => {
       // important: catch errors from user-provided callbacks
       try {
         subscriber.next(someCallback(value));
       } catch(err) {
         subscriber.error(err);
       }
     },
     // be sure to handle errors and completions as appropriate and
     // send them along
     err => subscriber.error(err),
     () => subscriber.complete());

     // return the inner subscription so it is unsubscribed along with the outer one
     return subscription;
   });
}
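
The example above covers guidelines 1 to 3. Guideline 4 (tearing down scarce resources) could be sketched roughly as follows. Everything here is illustrative: createObservable is a tiny stand-in for Observable.create so the snippet runs without RxJS installed, sampleOnEvent is a hypothetical operator, and Node's EventEmitter plays the part of the scarce resource.

```javascript
const { EventEmitter } = require('events');

// ASSUMPTION: minimal stand-in for Observable.create, not the library's
// implementation. The subscribe function must return a teardown function.
function createObservable(subscribe) {
  return {
    subscribe(next, error = () => {}, complete = () => {}) {
      const teardown = subscribe({ next, error, complete });
      return { unsubscribe: () => teardown() };
    }
  };
}

// Hypothetical operator: re-emits the latest source value each time
// `emitter` fires `eventName`.
function sampleOnEvent(emitter, eventName) {
  const source = this;
  return createObservable(subscriber => {
    let hasValue = false;
    let latest;

    // inner subscription to the source (guideline 2)
    const subscription = source.subscribe(
      value => { hasValue = true; latest = value; },
      err => subscriber.error(err),
      () => subscriber.complete()
    );

    // the scarce resource: an event handler we must remove later
    const handler = () => { if (hasValue) subscriber.next(latest); };
    emitter.on(eventName, handler);

    // teardown releases both the handler and the inner subscription
    return () => {
      emitter.removeListener(eventName, handler);
      subscription.unsubscribe();
    };
  });
}

// Usage with a hand-rolled source that values can be pushed into.
const sourceObservers = new Set();
const source = createObservable(observer => {
  sourceObservers.add(observer);
  return () => sourceObservers.delete(observer);
});
const push = value => sourceObservers.forEach(o => o.next(value));

const emitter = new EventEmitter();
const received = [];
const subscription = sampleOnEvent.call(source, emitter, 'tick')
  .subscribe(value => received.push(value));

push('a');
push('b');
emitter.emit('tick');        // samples 'b'
subscription.unsubscribe();  // removes the listener and leaves the source
emitter.emit('tick');        // no longer observed
```

After unsubscribe, neither the emitter nor the source holds a reference to the subscriber, which is the point of the guideline.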

Adding the operator to Observable

There are a few ways to do this. It's really down to needs and preference:

  1. Use the proposed function bind operator (::), which is not part of the language standard but is available in transpilers like BabelJS:
someObservable::mySimpleOperator(x => x + '!');
  2. Create your own Observable subclass and override lift to return it:
class MyObservable extends Observable {
  lift(operator) {
    const observable = new MyObservable(); //<-- important part here
    observable.source = this;
    observable.operator = operator;
    return observable;
  }

  // put it here .. or ..
  customOperator() {
    /* do things and return an Observable */
  }
}

// ... put it here...
MyObservable.prototype.mySimpleOperator = mySimpleOperator;
  3. Patch Observable.prototype directly:
Observable.prototype.mySimpleOperator = mySimpleOperator;

// ... and later ...

someObservable.mySimpleOperator(x => x + '!');

Operator as a pure function

If you don't want to patch the Observable prototype, you can also write the operator as a pure function that takes the input Observable as argument, instead of relying on the this keyword.

Example implementation:

function mySimpleOperator(someCallback) {
  // notice that we return a function here
  return function mySimpleOperatorImplementation(source) {
    return Observable.create(subscriber => {
      const subscription = source.subscribe(value => {
        try {
          subscriber.next(someCallback(value));
        } catch(err) {
          subscriber.error(err);
        }
      },
      err => subscriber.error(err),
      () => subscriber.complete());

      return subscription;
    });
  }
}

This can now be used with the pipe() method on the Observable:

const obs = someObservable.pipe(mySimpleOperator(x => x + '!'));
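
Conceptually, pipe() hands the source to each operator function in turn and returns the final result. The sketch below illustrates that idea only: pipe here is a hypothetical standalone helper, and createObservable is a small stand-in for Observable.create so the snippet runs without RxJS installed. Neither is the library's implementation.

```javascript
// ASSUMPTION: minimal stand-in for Observable.create. The subscribe
// function must return a teardown function.
function createObservable(subscribe) {
  return {
    subscribe(next, error = () => {}, complete = () => {}) {
      const teardown = subscribe({ next, error, complete });
      return { unsubscribe: () => teardown() };
    }
  };
}

// Same shape as the pure-function operator above, built on the stand-in.
function mySimpleOperator(someCallback) {
  return source => createObservable(subscriber => {
    const subscription = source.subscribe(
      value => {
        try {
          subscriber.next(someCallback(value));
        } catch (err) {
          subscriber.error(err);
        }
      },
      err => subscriber.error(err),
      () => subscriber.complete()
    );
    return () => subscription.unsubscribe();
  });
}

// Hypothetical helper: apply each operator function to the result of
// the previous one, starting from the source.
function pipe(source, ...operators) {
  return operators.reduce((current, operator) => operator(current), source);
}

// A source that synchronously emits two values and completes.
const source = createObservable(subscriber => {
  subscriber.next('a');
  subscriber.next('b');
  subscriber.complete();
  return () => {};
});

const results = [];
pipe(
  source,
  mySimpleOperator(x => x + '!'),
  mySimpleOperator(x => x.toUpperCase())
).subscribe(value => results.push(value));
// results is now ['A!', 'B!']
```

Because each operator is just a function from Observable to Observable, operators written this way compose without touching any prototype.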

Publish your operator as a separate library

We strongly recommend that you publish your custom operator as an npm package, written as a pure function to be used with pipe(). See the sections above. RxJS core already has over 100 operators, and we should not add more operators unless they are absolutely essential and add functionality not possible with existing operators.

Publishing it as a separate library makes your operator immediately usable by the community, and lets us grow an RxJS ecosystem instead of making the RxJS library thicker and heavier. There are cases, however, where the new operator should be added to the core library.

Creating An Operator For Inclusion In This Library

Please publish your operator as a separate library before proposing it into RxJS. See the section above.

To create an operator for inclusion in this library, it's probably best to work from prior art. Something like the filter operator would be a good start. It's not expected that you'll be able to read this section and suddenly be an expert operator contributor.

If you find yourself confused, DO NOT worry. Follow prior examples in the repo, submit a PR, and we'll work with you.

Hopefully the information provided here will give context to decisions made while developing operators in this library. There are a few things to know and (try to) understand while developing operators:

  1. All operator methods are actually created in separate modules from Observable. This is so developers can "build their own observable" by pulling in operator methods and adding them to observable in their own module. It also means operators can be brought in ad hoc and used directly, either with the proposed function bind operator (::) in Babel or by invoking them with .call().
  2. Every operator has an Operator class. The Operator class is really a Subscriber "factory". It's what gets passed into the lift method to make the "magic" happen. Its sole job is to create the operation's Subscriber instance on subscription.
  3. Every operator has a Subscriber class. This class does all of the logic for the operation. Its job is to handle values being nexted in (generally by overriding _next()) and forward them along to the destination, which is the next observer in the chain.
    • It's important to note that the destination Observer set on any Subscriber serves as more than just the destination for the events passing through. If the destination is a Subscriber, it is also used to set up a shared underlying Subscription, which, in fact, is also a Subscriber, and is the first Subscriber in the chain.
    • Subscribers all have add and remove methods that are used for adding and removing inner subscriptions to the shared underlying subscription.
    • When you subscribe to an Observable, the functions or Observer you passed are used to create the final destination Subscriber for the chain. It's this Subscriber that is really also the shared Subscription for the operator chain.
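
The relationship between lift, the Operator class, and the Subscriber class described above can be sketched with a filter-like operator. This is a simplified illustration, not the library's source: the Observable and Subscriber classes below are deliberately tiny stand-ins (assumptions made so the snippet runs on its own), and they omit the shared-subscription handling the bullets describe.

```javascript
// ASSUMPTION: stripped-down stand-ins for the library's classes.
class Subscriber {
  constructor(destination) { this.destination = destination; }
  next(value) { this._next(value); }
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
  // default behavior: forward the value unchanged
  _next(value) { this.destination.next(value); }
}

class Observable {
  constructor(subscribe) {
    if (subscribe) { this._subscribe = subscribe; }
  }
  lift(operator) {
    const observable = new Observable();
    observable.source = this;
    observable.operator = operator;
    return observable;
  }
  subscribe(observer) {
    // A lifted Observable lets its Operator build the Subscriber and
    // subscribe to the source; a plain one runs its subscribe function.
    return this.operator
      ? this.operator.call(observer, this.source)
      : this._subscribe(observer);
  }
}

// The Operator: a Subscriber factory that is handed to lift().
class FilterOperator {
  constructor(predicate) { this.predicate = predicate; }
  call(subscriber, source) {
    return source.subscribe(new FilterSubscriber(subscriber, this.predicate));
  }
}

// The Subscriber: holds the operation's logic by overriding _next().
class FilterSubscriber extends Subscriber {
  constructor(destination, predicate) {
    super(destination);
    this.predicate = predicate;
  }
  _next(value) {
    let result;
    try {
      result = this.predicate(value);
    } catch (err) {
      this.destination.error(err);
      return;
    }
    if (result) { this.destination.next(value); }
  }
}

// The operator method itself only lifts the Operator onto the source.
function filter(predicate) {
  return this.lift(new FilterOperator(predicate));
}

const source = new Observable(observer => {
  [1, 2, 3, 4].forEach(value => observer.next(value));
  observer.complete();
});

const evens = [];
filter.call(source, x => x % 2 === 0).subscribe({
  next: value => evens.push(value),
  error: () => {},
  complete: () => {}
});
// evens is now [2, 4]
```

Note that nothing happens when filter is called; the FilterSubscriber is only created once something subscribes to the lifted Observable.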

Submitting a PR for an Operator

Please complete these steps for each new operator added to RxJS as a pull request:

  • Add the operator to Rx
  • It must have a -spec.ts tests file covering the canonical corner cases, with marble diagram tests
  • If possible, write an asDiagram test case too, for PNG diagram generation purposes
  • The spec file should have a type definition test at the end of the spec to verify type definition for various use cases
  • The operator must be documented in JSDoc style in the implementation file, including also the PNG marble diagram image
  • The operator should be listed in doc/operators.md in a category of operators
  • It should also be inserted in the operator decision tree file doc/decision-tree-widget/tree.yml
  • You may need to update MIGRATION.md if the operator differs from the corresponding one in RxJS v4

Inner Subscriptions

An "inner subscriber" or "inner subscription" is any subscription created inside of an operator's primary Subscriber. For example, if you were to create your own "merge" operator of some sort, the Observables that are arriving on the source observable that you want to "merge" will need to be subscribed to. Those subscriptions will be inner subscriptions. One interesting thing about inner subscriptions in this library is that if you pass and set a destination on them, they will try to use that destination for their unsubscribe calls. This means that calling unsubscribe on them might not do anything. As such, it's usually desirable not to set the destination of inner subscriptions. An example of this might be the switch operators, which have a single underlying inner subscription that needs to unsubscribe independently of the main subscription.
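
To make the idea of inner subscriptions concrete, here is a rough sketch of a merge-like operator that tracks its inner subscriptions and releases them on unsubscribe. It is written against a small stand-in (createObservable, an assumption so the snippet runs without RxJS), so it does not model the destination behavior described above; mergeSketch and pushable are hypothetical names, and completion tracking is left out for brevity.

```javascript
// ASSUMPTION: minimal stand-in for Observable.create. The subscribe
// function must return a teardown function.
function createObservable(subscribe) {
  return {
    subscribe(next, error = () => {}, complete = () => {}) {
      const teardown = subscribe({ next, error, complete });
      return { unsubscribe: () => teardown() };
    }
  };
}

// Hypothetical merge-like operator: the source emits Observables, and
// every value from every inner Observable is forwarded downstream.
function mergeSketch(source) {
  return createObservable(subscriber => {
    const innerSubscriptions = new Set();

    const outerSubscription = source.subscribe(
      innerObservable => {
        // each of these is an "inner subscription"
        innerSubscriptions.add(innerObservable.subscribe(
          value => subscriber.next(value),
          err => subscriber.error(err)
        ));
      },
      err => subscriber.error(err)
    );

    // teardown: leave the source and every inner Observable
    return () => {
      outerSubscription.unsubscribe();
      innerSubscriptions.forEach(inner => inner.unsubscribe());
      innerSubscriptions.clear();
    };
  });
}

// Hypothetical helper: an Observable that values can be pushed into.
function pushable() {
  const observers = new Set();
  const observable = createObservable(observer => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  const push = value => observers.forEach(o => o.next(value));
  return { observable, observers, push };
}

const outer = pushable();
const a = pushable();
const b = pushable();

const seen = [];
const subscription = mergeSketch(outer.observable)
  .subscribe(value => seen.push(value));

outer.push(a.observable);
outer.push(b.observable);
a.push(1);
b.push(2);
a.push(3);
subscription.unsubscribe(); // releases outer, a and b
a.push(4);                  // nobody is listening any more
```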

If you find yourself creating inner subscriptions, it might also be worth checking whether the observable being passed has _isScalar set, because if it does, you can pull the value out of it directly and improve the performance of your operator when it's operating over scalar observables. For reference, a scalar observable is any observable that has a single static value underneath it. Observable.of('foo') will return a ScalarObservable; likewise, resolved PromiseObservables will act as scalars.
