Skip to content

Commit

Permalink
Use synchronized wrapper instead
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Mar 12, 2022
1 parent 9c344f1 commit 3e2ef0c
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 25 deletions.
5 changes: 5 additions & 0 deletions reactivex/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
SequenceContainsNoElementsError,
)
from .priorityqueue import PriorityQueue
from .utils import NotSet, add_ref, alias, infinite

__all__ = [
"add_ref",
"alias",
"ArgumentOutOfRangeException",
"DisposedException",
"default_comparer",
"default_error",
"infinite",
"noop",
"NotSet",
"SequenceContainsNoElementsError",
"concurrency",
"DELTA_ZERO",
Expand Down
3 changes: 3 additions & 0 deletions reactivex/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ def __eq__(self, other: Any) -> bool:

def __repr__(self) -> str:
return "NotSet"


__all__ = ["add_ref", "infinite", "alias", "NotSet"]
2 changes: 1 addition & 1 deletion reactivex/observable/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from reactivex import Observable, abc, from_future
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal.concurrency import synchronized
from reactivex.internal import synchronized


def zip_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
Expand Down
2 changes: 1 addition & 1 deletion reactivex/operators/_groupjoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
RefCountDisposable,
SingleAssignmentDisposable,
)
from reactivex.internal.utils import add_ref
from reactivex.internal import add_ref
from reactivex.subject import Subject

_TLeft = TypeVar("_TLeft")
Expand Down
2 changes: 1 addition & 1 deletion reactivex/operators/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import reactivex
from reactivex import Observable, abc, from_future, typing
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal.concurrency import synchronized
from reactivex.internal import synchronized

_T = TypeVar("_T")

Expand Down
3 changes: 1 addition & 2 deletions reactivex/operators/_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
SerialDisposable,
SingleAssignmentDisposable,
)
from reactivex.internal import noop
from reactivex.internal.utils import add_ref
from reactivex.internal import add_ref, noop
from reactivex.subject import Subject

log = logging.getLogger("Rx")
Expand Down
3 changes: 1 addition & 2 deletions reactivex/operators/_windowwithcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from reactivex import Observable, abc
from reactivex.disposable import RefCountDisposable, SingleAssignmentDisposable
from reactivex.internal.exceptions import ArgumentOutOfRangeException
from reactivex.internal.utils import add_ref
from reactivex.internal import ArgumentOutOfRangeException, add_ref
from reactivex.subject import Subject

log = logging.getLogger("Rx")
Expand Down
31 changes: 15 additions & 16 deletions reactivex/operators/_windowwithtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
SerialDisposable,
SingleAssignmentDisposable,
)
from reactivex.internal.constants import DELTA_ZERO
from reactivex.internal.utils import add_ref
from reactivex.internal import DELTA_ZERO, add_ref, synchronized
from reactivex.scheduler import TimeoutScheduler
from reactivex.subject import Subject

Expand Down Expand Up @@ -69,18 +68,18 @@ def create_timer():
if is_shift:
next_shift[0] += timeshift

@synchronized(source.lock)
def action(scheduler: abc.SchedulerBase, state: Any = None):
s: Optional[Subject[_T]] = None

with source.lock:
if is_shift:
s = Subject()
queue.append(s)
observer.on_next(add_ref(s, ref_count_disposable))
if is_shift:
s = Subject()
queue.append(s)
observer.on_next(add_ref(s, ref_count_disposable))

if is_span:
s = queue.pop(0)
s.on_completed()
if is_span:
s = queue.pop(0)
s.on_completed()

create_timer()

Expand All @@ -95,17 +94,17 @@ def on_next(x: _T) -> None:
for s in queue:
s.on_next(x)

@synchronized(source.lock)
def on_error(e: Exception) -> None:
with source.lock:
for s in queue:
s.on_error(e)
for s in queue:
s.on_error(e)

observer.on_error(e)

@synchronized(source.lock)
def on_completed() -> None:
with source.lock:
for s in queue:
s.on_completed()
for s in queue:
s.on_completed()

observer.on_completed()

Expand Down
2 changes: 1 addition & 1 deletion reactivex/operators/_windowwithtimeorcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
SerialDisposable,
SingleAssignmentDisposable,
)
from reactivex.internal.utils import add_ref
from reactivex.internal import add_ref
from reactivex.scheduler import TimeoutScheduler
from reactivex.subject import Subject

Expand Down
2 changes: 1 addition & 1 deletion tests/test_observable/test_withlatestfrom.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RxException(Exception):
# Helper function for raising exceptions within lambdas


def _raise(ex):
def _raise(ex: str) -> None:
raise RxException(ex)


Expand Down

0 comments on commit 3e2ef0c

Please sign in to comment.