The Reactive Extensions for Python (RxPY)
A library for composing asynchronous and event-based programs using observable collections and LINQ-style query operators in Python
The main repository is at ReactiveX/RxPY. There are currently outdated mirrors at Reactive-Extensions/RxPy and CodePlex. Please register any issues at ReactiveX/RxPY/issues, and make sure your pull requests are made against the develop branch.
About the Reactive Extensions
The Reactive Extensions for Python (RxPY) is a set of libraries for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.
Whether you are authoring a client-side or server-side application in Python, you have to deal with asynchronous and event-based programming as a matter of course.
Using Rx, you can represent multiple asynchronous data streams that come from diverse sources (stock quotes, tweets, computer events, web service requests, and so on), and subscribe to the event stream using the Observer object. The Observable notifies the subscribed Observer instance whenever an event occurs.
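The push model described above can be illustrated without the library. The following is only a minimal sketch: `ToyObservable`, `emit`, and the sample quotes are hypothetical names made up for this example and are not part of RxPY.

```python
class ToyObservable:
    """A hypothetical, minimal push-based stream (not RxPY's Observable)."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        """Register a callback and return a function that unsubscribes it."""
        self._observers.append(on_next)

        def dispose():
            self._observers.remove(on_next)

        return dispose

    def emit(self, value):
        """Push a value to every currently subscribed observer."""
        for observer in list(self._observers):
            observer(value)


received = []
quotes = ToyObservable()
dispose = quotes.subscribe(received.append)

quotes.emit(("MSFT", 27))
quotes.emit(("GOOG", 702))
dispose()                  # stop listening
quotes.emit(("AAPL", 98))  # no longer delivered to the observer

print(received)
```

The observer never polls; it is simply called whenever the source has a new value, and it stops receiving values once it unsubscribes.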
Because observable sequences are data streams, you can query them using standard LINQ query operators implemented by the Observable type. Thus you can filter, map, reduce, compose and perform time-based operations on multiple events easily by using these static LINQ operators. In addition, there are a number of other reactive stream specific operators that allow powerful queries to be written. Cancellation, exceptions, and synchronization are also handled gracefully by using the methods on the Observable object.
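To give a feel for how query operators compose over a push-based stream, here is a small sketch in plain Python. It assumes an observable can be modelled as a function that accepts an `on_next` callback; `from_iterable`, `filter_`, and `map_` are hypothetical helpers written for this example, not RxPY operators.

```python
def from_iterable(values):
    """Hypothetical source: pushes each value to the observer on subscribe."""
    def subscribe(on_next):
        for value in values:
            on_next(value)
    return subscribe


def filter_(predicate, source):
    """Forward only the values for which predicate(value) is true."""
    def subscribe(on_next):
        source(lambda v: on_next(v) if predicate(v) else None)
    return subscribe


def map_(mapper, source):
    """Forward mapper(value) for every value of the source."""
    def subscribe(on_next):
        source(lambda v: on_next(mapper(v)))
    return subscribe


numbers = from_iterable([1, 2, 3, 4, 5, 6])
query = map_(lambda x: x * x, filter_(lambda x: x % 2 == 0, numbers))

results = []
query(results.append)  # nothing runs until someone subscribes
print(results)         # [4, 16, 36]
```

Each operator wraps the source and returns a new stream, which is why queries can be chained freely before anything is subscribed.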
To install RxPY:
pip install rx
pip may be called pip3 if you are using Python 3.
Differences from .NET and RxJS
RxPY is a fairly complete implementation of Rx v2.2 with more than 134 query operators, and over 1100 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from RxNET and RxJava in terms of threading and blocking operators.
RxPY follows PEP 8, so all function and method names are lowercase with words separated by underscores as necessary to improve readability.
Thus .NET code such as:
var group = source.GroupBy(i => i % 3);
needs to be written with an _ in Python:
group = source.group_by(lambda i: i % 3)
With RxPY you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).
res = Observable.timer(5000) # Yes
res = Observable.timer(5000, 1000) # Yes
res = Observable.timer(5000, 1000, Scheduler.timeout) # Yes
res = Observable.timer(5000, scheduler=Scheduler.timeout) # Yes, but must name
res = Observable.timer(5000, Scheduler.timeout) # No, this is an error
Thus when an operator like Observable.timeout has multiple optional arguments, you should name your arguments, or at least the arguments marked as optional.
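The reason for this rule is ordinary Python argument binding. The sketch below uses a hypothetical stand-in function whose parameter names (`due_time`, `period`) are assumptions made for illustration; only the `scheduler=` keyword appears in the calls shown earlier.

```python
def timer(due_time, period=None, scheduler=None):
    """Hypothetical stand-in that just reports how its arguments were bound."""
    return {"due_time": due_time, "period": period, "scheduler": scheduler}


fake_scheduler = object()  # placeholder for a real scheduler instance

named = timer(5000, scheduler=fake_scheduler)
positional = timer(5000, fake_scheduler)

print(named["scheduler"] is fake_scheduler)    # True: bound as intended
print(positional["period"] is fake_scheduler)  # True: bound to the wrong parameter
print(positional["scheduler"])                 # None
```

Because the second positional argument is silently bound to the first optional parameter, naming optional arguments is the only reliable way to skip one.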
Disposables implement a context manager, so you may use them in with statements.
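As a rough illustration of how a disposable can double as a context manager, consider the sketch below. `ToyDisposable` is a hypothetical class written for this example; it is not RxPY's Disposable, and the real class may behave differently in detail.

```python
class ToyDisposable:
    """Hypothetical disposable that runs a cleanup action exactly once."""

    def __init__(self, action):
        self._action = action
        self.is_disposed = False

    def dispose(self):
        if not self.is_disposed:
            self.is_disposed = True
            self._action()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.dispose()
        return False  # do not swallow exceptions raised inside the block


log = []
with ToyDisposable(lambda: log.append("disposed")) as subscription:
    log.append("working")

print(log)  # ['working', 'disposed']
```

The cleanup runs when the block exits, whether it exits normally or through an exception.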
Observable sequences may be concatenated using +, so you can write:
xs = Observable.from_([1,2,3])
ys = Observable.from_([4,5,6])
zs = xs + ys # Concatenate observables
Observable sequences may be repeated using *, so you can write:
xs = Observable.from_([1,2,3])
ys = xs * 4
Observable sequences may be sliced using [start:stop:step], so you can write:
xs = Observable.from_([1,2,3,4,5,6])
ys = xs[1:-1]
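The +, * and slicing forms rely on Python's operator overloading. The following sketch shows the general mechanism on a hypothetical, list-backed `ToySequence`; it makes no claim about how RxPY implements these operators on real, asynchronous observables.

```python
class ToySequence:
    """Hypothetical list-backed stand-in for an observable sequence."""

    def __init__(self, items):
        self.items = list(items)

    def concat(self, other):
        return ToySequence(self.items + other.items)

    def repeat(self, count):
        return ToySequence(self.items * count)

    def __add__(self, other):    # enables xs + ys
        return self.concat(other)

    def __mul__(self, count):    # enables xs * 4
        return self.repeat(count)

    def __getitem__(self, key):  # enables xs[start:stop:step]
        if not isinstance(key, slice):
            raise TypeError("only slices are supported in this sketch")
        return ToySequence(self.items[key])


xs = ToySequence([1, 2, 3])
ys = ToySequence([4, 5, 6])

print((xs + ys).items)        # [1, 2, 3, 4, 5, 6]
print((xs * 2).items)         # [1, 2, 3, 1, 2, 3]
print((xs + ys)[1:-1].items)  # [2, 3, 4, 5]
```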
Observable sequences may be turned into an iterator so you can use generator expressions, or iterate over them (uses queueing and blocking).
xs = Observable.from_([1,2,3,4,5,6])
ys = xs.to_blocking()
zs = (x*x for x in ys if x > 3)
for x in zs:
    print(x)
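The "queueing and blocking" mentioned above can be pictured with the standard library alone. This is a sketch under assumptions: `to_blocking_iter` and `subscribe_in_thread` are hypothetical helpers, and a push-style source is modelled as a function taking `on_next` and `on_completed` callbacks.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def to_blocking_iter(subscribe):
    """Turn a push-style subscribe(on_next, on_completed) into an iterator."""
    buffer = queue.Queue()
    subscribe(buffer.put, lambda: buffer.put(_DONE))
    while True:
        item = buffer.get()  # blocks until the producer pushes something
        if item is _DONE:
            return
        yield item


def subscribe_in_thread(on_next, on_completed):
    """Hypothetical producer that pushes 1..6 from a background thread."""
    def run():
        for value in range(1, 7):
            on_next(value)
        on_completed()

    threading.Thread(target=run, daemon=True).start()


squares = [x * x for x in to_blocking_iter(subscribe_in_thread) if x > 3]
print(squares)  # [16, 25, 36]
```

The consumer thread waits on the queue while the producer pushes values, which is the trade-off a blocking iterator makes: convenient iteration at the cost of a blocked thread.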
In RxPY you can choose to run fully asynchronously or you may decide to schedule work and timeouts using threads.
RxPY also comes with batteries included, and has a number of Python-specific mainloop schedulers to make it easier for you to use RxPY with your favorite Python framework.
AsyncIOScheduler for use with AsyncIO (requires Python 3.4 or trollius, a port of asyncio compatible with Python 2.6-3.5).
IOLoopScheduler for use with Tornado IOLoop. See the autocomplete and konamicode examples for how to use RxPY with your Tornado application.
GEventScheduler for use with GEvent (Python 2.7 only).
TwistedScheduler for use with Twisted.
TkinterScheduler for use with Tkinter. See the timeflies example for how to use RxPY with your Tkinter application.
PyGameScheduler for use with PyGame. See the chess example for how to use RxPY with your PyGame application.
QtScheduler for use with PyQt4, PyQt5, and PySide. See the timeflies example for how to use RxPY with your Qt application.
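Conceptually, a mainloop scheduler hands timed work to the framework's own event loop instead of running its own. The sketch below illustrates that idea with asyncio only; `ToyAsyncIOScheduler` and `schedule_relative` are hypothetical names for this example and should not be read as the interface of RxPY's AsyncIOScheduler. It assumes Python 3.7 or later for `asyncio.run`.

```python
import asyncio


class ToyAsyncIOScheduler:
    """Hypothetical sketch of delegating timed work to an asyncio loop."""

    def __init__(self, loop):
        self._loop = loop

    def schedule_relative(self, delay_seconds, action):
        """Run action after the delay; return a function that cancels it."""
        handle = self._loop.call_later(delay_seconds, action)
        return handle.cancel


async def main():
    scheduler = ToyAsyncIOScheduler(asyncio.get_running_loop())
    fired = []
    scheduler.schedule_relative(0.01, lambda: fired.append("a"))
    cancel = scheduler.schedule_relative(0.01, lambda: fired.append("b"))
    cancel()                   # the second action never runs
    await asyncio.sleep(0.05)  # give the loop time to run the first one
    return fired


fired = asyncio.run(main())
print(fired)  # ['a']
```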
You can contribute by reviewing and sending feedback on code checkins, suggesting and trying out new features as they are implemented, registering issues and helping us verify fixes as they are checked in, as well as submitting code fixes or code contributions of your own.
Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. Microsoft Open Technologies would like to thank its contributors, a list of whom are at http://rx.codeplex.com/wikipage?title=Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.