Skip to content

Commit

Permalink
Merge 15754e0 into a10cc1c
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Jan 21, 2023
2 parents a10cc1c + 15754e0 commit dc707d5
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Expand Up @@ -30,7 +30,7 @@ repos:
repo: local
- hooks:
- id: mypy
exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$)
exclude: (^docs/|^examples/|^notebooks/|^tests/|^reactivex/operators/_\w.*\.py$|^reactivex/curry\.py$)
repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.942

7 changes: 7 additions & 0 deletions docs/contributing.rst
Expand Up @@ -11,3 +11,10 @@ Please register any issues to `ReactiveX/RxPY/issues <https://github.com/Reactiv

Please submit any pull requests against the
`master <https://github.com/ReactiveX/RxPY/tree/master>`_ branch.

Before submitting the pull requets, be sure to run

.. code:: shell
poetry run pre-commit run --all-files --show-diff-on-failure
poetry run coverage run -m pytest && poetry run coverage report -m
24 changes: 24 additions & 0 deletions docs/get_started.rst
Expand Up @@ -221,6 +221,30 @@ Output:
Received delta
Received epsilon
Version x.x.x introduces the ``curry_flip`` decorator to make the creation of custom
operators less verbose:

.. code:: python
import reactivex
@curry_flip(1)
def lowercase(source):
def subscribe(observer, scheduler = None):
def on_next(value):
observer.on_next(value.lower())
return source.subscribe(
on_next,
observer.on_error,
observer.on_completed,
scheduler=scheduler)
return reactivex.create(subscribe)
When building more complex operators which take arguments or even optional arguments,
``curry_flip`` allows to always keep the ``source`` as first argument in the definition.


Concurrency
-----------

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -63,7 +63,7 @@ float_to_top=true
python_version = "3.9"
follow_imports = "silent"
files = ["reactivex"]
exclude = ["reactivex/operators/_\\w.*\\.py$"] # mypy will eventually catch up
exclude = ["reactivex/operators/_\\w.*\\.py$", "reactivex/curry\\.py$"] # mypy will eventually catch up
disallow_any_generics = true
disallow_untyped_defs = true

Expand Down
218 changes: 218 additions & 0 deletions reactivex/curry.py
@@ -0,0 +1,218 @@
# This file is borrowed from the Expression library, licensed under the MIT license.
# https://github.com/cognitedata/Expression
from typing import Any, Callable, Tuple, TypeVar, overload

from typing_extensions import Concatenate, Literal, ParamSpec

_P = ParamSpec("_P")
_A = TypeVar("_A")
_B = TypeVar("_B")
_C = TypeVar("_C")
_D = TypeVar("_D")
_E = TypeVar("_E")

_Arity = Literal[0, 1, 2, 3, 4]


def _curry(
args: Tuple[Any, ...], arity: int, fun: Callable[..., Any]
) -> Callable[..., Any]:
def wrapper(*args_: Any, **kw: Any) -> Any:
if arity == 1:
return fun(*args, *args_, **kw)
return _curry(args + args_, arity - 1, fun)

return wrapper


@overload
def curry(num_args: Literal[0]) -> Callable[[Callable[_P, _B]], Callable[_P, _B]]:
...


@overload
def curry(
num_args: Literal[1],
) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[[_A], Callable[_P, _B]]]:
...


@overload
def curry(
num_args: Literal[2],
) -> Callable[
[Callable[Concatenate[_A, _B, _P], _C]],
Callable[
[_A],
Callable[
[_B],
Callable[_P, _C],
],
],
]:
...


@overload
def curry(
num_args: Literal[3],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _P], _D]],
Callable[
[_A],
Callable[
[_B],
Callable[
[_C],
Callable[_P, _D],
],
],
],
]:
...


@overload
def curry(
num_args: Literal[4],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _D, _P], _E]],
Callable[
[_A],
Callable[
[_B],
Callable[
[_C],
Callable[[_D], Callable[_P, _E]],
],
],
],
]:
...


def curry(num_args: _Arity) -> Callable[..., Any]:
"""A curry decorator.
Makes a function curried.
Args:
num_args: The number of args to curry from the start of the
function
Example:
>>> @curry(1)
... def add(a: int, b: int) -> int:
... return a + b
>>>
>>> assert add(3)(4) == 7
"""

def wrapper(fun: Callable[..., Any]) -> Callable[..., Any]:
return _curry((), num_args + 1, fun)

return wrapper


@overload
def curry_flip(
num_args: Literal[0],
) -> Callable[[Callable[_P, _A]], Callable[_P, _A]]:
...


@overload
def curry_flip(
num_args: Literal[1],
) -> Callable[[Callable[Concatenate[_A, _P], _B]], Callable[_P, Callable[[_A], _B]]]:
...


@overload
def curry_flip(
num_args: Literal[2],
) -> Callable[
[Callable[Concatenate[_A, _B, _P], _C]],
Callable[
_P,
Callable[
[_A],
Callable[[_B], _C],
],
],
]:
...


