Skip to content

Commit

Permalink
tempeventqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
erikkemperman committed Mar 1, 2019
1 parent 941a1ca commit fd9bd47
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
35 changes: 21 additions & 14 deletions rx/concurrency/eventloopscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,18 @@ def _ensure_thread(self) -> None:
"""Ensures there is an event loop thread running. Should be
called under the gate."""

if not self.thread:
if self.thread is None:
thread = self.thread_factory(self.run)
self.thread = thread
thread.start()

def run(self) -> None:
"""Event loop scheduled on the designated event loop thread.
The loop is suspended/resumed using the condition which gets notified
by calls to Schedule or calls to dispose."""
by calls to schedule or calls to dispose."""

ready: Deque[ScheduledItem] = deque()
item: ScheduledItem = None

while True:

Expand All @@ -135,28 +136,34 @@ def run(self) -> None:
# immediately. Subsequent calls to Schedule won't ever create a
# new thread.
if self.is_disposed:
return
break

# Sort the ready_list (from recent calls for immediate schedule)
# and the due subset of previously queued items.
# and the due subset of previously queued items into ready.
time = self.now
while self.queue:
due = self.queue.peek().duetime
item = self.queue.peek()
while item.is_cancelled():
self.queue.dequeue()
item = self.queue.peek()

due = item.duetime
if due > time:
break

while self.ready_list and due > self.ready_list[0].duetime:
ready.append(self.ready_list.popleft())

ready.append(self.queue.dequeue())

while self.ready_list:
ready.append(self.ready_list.popleft())

# Execute the gathered actions
# Invoke the collected ready actions
while ready:
item = ready.popleft()
if not item.is_cancelled():
item.invoke()
ready.popleft().invoke()

# Wait for next cycle, or if we're done let's exit if so configured
# Go to next cycle, or if we're done let's exit if so configured
with self.condition:

if self.ready_list:
Expand All @@ -169,11 +176,11 @@ def run(self) -> None:
log.debug("timeout: %s", seconds)
self.condition.wait(seconds)

elif self.exit_if_empty:
self.thread = None
return

else:
self.queue.count = 0
if self.exit_if_empty:
self.thread = None
break
self.condition.wait()

def dispose(self) -> None:
Expand Down
3 changes: 3 additions & 0 deletions rx/internal/priorityqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def __init__(self) -> None:
self.items: List[Any] = []
self.count = 0 # Monotonic increasing for sort stability

def __bool__(self):
return bool(self.items)

def __len__(self):
"""Returns length of queue"""

Expand Down

0 comments on commit fd9bd47

Please sign in to comment.