Skip to content

Commit

Permalink
Remove gotchas
Browse files Browse the repository at this point in the history
Fix comment on test_hot marbles
  • Loading branch information
mat authored and dbrattli committed Dec 31, 2022
1 parent a79f969 commit d82b7ab
Showing 1 changed file with 6 additions and 45 deletions.
51 changes: 6 additions & 45 deletions docs/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -245,50 +245,11 @@ The examples below showcase some less commonly needed testing tools.
))
message = result.messages[0]
# sub starts at 200 and we emit at 300 - since this is a hot observable,
# aka 5 ticks of 20 (timespan=20 in to_marbles)
# then we get the 42 emit and then blank until 500, so 10 ticks*20
# sub starts at 200 and we emit at 300
# since `source` is a hot observable, the emission @190 will not be caught
# instead on our subscription, it will look like emission at 300-200=100 ticks
# aka 5 "-" each representing 20 ticks (timespan=20 in to_marbles)
# then the 42 is received emit
# and then nothing for another 500-300 ticks 500, so 10 "-" before complete
assert message.value.value == '-----(42)----------|'
Gotchas
.......

Directly using observables in code
**********************************

If your code creates observables directly in the code you wish to test e.g. `timeout = reactivex.timer(3)`
you will not be able to test properly as it will *actually* attempt to wait 3 real life seconds.

Some suggestions:

.. code:: python
# Difficult to test because reactivex.timer is real time
def do_or_timeout(doer_observable: Observable[int]):
reactivex.merge(
doer_observable,
reactivex.timer(5).pipe(
operators.flat_map(lambda _: reactivex.throw(Exception('abc')))
)
)
# option 1: accept scheduler as arg, and pass the TestScheduler
def do_or_timeout(doer_observable: Observable[int], scheduler=None):
reactivex.merge(
doer_observable,
reactivex.timer(5.0, scheduler=scheduler).pipe(
operators.flat_map(lambda _: reactivex.throw(Exception('abc')))
)
)
# option 2: dependency injection: optional timeout
def do_or_timeout(doer_observable: Observable[int], timeout=None):
timeout = timeout or reactivex.timer(5.0)
reactivex.merge(
doer_observable,
timeout.pipe(
operators.flat_map(lambda _: reactivex.throw(Exception('abc')))
)
)

0 comments on commit d82b7ab

Please sign in to comment.