@overload
def curry_flip(
num_args: Literal[3],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _P], _D]],
Callable[
_P,
Callable[
[_A],
Callable[
[_B],
Callable[[_C], _D],
],
],
],
]:
...


@overload
def curry_flip(
num_args: Literal[4],
) -> Callable[
[Callable[Concatenate[_A, _B, _C, _D, _P], _E]],
Callable[
_P,
Callable[
[_A],
Callable[
[_B],
Callable[[_C], Callable[[_D], _E]],
],
],
],
]:
...


def curry_flip(
num_args: _Arity,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""A flipped curry decorator.
Makes a function curried, but flips the curried arguments to become
the last arguments. This is very nice when having e.g optional
arguments after a source argument that will be piped.
Args:
num_args: The number of args to curry from the start of the
function
Example:
>>> @curry_flip(1)
... def map(source: List[int], mapper: Callable[[int], int]):
... return [mapper(x) for x in source]
>>>
>>> ys = pipe(xs, map(lambda x: x * 10))
"""

def _wrap_fun(fun: Callable[..., Any]) -> Callable[..., Any]:
def _wrap_args(*args: Any, **kwargs: Any) -> Callable[..., Any]:
def _wrap_curried(*curry_args: Any) -> Any:
return fun(*curry_args, *args, **kwargs)

return _curry((), num_args, _wrap_curried)

return _wrap_args if num_args else fun

return _wrap_fun


__all__ = ["curry", "curry_flip"]
18 changes: 10 additions & 8 deletions reactivex/operators/__init__.py
Expand Up @@ -28,6 +28,7 @@
compose,
typing,
)
from reactivex.curry import curry_flip
from reactivex.internal.basic import identity
from reactivex.internal.utils import NotSet
from reactivex.subject import Subject
Expand Down Expand Up @@ -454,9 +455,10 @@ def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T
return concat_(*sources)


@curry_flip(1)
def concat_map(
project: Mapper[_T1, Observable[_T2]]
) -> Callable[[Observable[_T1]], Observable[_T2]]:
source: Observable[_T1], project: Mapper[_T1, Observable[_T2]]
) -> Observable[_T2]:
"""Projects each source value to an Observable which is merged in the
output Observable, in a serialized fashion waiting for each one to complete
before merging the next.
Expand Down Expand Up @@ -488,7 +490,7 @@ def concat_map(
"""

return compose(map(project), merge(max_concurrent=1))
return source.pipe(map(project), merge(max_concurrent=1))


def contains(
Expand Down Expand Up @@ -1905,8 +1907,8 @@ def max_by(


def merge(
*sources: Observable[Any], max_concurrent: Optional[int] = None
) -> Callable[[Observable[Any]], Observable[Any]]:
*sources: Observable[_T], max_concurrent: Optional[int] = None
) -> Callable[[Observable[Observable[_T]]], Observable[_T]]:
"""Merges an observable sequence of observable sequences into an
observable sequence, limiting the number of concurrent
subscriptions to inner sequences. Or merges two observable
Expand All @@ -1930,9 +1932,9 @@ def merge(
observable sequence.
Returns:
An operator function that takes an observable source and
returns the observable sequence that merges the elements of the
inner sequences.
An operator function that takes an observable source and returns
the observable sequence that merges the elements of the inner
sequences.
"""
from ._merge import merge_

Expand Down
37 changes: 17 additions & 20 deletions reactivex/operators/_buffer.py
Expand Up @@ -2,6 +2,7 @@

from reactivex import Observable, compose
from reactivex import operators as ops
from reactivex.curry import curry_flip

_T = TypeVar("_T")

Expand Down Expand Up @@ -33,9 +34,10 @@ def buffer_toggle_(
)


@curry_flip(1)
def buffer_with_count_(
count: int, skip: Optional[int] = None
) -> Callable[[Observable[_T]], Observable[List[_T]]]:
source: Observable[_T], count: int, skip: Optional[int] = None
) -> Observable[List[_T]]:
"""Projects each element of an observable sequence into zero or more
buffers which are produced based on element count information.
Expand All @@ -54,27 +56,22 @@ def buffer_with_count_(
observable sequence of buffers.
"""

def buffer_with_count(source: Observable[_T]) -> Observable[List[_T]]:
nonlocal skip

if skip is None:
skip = count

def mapper(value: Observable[_T]) -> Observable[List[_T]]:
return value.pipe(
ops.to_list(),
)
if skip is None:
skip = count

def predicate(value: List[_T]) -> bool:
return len(value) > 0

return source.pipe(
ops.window_with_count(count, skip),
ops.flat_map(mapper),
ops.filter(predicate),
def mapper(value: Observable[_T]) -> Observable[List[_T]]:
return value.pipe(
ops.to_list(),
)

return buffer_with_count
def predicate(value: List[_T]) -> bool:
return len(value) > 0

return source.pipe(
ops.window_with_count(count, skip),
ops.flat_map(mapper),
ops.filter(predicate),
)


__all__ = ["buffer_", "buffer_with_count_", "buffer_when_", "buffer_toggle_"]

0 comments on commit dc707d5

Please sign in to comment.