Skip to content

Commit

Permalink
Merge branch 'release/1.6.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Feb 17, 2018
2 parents 065e974 + 3a02964 commit 44081a1
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ TestResults/Rx.TE.Tests.mdf
TestResults/Rx.TE.Tests_log.ldf
*.user

.mypy_cache

# Cloud9
.c9

Expand Down
2 changes: 1 addition & 1 deletion rx/core/scheduledobserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def run(self, recurse, state):
parent.has_faulted = True
raise

return self.scheduler.schedule(self.run)
self.scheduler.schedule(self.run)

def dispose(self):
super(ScheduledObserver, self).dispose()
Expand Down
43 changes: 36 additions & 7 deletions rx/internal/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys

from rx import AnonymousObservable
from rx.disposables import CompositeDisposable

Expand All @@ -16,12 +18,34 @@ def adapt_call(func):
Adapt call from taking n params to only taking 1 or 2 params
"""

def _should_reraise_TypeError():
"""Determine whether or not we should re-raise a given TypeError. Since
we rely on excepting TypeError in order to determine whether or not we
can adapt a function, we can't immediately determine whether a given
TypeError is from the adapted function's logic (and should be
re-raised) or from argspec mismatches (which should not be re-raised).
Given that this function is private to adapt_call, we know that there will
always be at least two frames in the given traceback:
- frame: (func1 | func2) call
- frame: func call
- frame: (Optional) TypeError in body
and hence we can check for the presence of the third frame, which indicates
whether an error occurred in the body.
"""
_, __, tb = sys.exc_info()

return tb.tb_next.tb_next is not None

cached = [None]

def func1(arg1, *_):
def func1(arg1, *_, **__):
return func(arg1)

def func2(arg1, arg2=None, *_):
def func2(arg1, arg2=None, *_, **__):
return func(arg1, arg2)

def func_wrapped(*args, **kw):
Expand All @@ -32,14 +56,19 @@ def func_wrapped(*args, **kw):
try:
ret = fn(*args, **kw)
except TypeError:
continue
# Preserve the original traceback if there was a TypeError raised
# in the body of the adapted function.
if _should_reraise_TypeError():
raise
else:
continue
else:
cached[0] = fn
return ret
return ret
else:
# We were unable to call the function successfully.
raise TypeError("Couldn't adapt function {}".format(func.__name__))

# Error if we get here. Just call original function to generate a
# meaningful error message
return func(*args, **kw)
return func_wrapped


Expand Down
7 changes: 5 additions & 2 deletions rx/linq/observable/let.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


@extensionmethod(Observable, alias="let")
def let_bind(self, func):
def let_bind(self, func, **kwargs):
"""Returns an observable sequence that is the result of invoking the
selector on the source sequence, without sharing subscriptions. This
operator allows for a fluent style of writing queries that use the same
Expand All @@ -16,6 +16,9 @@ def let_bind(self, func):
Returns an observable {Observable} sequence that contains the elements
of a sequence produced by multicasting the source sequence within a
selector function.
Any kwargs given will be passed through to the selector. This allows
for a clean syntax when composing with parameterized selectors.
"""

return func(self)
return func(self, **kwargs)
6 changes: 2 additions & 4 deletions rx/linq/observable/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ def scan(self, accumulator, seed=None):
Returns an observable sequence containing the accumulated values.
"""

has_seed = False
if not seed is None:
has_seed = True
has_seed = seed is not None

source = self

Expand All @@ -33,7 +31,7 @@ def projection(x):
if has_accumulation[0]:
accumulation[0] = accumulator(accumulation[0], x)
else:
accumulation[0] = accumulator(seed, x) if has_seed else x
accumulation[0] = accumulator(seed, x) if has_seed else x
has_accumulation[0] = True

