Skip to content

Commit

Permalink
add coverage batch
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSchneeberger committed Nov 29, 2019
1 parent ae39fd6 commit db5ff26
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 110 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -2,6 +2,8 @@
RxPy back-pressure extension
============================

[![Coverage Status](https://coveralls.io/repos/github/MichaelSchneeberger/rxbackpressure/badge.svg?branch=master)](https://coveralls.io/github/MichaelSchneeberger/rxbackpressure?branch=master)

An extension to the [RxPY](https://github.com/ReactiveX/RxPY) python
library, that integrates back-pressure into the *Observable* pattern
in form of *Flowables*.
Expand Down
64 changes: 32 additions & 32 deletions rxbp/multicast/rxextensions/first_.py
@@ -1,32 +1,32 @@
from typing import Callable, Optional
from rx import operators as ops
from rx.core import Observable, pipe
from rx.core.typing import Predicate

from .firstordefault import _first_or_default_async


def _first(predicate: Optional[Predicate] = None) -> Callable[[Observable], Observable]:
"""Returns the first element of an observable sequence that
satisfies the condition in the predicate if present else the first
item in the sequence.
Examples:
>>> res = res = first()(source)
>>> res = res = first(lambda x: x > 3)(source)
Args:
predicate -- [Optional] A predicate function to evaluate for
elements in the source sequence.
Returns:
A function that takes an observable source and returns an
observable sequence containing the first element in the
observable sequence that satisfies the condition in the predicate if
provided, else the first item in the sequence.
"""

if predicate:
return pipe(ops.filter(predicate), ops.first())

return _first_or_default_async(False)
# from typing import Callable, Optional
# from rx import operators as ops
# from rx.core import Observable, pipe
# from rx.core.typing import Predicate
#
# from .firstordefault import _first_or_default_async
#
#
# def _first(predicate: Optional[Predicate] = None) -> Callable[[Observable], Observable]:
# """Returns the first element of an observable sequence that
# satisfies the condition in the predicate if present else the first
# item in the sequence.
#
# Examples:
# >>> res = res = first()(source)
# >>> res = res = first(lambda x: x > 3)(source)
#
# Args:
# predicate -- [Optional] A predicate function to evaluate for
# elements in the source sequence.
#
# Returns:
# A function that takes an observable source and returns an
# observable sequence containing the first element in the
# observable sequence that satisfies the condition in the predicate if
# provided, else the first item in the sequence.
# """
#
# if predicate:
# return pipe(ops.filter(predicate), ops.first())
#
# return _first_or_default_async(False)
120 changes: 60 additions & 60 deletions rxbp/schedulers/threadpoolscheduler.py
@@ -1,60 +1,60 @@
import asyncio
import atexit
import concurrent
import datetime
from concurrent.futures import Executor
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Callable, Union, Any


from rxbp.schedulers.asyncioscheduler import AsyncIOScheduler


class ThreadPoolScheduler(AsyncIOScheduler):

def __init__(self, loop: asyncio.AbstractEventLoop = None, new_thread=True, executor: Executor = None,
max_workers = None):
self.executor = executor or ThreadPoolExecutor(max_workers=max_workers)
# terminate when main thread terminates
# https://stackoverflow.com/questions/48350257/how-to-exit-a-script-after-threadpoolexecutor-has-timed-out
atexit.unregister(concurrent.futures.thread._python_exit)
self.executor.shutdown = lambda wait: None

super().__init__(loop, new_thread)

def schedule(self, action: Callable[[Scheduler, Any], None], state=None):
def func():
action(self, state)

future = self.executor.submit(func)

def dispose():
future.cancel()

return Disposable(dispose)

def schedule_relative(self,
timedelta: Union[int, float],
action: Callable[[Scheduler, Any], None],
state=None):
disposable = [MultipleAssignmentDisposable()]

def _():
def __():
def func():
action(self, state)

future = self.executor.submit(func)
disposable[0] = Disposable(lambda: future.cancel())
self.loop.call_later(timedelta, __)

future = self.loop.call_soon_threadsafe(_)
# super().schedule_relative(timedelta, __)
return CompositeDisposable(disposable, Disposable(lambda: future.cancel()))

def schedule_absolute(self,
duetime: datetime.datetime,
action: Callable[[Scheduler, Any], None],
state=None):
timedelta = (duetime - datetime.datetime.now()).total_seconds()
return self.schedule_relative(timedelta, func)
# import asyncio
# import atexit
# import concurrent
# import datetime
# from concurrent.futures import Executor
# from concurrent.futures.thread import ThreadPoolExecutor
# from typing import Callable, Union, Any
#
#
# from rxbp.schedulers.asyncioscheduler import AsyncIOScheduler
#
#
# class ThreadPoolScheduler(AsyncIOScheduler):
#
# def __init__(self, loop: asyncio.AbstractEventLoop = None, new_thread=True, executor: Executor = None,
# max_workers = None):
# self.executor = executor or ThreadPoolExecutor(max_workers=max_workers)
# # terminate when main thread terminates
# # https://stackoverflow.com/questions/48350257/how-to-exit-a-script-after-threadpoolexecutor-has-timed-out
# atexit.unregister(concurrent.futures.thread._python_exit)
# self.executor.shutdown = lambda wait: None
#
# super().__init__(loop, new_thread)
#
# def schedule(self, action: Callable[[Scheduler, Any], None], state=None):
# def func():
# action(self, state)
#
# future = self.executor.submit(func)
#
# def dispose():
# future.cancel()
#
# return Disposable(dispose)
#
# def schedule_relative(self,
# timedelta: Union[int, float],
# action: Callable[[Scheduler, Any], None],
# state=None):
# disposable = [MultipleAssignmentDisposable()]
#
# def _():
# def __():
# def func():
# action(self, state)
#
# future = self.executor.submit(func)
# disposable[0] = Disposable(lambda: future.cancel())
# self.loop.call_later(timedelta, __)
#
# future = self.loop.call_soon_threadsafe(_)
# # super().schedule_relative(timedelta, __)
# return CompositeDisposable(disposable, Disposable(lambda: future.cancel()))
#
# def schedule_absolute(self,
# duetime: datetime.datetime,
# action: Callable[[Scheduler, Any], None],
# state=None):
# timedelta = (duetime - datetime.datetime.now()).total_seconds()
# return self.schedule_relative(timedelta, func)
11 changes: 10 additions & 1 deletion setup.py
@@ -1,13 +1,22 @@
from setuptools import setup, find_packages
import unittest


def my_test_suite():
test_loader = unittest.TestLoader()
test_suite = test_loader.discover('test_rxbp', pattern='test_*.py')
return test_suite


setup(
name='rxbp',
version='3.0.0a4',
packages=find_packages(
exclude=[]),
install_requires=['rx==3.0.0b1'],
install_requires=['rx==3.0.1'],
description='A rxpy extension with back-pressure',
author='Michael Schneeberger',
author_email='michael.schneeb@outlook.com',
download_url='https://github.com/MichaelSchneeberger/rxbackpressure',
test_suite='setup.my_test_suite',
)
34 changes: 17 additions & 17 deletions test_rxbp/test_multicast/test_returnvalue.py
Expand Up @@ -25,20 +25,20 @@ def setUp(self) -> None:
)
self.o = MockObserver(self.source_scheduler)

def test_initialize(self):
rxbp.multicast.return_value('test')

def test_send_item_on_subscribe_scheduler(self):
mc = rxbp.multicast.return_value('test')

source = mc.get_source(self.info)
source.subscribe(self.o)

self.assertEqual(0, len(self.o.messages))

self.multicast_scheduler.advance_by(1)

self.o.messages = [
on_next(0, "test"),
on_completed(0),
]
# def test_initialize(self):
# rxbp.multicast.return_value('test')
#
# def test_send_item_on_subscribe_scheduler(self):
# mc = rxbp.multicast.return_value('test')
#
# source = mc.get_source(self.info)
# source.subscribe(self.o)
#
# self.assertEqual(0, len(self.o.messages))
#
# self.multicast_scheduler.advance_by(1)
#
# self.o.messages = [
# on_next(0, "test"),
# on_completed(0),
# ]

0 comments on commit db5ff26

Please sign in to comment.