Skip to content

Commit

Permalink
Merge b27fe5a into d5c1139
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli authored Jan 20, 2019
2 parents d5c1139 + b27fe5a commit 46feaf0
Show file tree
Hide file tree
Showing 508 changed files with 14,063 additions and 15,470 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ TestResults/Rx.TE.Tests_log.ldf
.mypy_cache/
.vscode/
.noseids
_build
169 changes: 91 additions & 78 deletions README.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
import os
import sys
sys.path.insert(0, os.path.abspath('../rx'))
sys.path.insert(0, os.path.abspath('../'))


# -- Project information -----------------------------------------------------
Expand All @@ -24,9 +24,9 @@
author = 'Dag Brattli'

# The short X.Y version
version = '2.0'
version = '3.0'
# The full version, including alpha/beta/rc tags
release = '2.0.0-alpha1'
release = '3.0.0-alpha1'


# -- General configuration ---------------------------------------------------
Expand Down
4 changes: 4 additions & 0 deletions doc/get_started.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.. get_started
Get Started
============
18 changes: 10 additions & 8 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to RxPY's documentation!
================================
The Reactive Extensions for Python (RxPY)
==========================================

.. toctree::
:maxdepth: 2
:caption: Contents:
RxPY is a library for composing asynchronous and event-based programs using
observable collections and pipable query operators in Python.

.. automodule:: rx.core
.. toctree::
:maxdepth: 2
:caption: Contents:

.. autoclass:: ObservableBase
:members:
installation
get_started
reference

Indices and tables
==================
Expand Down
4 changes: 4 additions & 0 deletions doc/installation.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.. Installation
Installation
=============
13 changes: 13 additions & 0 deletions doc/reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.. reference:
Reference
==========


.. toctree::
:name: reference

Observable Factory <reference_observable>
Observer <reference_observer>
Subject <reference_subject>
Operators <reference_operators>
7 changes: 7 additions & 0 deletions doc/reference_observable.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. reference_observable:
Observable Factory
=====================

.. automodule:: rx
:members:
7 changes: 7 additions & 0 deletions doc/reference_observer.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. reference_observer:
Observer
========

.. autoclass:: rx.core.Observer
:members:
7 changes: 7 additions & 0 deletions doc/reference_operators.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.. reference_operators:
Operators
=========

.. automodule:: rx.operators
:members:
16 changes: 16 additions & 0 deletions doc/reference_subject.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
.. reference_subject:
Subject
========

.. autoclass:: rx.subjects.Subject
:members:

.. autoclass:: rx.subjects.BehaviorSubject
:members:

.. autoclass:: rx.subjects.ReplaySubject
:members:

.. autoclass:: rx.subjects.AsyncSubject
:members:
18 changes: 10 additions & 8 deletions examples/autocomplete/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from tornado.escape import json_decode
from tornado import ioloop

from rx import operators as ops
from rx.subjects import Subject
from rx.concurrency import IOLoopScheduler

Expand Down Expand Up @@ -48,21 +49,21 @@ def open(self):
self.stream = Subject()

# Get all distinct key up events from the input and only fire if long enough and distinct
query = (self.stream
.map(lambda x: x["term"])
.filter(lambda text: len(text) > 2) # Only if the text is longer than 2 characters
.debounce(0.750) # Pause for 750ms
.distinct_until_changed() # Only if the value has changed
)
searcher = query.flat_map_latest(search_wikipedia)
searcher = self.stream.pipe(
ops.map(lambda x: x["term"]),
ops.filter(lambda text: len(text) > 2), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia)
)

def send_response(x):
self.write_message(x.body)

def on_error(ex):
print(ex)

searcher.subscribe(send_response, on_error, scheduler=scheduler)
searcher.subscribe_(send_response, on_error, scheduler=scheduler)

def on_message(self, message):
obj = json_decode(message)
Expand All @@ -88,5 +89,6 @@ def main():
app.listen(port)
ioloop.IOLoop.current().start()