return accumulation[0]
Expand Down
33 changes: 18 additions & 15 deletions rx/linq/observable/slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def slice_(self, start=None, stop=None, step=1):
r---e---a---c---t---i---v---e---|
0 1 2 3 4 5 6 7 8
-8 -7 -6 -5 -4 -3 -2 -1
-8 -7 -6 -5 -4 -3 -2 -1 0
Example:
result = source.slice(1, 10)
Expand All @@ -32,20 +32,23 @@ def slice_(self, start=None, stop=None, step=1):

source = self

if start is not None:
if start < 0:
source = source.take_last(abs(start))
else:
source = source.skip(start)
has_start = start is not None
has_stop = stop is not None
has_step = step is not None

if stop is not None:
if stop > 0:
start = start or 0
source = source.take(stop - start)
else:
source = source.skip_last(abs(stop))
if has_stop and stop >= 0:
source = source.take(stop)

if step is not None:
if has_start and start > 0:
source = source.skip(start)

if has_start and start < 0:
source = source.take_last(abs(start))

if has_stop and stop < 0:
source = source.skip_last(abs(stop))

if has_step:
if step > 1:
source = source.filter(lambda x, i: i % step == 0)
elif step < 0:
Expand All @@ -68,7 +71,7 @@ def __getitem__(self, key):
r---e---a---c---t---i---v---e---|
0 1 2 3 4 5 6 7 8
-8 -7 -6 -5 -4 -3 -2 -1
-8 -7 -6 -5 -4 -3 -2 -1 0
Example:
result = source[1:10]
Expand All @@ -91,4 +94,4 @@ def __getitem__(self, key):
else:
raise TypeError("Invalid argument type.")

return self.slice(start, stop, step)
return slice_(self, start, stop, step)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='Rx',
version='1.6.0',
version='1.6.1',
description='Reactive Extensions (Rx) for Python',
long_description=("is a library for composing asynchronous and "
"event-based programs using observable collections and LINQ-style "
Expand Down
37 changes: 32 additions & 5 deletions tests/test_core/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ def __init__(self, arg):

def __call__(self, x):
return x + self._arg

def method1(self, x):
return x + self._arg

def method2(self, x, y):
return x + self._arg

def method3(self, x, y, z):
return x + y + z + self._arg

@classmethod
def clsmethod(cls, x):
return x * 10
Expand Down Expand Up @@ -47,4 +47,31 @@ def test_adapt_call_stcmethod1(self):
assert value == 4200

value = func(42, 43, 44)
assert value == 4200
assert value == 4200

def test_adapt_call_underlying_error(self):
err_msg = "We should see the original traceback."

def throws(a):
raise TypeError(err_msg)

with self.assertRaises(TypeError) as e:
adapt_call(throws)(None)

self.assertEqual(err_msg, str(e.exception))

def test_adapt_call_adaptation_error(self):

def not_adaptable(a, b, c):
pass

err_msg = "Couldn't adapt function {}".format(not_adaptable.__name__)

with self.assertRaises(TypeError) as e1:
adapt_call(not_adaptable)(None)

with self.assertRaises(TypeError) as e2:
adapt_call(not_adaptable)(None, None)

for e in (e1, e2):
self.assertEqual(err_msg, str(e.exception))
23 changes: 23 additions & 0 deletions tests/test_observable/test_slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,29 @@ def create():
on_completed(230))
xs.subscriptions.assert_equal(subscribe(200, 230))

def test_slice_take_last_skip_all(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(70, -2),
on_next(150, -1),
on_next(210, 0),
on_next(230, 1),
on_next(270, 2),
on_next(280, 3),
on_next(300, 4),
on_next(310, 5),
on_next(340, 6),
on_next(370, 7),
on_next(410, 8),
on_next(415, 9),
on_completed(690))

def create():
return xs[-2:0]
results = scheduler.start(create)
results.messages.assert_equal(
on_completed(200))

def test_slice_step_2(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
Expand Down

0 comments on commit 44081a1

Please sign in to comment.