Skip to content

Commit

Permalink
Merge 2716a45 into 2f81dc3
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Feb 20, 2022
2 parents 2f81dc3 + 2716a45 commit ea02ae3
Show file tree
Hide file tree
Showing 263 changed files with 8,506 additions and 4,712 deletions.
49 changes: 28 additions & 21 deletions .github/workflows/pythonpackage.yml
Expand Up @@ -6,29 +6,37 @@ jobs:
build:
strategy:
matrix:
platform: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.6, 3.7, 3.8, 3.9, pypy-3.7]
platform: [ubuntu-latest, macos-latest, windows-latest]
python-version: [3.7, 3.8, 3.9, "3.10", pypy-3.8]
runs-on: ${{ matrix.platform }}

steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Check formatting
run: |
black --check --verbose rx
isort --check rx
- name: Check type annotations (mypy & pyright)
run: |
pyright rx
mypy rx/__init__.py rx/operators rx/core/abc rx/core/observer rx/core/observable rx/disposable rx/internal rx/subject rx/scheduler
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
pytest -n auto
coverage:
runs-on: ubuntu-latest
steps:
Expand All @@ -49,4 +57,3 @@ jobs:
COVERALLS_SERVICE_NAME: github
run: |
coveralls
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -78,3 +78,7 @@ _build

# Type checkers
.pyre

.ionide/

.python-version
4 changes: 2 additions & 2 deletions README.rst
Expand Up @@ -19,12 +19,12 @@ The Reactive Extensions for Python (RxPY)
*A library for composing asynchronous and event-based programs using observable collections and
query operator functions in Python*

RxPY v3.0
RxPY v4.0
----------------

For v1.X please go to the `v1 branch <https://github.com/ReactiveX/RxPY/tree/release/v1.6.x>`_.

RxPY v3.x runs on `Python <http://www.python.org/>`_ 3.6 or above. To install
RxPY v4.x runs on `Python <http://www.python.org/>`_ 3.7 or above. To install
RxPY:

.. code:: console
Expand Down
62 changes: 34 additions & 28 deletions examples/autocomplete/autocomplete.py
Expand Up @@ -6,11 +6,13 @@
Uses the RxPY IOLoopScheduler.
"""

from asyncio import Future
import os
from typing import Any, Dict, Union

from tornado.websocket import WebSocketHandler
from tornado.web import RequestHandler, StaticFileHandler, Application, url
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httputil import url_concat
from tornado.escape import json_decode
from tornado import ioloop
Expand All @@ -22,52 +24,54 @@
scheduler = IOLoopScheduler(ioloop.IOLoop.current())


def search_wikipedia(term):
def search_wikipedia(term: str) -> Future[HTTPResponse]:
"""Search Wikipedia for a given term"""
url = 'http://en.wikipedia.org/w/api.php'
url = "http://en.wikipedia.org/w/api.php"

params = {
"action": 'opensearch',
"search": term,
"format": 'json'
}
params: Dict[str, str] = {"action": "opensearch", "search": term, "format": "json"}
# Must set a user agent for non-browser requests to Wikipedia
user_agent = "RxPY/3.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
user_agent = (
"RxPY/3.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
)

url = url_concat(url, params)

http_client = AsyncHTTPClient()
return http_client.fetch(url, method='GET', user_agent=user_agent)
return http_client.fetch(url, method="GET", user_agent=user_agent)


class WSHandler(WebSocketHandler):
def open(self):
def open(self, *args: Any):
print("WebSocket opened")

# A Subject is both an observable and observer, so we can both subscribe
# to it and also feed (send) it with new values
self.stream = Subject()
self.subject: Subject[Dict[str, str]] = Subject()

# Get all distinct key up events from the input and only fire if long enough and distinct
searcher = self.stream.pipe(
searcher = self.subject.pipe(
ops.map(lambda x: x["term"]),
ops.filter(lambda text: len(text) > 2), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia)
ops.filter(
lambda text: len(text) > 2
), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia),
)

def send_response(x):
def send_response(x: HTTPResponse) -> None:
self.write_message(x.body)

def on_error(ex):
def on_error(ex: Exception) -> None:
print(ex)

searcher.subscribe(send_response, on_error, scheduler=scheduler)
searcher.subscribe(
on_next=send_response, on_error=on_error, scheduler=scheduler
)

def on_message(self, message):
def on_message(self, message: Union[bytes, str]):
obj = json_decode(message)
self.stream.on_next(obj)
self.subject.on_next(obj)

def on_close(self):
print("WebSocket closed")
Expand All @@ -80,15 +84,17 @@ def get(self):

def main():
port = os.environ.get("PORT", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])
app = Application(
[
url(r"/", MainHandler),
(r"/ws", WSHandler),
(r"/static/(.*)", StaticFileHandler, {"path": "."}),
]
)
print("Starting server at port: %s" % port)
app.listen(port)
ioloop.IOLoop.current().start()


if __name__ == '__main__':
if __name__ == "__main__":
main()
56 changes: 31 additions & 25 deletions examples/autocomplete/autocomplete_asyncio.py
Expand Up @@ -9,11 +9,13 @@

import os
import asyncio
from asyncio import Future
from typing import Dict, Union

from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.escape import json_decode
from tornado.httputil import url_concat
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.web import RequestHandler, StaticFileHandler, Application, url
from tornado.websocket import WebSocketHandler

Expand All @@ -22,22 +24,20 @@
from rx.subject import Subject


def search_wikipedia(term):
def search_wikipedia(term: str) -> Future[HTTPResponse]:
"""Search Wikipedia for a given term"""
url = 'http://en.wikipedia.org/w/api.php'
url = "http://en.wikipedia.org/w/api.php"

params = {
"action": 'opensearch',
"search": term,
"format": 'json'
}
params = {"action": "opensearch", "search": term, "format": "json"}
# Must set a user agent for non-browser requests to Wikipedia
user_agent = "RxPY/1.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
user_agent = (
"RxPY/1.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
)

url = url_concat(url, params)

http_client = AsyncHTTPClient()
return http_client.fetch(url, method='GET', user_agent=user_agent)
return http_client.fetch(url, method="GET", user_agent=user_agent)


class WSHandler(WebSocketHandler):
Expand All @@ -48,26 +48,30 @@ def open(self):

# A Subject is both an observable and observer, so we can both subscribe
# to it and also feed (send) it with new values
self.subject = Subject()
self.subject: Subject[Dict[str, str]] = Subject()

# Get all distinct key up events from the input and only fire if long enough and distinct
searcher = self.subject.pipe(
ops.map(lambda x: x["term"]),
ops.filter(lambda text: len(text) > 2), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia)
ops.filter(
lambda text: len(text) > 2
), # Only if the text is longer than 2 characters
ops.debounce(0.750), # Pause for 750ms
ops.distinct_until_changed(), # Only if the value has changed
ops.flat_map_latest(search_wikipedia),
)

def send_response(x):
def send_response(x: HTTPResponse) -> None:
self.write_message(x.body)

def on_error(ex):
def on_error(ex: Exception):
print(ex)

searcher.subscribe(send_response, on_error, scheduler=scheduler)
searcher.subscribe(
on_next=send_response, on_error=on_error, scheduler=scheduler
)

def on_message(self, message):
def on_message(self, message: Union[bytes, str]):
obj = json_decode(message)
self.subject.on_next(obj)

Expand All @@ -84,16 +88,18 @@ def main():
AsyncIOMainLoop().make_current()

port = os.environ.get("PORT", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])
app = Application(
[
url(r"/", MainHandler),
(r"/ws", WSHandler),
(r"/static/(.*)", StaticFileHandler, {"path": "."}),
]
)

print("Starting server at port: %s" % port)
app.listen(port)
asyncio.get_event_loop().run_forever()


if __name__ == '__main__':
if __name__ == "__main__":
main()
25 changes: 14 additions & 11 deletions examples/konamicode/konamicode.py
@@ -1,4 +1,5 @@
import os
from typing import Dict, Union

from tornado.websocket import WebSocketHandler
from tornado.web import RequestHandler, StaticFileHandler, Application, url
Expand All @@ -18,7 +19,7 @@ def open(self):

# A Subject is both an observable and observer, so we can both subscribe
# to it and also feed (on_next) it with new values
self.subject = Subject()
self.subject: Subject[Dict[str, int]] = Subject()

# Now we take on our magic glasses and project the stream of bytes into
# a ...
Expand All @@ -30,12 +31,12 @@ def open(self):
# 3. stream of booleans, True or False
ops.flat_map(lambda win: win.pipe(ops.sequence_equal(codes))),
# 4. stream of Trues
ops.filter(lambda equal: equal)
ops.filter(lambda equal: equal),
)
# 4. we then subscribe to the Trues, and signal Konami! if we see any
query.subscribe(lambda x: self.write_message("Konami!"))
query.subscribe(on_next=lambda x: self.write_message("Konami!"))

def on_message(self, message):
def on_message(self, message: Union[str, bytes]):
obj = json_decode(message)
self.subject.on_next(obj)

Expand All @@ -50,15 +51,17 @@ def get(self):

def main():
port = os.environ.get("PORT", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])
app = Application(
[
url(r"/", MainHandler),
(r"/ws", WSHandler),
(r"/static/(.*)", StaticFileHandler, {"path": "."}),
]
)
print("Starting server at port: %s" % port)
app.listen(port)
app.listen(int(port))
ioloop.IOLoop.current().start()


if __name__ == '__main__':
if __name__ == "__main__":
main()

0 comments on commit ea02ae3

Please sign in to comment.