if __name__ == '__main__':
main()
40 changes: 21 additions & 19 deletions examples/autocomplete/autocomplete_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@
"""

import os
import rx
asyncio = rx.config['asyncio']
import asyncio

from tornado.websocket import WebSocketHandler
from tornado.web import RequestHandler, StaticFileHandler, Application, url
from tornado.httpclient import AsyncHTTPClient
from tornado.httputil import url_concat
from tornado.escape import json_decode
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.escape import json_decode
from tornado.httputil import url_concat
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler, StaticFileHandler, Application, url
from tornado.websocket import WebSocketHandler

from rx.subjects import Subject
from rx import operators as ops
from rx.concurrency import AsyncIOScheduler

scheduler = AsyncIOScheduler()
from rx.subjects import Subject


def search_wikipedia(term):
Expand All @@ -44,28 +42,30 @@ def search_wikipedia(term):

class WSHandler(WebSocketHandler):
def open(self):
scheduler = AsyncIOScheduler()

print("WebSocket opened")

# A Subject is both an observable and observer, so we can both subscribe
# to it and also feed (send) it with new values
self.subject = Subject()

# Get all distinct key up events from the input and only fire if long enough and distinct
query = (self.subject
.map(lambda x: x["term"])
.filter(lambda text: len(text) > 2) # Only if the text is longer than 2 characters
.debounce(750) # Pause for 750ms
.distinct_until_changed() # Only if the value has changed
)
searcher = query.flat_map_latest(search_wikipedia)
searcher = self.subject.pipe(
ops.map(lambda x: x["term"]),
ops.filter(lambda text: len(text) > 2), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia)
)

def send_response(x):
self.write_message(x.body)

def on_error(ex):
print(ex)

searcher.subscribe(send_response, on_error, scheduler=scheduler)
searcher.subscribe_(send_response, on_error, scheduler=scheduler)

def on_message(self, message):
obj = json_decode(message)
Expand All @@ -81,17 +81,19 @@ def get(self):


def main():
AsyncIOMainLoop().install()
AsyncIOMainLoop().make_current()

port = os.environ.get("PORT", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])

print("Starting server at port: %s" % port)
app.listen(port)
asyncio.get_event_loop().run_forever()


if __name__ == '__main__':
main()
23 changes: 12 additions & 11 deletions examples/konamicode/konamicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from tornado.escape import json_decode
from tornado import ioloop

from rx import operators as ops
from rx.subjects import Subject

UP, DOWN, LEFT, RIGHT, B, A = 38, 40, 37, 39, 66, 65
Expand All @@ -20,18 +21,18 @@ def open(self):

# Now we take on our magic glasses and project the stream of bytes into
# a ...
query = (self.subject
# 1. stream of keycodes
.map(lambda obj: obj["keycode"])
# 2. stream of windows (10 ints long)
.window_with_count(10, 1)
# 3. stream of booleans, True or False
.flat_map(lambda win: win.sequence_equal(codes))
# 4. stream of Trues
.filter(lambda equal: equal)
)
query = self.subject.pipe(
# 1. stream of keycodes
ops.map(lambda obj: obj["keycode"]),
# 2. stream of windows (10 ints long)
ops.window_with_count(10, 1),
# 3. stream of booleans, True or False
ops.flat_map(lambda win: win.pipe(ops.sequence_equal(codes))),
# 4. stream of Trues
ops.filter(lambda equal: equal)
)
# 4. we then subscribe to the Trues, and signal Konami! if we see any
query.subscribe(lambda x: self.write_message("Konami!"))
query.subscribe_(lambda x: self.write_message("Konami!"))

def on_message(self, message):
obj = json_decode(message)
Expand Down
33 changes: 23 additions & 10 deletions examples/timeflies/timeflies_tkinter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from tkinter import Tk, Label, Frame

import rx
from rx import operators as ops
from rx.subjects import Subject
from rx.concurrency import TkinterScheduler

Expand All @@ -13,25 +15,36 @@ def main():

frame = Frame(root, width=600, height=600)

def move(event):
mousemove.on_next(event)
frame.bind("<Motion>", move)
frame.bind("<Motion>", mousemove.on_next)

text = 'TIME FLIES LIKE AN ARROW'
labels = [Label(frame, text=c) for c in text]

def handle_label(i, label):
def on_next(info):
label, ev, i = info
label.place(x=ev.x + i*12 + 15, y=ev.y)

def handle_label(label, i):
label.config(dict(borderwidth=0, padx=0, pady=0))

def on_next(ev):
label.place(x=ev.x + i*12 + 15, y=ev.y)
mousemove.delay(i*100).subscribe(on_next, scheduler=scheduler)
mapper = ops.map(lambda ev: (label, ev, i))
delayer = ops.delay(i*0.1)

return mousemove.pipe(
delayer,
mapper
)

for i, label in enumerate(labels):
handle_label(i, label)
labeler = ops.flat_map_indexed(handle_label)
mapper = ops.map(lambda c: Label(frame, text=c))

rx.from_(text).pipe(
mapper,
labeler
).subscribe_(on_next, on_error=print, scheduler=scheduler)

frame.pack()
root.mainloop()


if __name__ == '__main__':
main()
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@
" will be run if the condition function returns true.\n",
" else_source -- {Observable} [Optional] The observable sequence or\n",
" Promise that will be run if the condition function returns False.\n",
" If this is not provided, it defaults to rx.Observable.empty\n",
" If this is not provided, it defaults to rx.empty\n",
" scheduler -- [Optional] Scheduler to use.\n",
"\n",
" Returns an observable {Observable} sequence which is either the\n",
Expand Down Expand Up @@ -1019,8 +1019,8 @@
" Returns an empty observable sequence, using the specified scheduler\n",
" to send out the single OnCompleted message.\n",
"\n",
" 1 - res = rx.Observable.empty()\n",
" 2 - res = rx.Observable.empty(rx.Scheduler.timeout)\n",
" 1 - res = rx.empty()\n",
" 2 - res = rx.empty(rx.Scheduler.timeout)\n",
"\n",
" scheduler -- Scheduler to send the termination call on.\n",
"\n",
Expand Down Expand Up @@ -1109,8 +1109,8 @@
" Returns an observable sequence that terminates with an exception,\n",
" using the specified scheduler to send out the single OnError message.\n",
"\n",
" 1 - res = rx.Observable.throw(Exception('Error'))\n",
" 2 - res = rx.Observable.throw(Exception('Error'),\n",
" 1 - res = rx.throw(Exception('Error'))\n",
" 2 - res = rx.throw(Exception('Error'),\n",
" rx.Scheduler.timeout)\n",
"\n",
" Keyword arguments:\n",
Expand Down
Loading

0 comments on commit 46feaf0

Please sign in to comment.