Skip to content

Commit

Permalink
Python 3.10 (faust-streaming#205)
Browse files Browse the repository at this point in the history
* Remove deprecated loop argument for python-3.10

* reformat file for black

* remove unused variable, fix typo

* at least point to the faust-streaming/mode repo

* fix noack_take() for python-3.10

* use black to add commas

* set mode-streaming to 0.2.0 or higher

* remove loop argument from Event (thank you @theY4Kman)
  • Loading branch information
taybin authored and cbrand committed Jun 5, 2022
1 parent b7eefcc commit ac45ced
Show file tree
Hide file tree
Showing 18 changed files with 40 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Expand Up @@ -19,7 +19,7 @@ jobs:
# for example if a test fails only when Cython is enabled
fail-fast: false
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9"]
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
use-cython: ["true", "false"]
env:
USE_CYTHON: ${{ matrix.use-cython }}
Expand Down
2 changes: 1 addition & 1 deletion faust/_cython/streams.pyx
Expand Up @@ -84,7 +84,7 @@ cdef class StreamIterator:
event = None

while value is None and event is None:
await sleep(0, loop=self.loop)
await sleep(0)
need_slow_get, channel_value = self._try_get_quick_value()
if need_slow_get:
channel_value = await self.chan_slow_get()
Expand Down
2 changes: 1 addition & 1 deletion faust/agents/agent.py
Expand Up @@ -1098,7 +1098,7 @@ def __init__(
) -> None:
super().__init__(*args, **kwargs)
self.results = {}
self.new_value_processed = asyncio.Condition(loop=self.loop)
self.new_value_processed = asyncio.Condition()
self.original_channel = cast(ChannelT, original_channel)
self.add_sink(self._on_value_processed)
self._stream = self.channel.stream()
Expand Down
3 changes: 1 addition & 2 deletions faust/agents/replies.py
Expand Up @@ -76,8 +76,7 @@ class BarrierState(ReplyPromise):

def __post_init__(self) -> None:
self.pending = set()
loop: asyncio.AbstractEventLoop = self._loop
self._results = asyncio.Queue(maxsize=1000, loop=loop)
self._results = asyncio.Queue(maxsize=1000)

def _verify_correlation_id(self, correlation_id: str) -> None:
pass # barrier does not require a correlation id.
Expand Down
1 change: 0 additions & 1 deletion faust/app/_attached.py
Expand Up @@ -186,7 +186,6 @@ async def commit(self, tp: TP, offset: int) -> None:
await asyncio.wait(
await self.publish_for_tp_offset(tp, offset),
return_when=asyncio.ALL_COMPLETED,
loop=self.app.loop,
)

async def publish_for_tp_offset(
Expand Down
2 changes: 0 additions & 2 deletions faust/app/base.py
Expand Up @@ -1822,14 +1822,12 @@ def FlowControlQueue(
maxsize: Optional[int] = None,
*,
clear_on_resume: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> ThrowableQueue:
"""Like :class:`asyncio.Queue`, but can be suspended/resumed."""
return ThrowableQueue(
maxsize=maxsize,
flow_control=self.flow_control,
clear_on_resume=clear_on_resume,
loop=loop or self.loop,
)

def Worker(self, **kwargs: Any) -> _Worker:
Expand Down
1 change: 0 additions & 1 deletion faust/channels.py
Expand Up @@ -141,7 +141,6 @@ def queue(self) -> ThrowableQueue:
maxsize = self.app.conf.stream_buffer_maxsize
self._queue = self.app.FlowControlQueue(
maxsize=maxsize,
loop=self.loop,
clear_on_resume=True,
)
return self._queue
Expand Down
19 changes: 8 additions & 11 deletions faust/streams.py
Expand Up @@ -134,10 +134,9 @@ def __init__(
self.channel = channel
self.outbox = self.app.FlowControlQueue(
maxsize=self.app.conf.stream_buffer_maxsize,
loop=self.loop,
clear_on_resume=True,
)
self._passive_started = asyncio.Event(loop=self.loop)
self._passive_started = asyncio.Event()
self.join_strategy = join_strategy
self.combined = combined if combined is not None else []
self.concurrency_index = concurrency_index
Expand Down Expand Up @@ -318,8 +317,8 @@ async def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -415,8 +414,8 @@ async def take_with_timestamp(
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -518,8 +517,8 @@ async def noack_take(
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -691,7 +690,6 @@ def echo(self, *channels: Union[str, ChannelT]) -> StreamT:
async def echoing(value: T) -> T:
await asyncio.wait(
[maybe_forward(value, channel) for channel in _channels],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)
return value
Expand Down Expand Up @@ -997,7 +995,6 @@ def _set_current_event(self, event: Optional[EventT] = None) -> None:

async def _py_aiter(self) -> AsyncIterator[T_co]:
self._finalized = True
loop = self.loop
started_by_aiter = await self.maybe_start()
on_merge = self.on_merge
on_stream_event_out = self._on_stream_event_out
Expand Down Expand Up @@ -1049,7 +1046,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
value: Any = None
# we iterate until on_merge gives value.
while value is None and event is None:
await sleep(0, loop=loop)
await sleep(0)
# get message from channel
# This inlines ThrowableQueue.get for performance:
# We selectively call `await Q.put`/`Q.put_nowait`,
Expand Down
7 changes: 3 additions & 4 deletions faust/tables/manager.py
Expand Up @@ -48,9 +48,9 @@ def __init__(self, app: AppT, **kwargs: Any) -> None:
self._changelog_queue = None
self._channels = {}
self._changelogs = {}
self._tables_finalized = asyncio.Event(loop=self.loop)
self._tables_registered = asyncio.Event(loop=self.loop)
self._recovery_started = asyncio.Event(loop=self.loop)
self._tables_finalized = asyncio.Event()
self._tables_registered = asyncio.Event()
self._recovery_started = asyncio.Event()

self.actives_ready = False
self.standbys_ready = False
Expand Down Expand Up @@ -111,7 +111,6 @@ def changelog_queue(self) -> ThrowableQueue:
if self._changelog_queue is None:
self._changelog_queue = self.app.FlowControlQueue(
maxsize=self.app.conf.stream_buffer_maxsize,
loop=self.loop,
clear_on_resume=True,
)
return self._changelog_queue
Expand Down
4 changes: 2 additions & 2 deletions faust/tables/recovery.py
Expand Up @@ -195,14 +195,14 @@ def __init__(self, app: AppT, tables: TableManagerT, **kwargs: Any) -> None:
def signal_recovery_start(self) -> Event:
"""Event used to signal that recovery has started."""
if self._signal_recovery_start is None:
self._signal_recovery_start = Event(loop=self.loop)
self._signal_recovery_start = Event()
return self._signal_recovery_start

@property
def signal_recovery_end(self) -> Event:
"""Event used to signal that recovery has ended."""
if self._signal_recovery_end is None:
self._signal_recovery_end = Event(loop=self.loop)
self._signal_recovery_end = Event()
return self._signal_recovery_end

async def on_stop(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/conductor.py
Expand Up @@ -294,7 +294,7 @@ async def _subscriber(self) -> None: # pragma: no cover
notify(self._subscription_done)

# Now we wait for changes
ev = self._subscription_changed = asyncio.Event(loop=self.loop)
ev = self._subscription_changed = asyncio.Event()
while not self.should_stop:
# Wait for something to add/remove topics from subscription.
await ev.wait()
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Expand Up @@ -832,7 +832,7 @@ async def _wait_for_ack(self, timeout: float) -> None:
self._waiting_for_ack = asyncio.Future(loop=self.loop)
try:
# wait for `ack()` to wake us up
await asyncio.wait_for(self._waiting_for_ack, loop=self.loop, timeout=1)
await asyncio.wait_for(self._waiting_for_ack, timeout=1)
except (asyncio.TimeoutError, asyncio.CancelledError): # pragma: no cover
pass
finally:
Expand Down
25 changes: 12 additions & 13 deletions faust/types/app.py
Expand Up @@ -141,7 +141,7 @@ def __init__(
enable_kafka: bool = True,
enable_kafka_producer: Optional[bool] = None,
enable_kafka_consumer: Optional[bool] = None,
enable_sensors: bool = True
enable_sensors: bool = True,
) -> None:
...

Expand Down Expand Up @@ -248,7 +248,7 @@ def discover(
self,
*extra_modules: str,
categories: Iterable[str] = ("a", "b", "c"),
ignore: Iterable[Any] = ("foo", "bar")
ignore: Iterable[Any] = ("foo", "bar"),
) -> None:
...

Expand All @@ -273,7 +273,7 @@ def topic(
maxsize: Optional[int] = None,
allow_empty: bool = False,
has_prefix: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> TopicT:
...

Expand All @@ -285,7 +285,7 @@ def channel(
key_type: _ModelArg = None,
value_type: _ModelArg = None,
maxsize: Optional[int] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> ChannelT:
...

Expand All @@ -300,7 +300,7 @@ def agent(
sink: Iterable[SinkT] = None,
isolated_partitions: bool = False,
use_reply_headers: bool = True,
**kwargs: Any
**kwargs: Any,
) -> Callable[[AgentFun[_T]], AgentT[_T]]:
...

Expand Down Expand Up @@ -329,7 +329,7 @@ def crontab(
*,
timezone: tzinfo = None,
on_leader: bool = False,
traced: bool = True
traced: bool = True,
) -> Callable:
...

Expand All @@ -352,7 +352,7 @@ def Table(
window: Optional[WindowT] = None,
partitions: Optional[int] = None,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -365,7 +365,7 @@ def GlobalTable(
window: Optional[WindowT] = None,
partitions: Optional[int] = None,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -378,7 +378,7 @@ def SetTable(
partitions: Optional[int] = None,
start_manager: bool = False,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -391,7 +391,7 @@ def SetGlobalTable(
partitions: Optional[int] = None,
start_manager: bool = False,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -402,7 +402,7 @@ def page(
*,
base: Type[View] = View,
cors_options: Mapping[str, ResourceOptions] = None,
name: Optional[str] = None
name: Optional[str] = None,
) -> Callable[[PageArg], Type[View]]:
...

Expand All @@ -414,7 +414,7 @@ def table_route(
*,
query_param: Optional[str] = None,
match_info: Optional[str] = None,
exact_key: Optional[str] = None
exact_key: Optional[str] = None,
) -> ViewDecorator:
...

Expand Down Expand Up @@ -479,7 +479,6 @@ def FlowControlQueue(
maxsize: Optional[int] = None,
*,
clear_on_resume: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> ThrowableQueue:
...

Expand Down
2 changes: 1 addition & 1 deletion faust/types/settings/params.py
Expand Up @@ -196,7 +196,7 @@ class Param(Generic[IT, OT], property):
#: Default template.
#: If set the default value will be generated from this format string
#: template.
#: For exmaple the :setting:`canonical_url` setting uses
#: For example the :setting:`canonical_url` setting uses
#: ``default_template='http://{conf.web_host}:{conf.web_port}' to
#: generate a default value from the :setting:`web_host` and
#: :setting:`web_port` settings.
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements.txt
Expand Up @@ -2,7 +2,7 @@ aiohttp>=3.5.2,<4.0
aiohttp_cors>=0.7,<2.0
aiokafka>=0.7.1,<0.8.0
click>=6.7,<8.1
mode-streaming==0.1.0
mode-streaming>=0.2.0
opentracing>=1.3.0,<=2.4.0
terminaltables>=3.1,<4.0
yarl>=1.0,<2.0
Expand Down
5 changes: 1 addition & 4 deletions tests/consistency/test_consistency.py
Expand Up @@ -16,7 +16,7 @@ def __init__(self, num_workers, num_producers, loop):
self.producers = set(range(num_producers))
self._producer_procs = {}
self.loop = loop
self._stop_stresser = asyncio.Event(loop=loop)
self._stop_stresser = asyncio.Event()

@property
def _stopped(self):
Expand Down Expand Up @@ -75,7 +75,6 @@ async def start(self, stopped_at_start=0):
)
await asyncio.wait(
[self._start_worker(worker) for worker in start_workers],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)
asyncio.ensure_future(self._run_stresser(), loop=self.loop)
Expand Down Expand Up @@ -120,14 +119,12 @@ async def _exec_worker(
async def stop_all(self):
await asyncio.wait(
[self._stop_worker(worker) for worker in self._running],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)

async def stop_all_producers(self):
await asyncio.wait(
[self._stop_producer(producer) for producer in self._running_producers],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)

Expand Down
8 changes: 4 additions & 4 deletions tests/functional/agents/helpers.py
Expand Up @@ -67,10 +67,10 @@ def __init__(

super().__init__(**kwargs)

self.agent_started = asyncio.Event(loop=self.loop)
self.agent_started_processing = asyncio.Event(loop=self.loop)
self.agent_stopped_processing = asyncio.Event(loop=self.loop)
self.finished = asyncio.Event(loop=self.loop)
self.agent_started = asyncio.Event()
self.agent_started_processing = asyncio.Event()
self.agent_stopped_processing = asyncio.Event()
self.finished = asyncio.Event()

async def on_start(self) -> None:
app = self.app
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
@@ -1,5 +1,5 @@
[tox]
envlist = 3.9,3.8,3.7,3.6,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell
envlist = 3.10,3.9,3.8,3.7,3.6,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell

[testenv]
deps=
Expand All @@ -17,6 +17,7 @@ recreate = False
commands = py.test --random-order --open-files -xvv --cov=faust tests/unit tests/functional tests/integration tests/meticulous/ tests/regression

basepython =
3.10: python3.10
3.9,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.9
3.8,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.8
3.7: python3.7
Expand Down

0 comments on commit ac45ced

Please sign in to comment.