Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python 3.10 #205

Merged
merged 11 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Expand Up @@ -19,7 +19,7 @@ jobs:
# for example if a test fails only when Cython is enabled
fail-fast: false
matrix:
python-version: ["3.6", "3.7", "3.8", "3.9"]
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
use-cython: ["true", "false"]
env:
USE_CYTHON: ${{ matrix.use-cython }}
Expand Down
2 changes: 1 addition & 1 deletion faust/_cython/streams.pyx
Expand Up @@ -84,7 +84,7 @@ cdef class StreamIterator:
event = None

while value is None and event is None:
await sleep(0, loop=self.loop)
await sleep(0)
need_slow_get, channel_value = self._try_get_quick_value()
if need_slow_get:
channel_value = await self.chan_slow_get()
Expand Down
2 changes: 1 addition & 1 deletion faust/agents/agent.py
Expand Up @@ -1098,7 +1098,7 @@ def __init__(
) -> None:
super().__init__(*args, **kwargs)
self.results = {}
self.new_value_processed = asyncio.Condition(loop=self.loop)
self.new_value_processed = asyncio.Condition()
self.original_channel = cast(ChannelT, original_channel)
self.add_sink(self._on_value_processed)
self._stream = self.channel.stream()
Expand Down
3 changes: 1 addition & 2 deletions faust/agents/replies.py
Expand Up @@ -76,8 +76,7 @@ class BarrierState(ReplyPromise):

def __post_init__(self) -> None:
self.pending = set()
loop: asyncio.AbstractEventLoop = self._loop
self._results = asyncio.Queue(maxsize=1000, loop=loop)
self._results = asyncio.Queue(maxsize=1000)

def _verify_correlation_id(self, correlation_id: str) -> None:
pass # barrier does not require a correlation id.
Expand Down
1 change: 0 additions & 1 deletion faust/app/_attached.py
Expand Up @@ -186,7 +186,6 @@ async def commit(self, tp: TP, offset: int) -> None:
await asyncio.wait(
await self.publish_for_tp_offset(tp, offset),
return_when=asyncio.ALL_COMPLETED,
loop=self.app.loop,
)

async def publish_for_tp_offset(
Expand Down
2 changes: 0 additions & 2 deletions faust/app/base.py
Expand Up @@ -1822,14 +1822,12 @@ def FlowControlQueue(
maxsize: Optional[int] = None,
*,
clear_on_resume: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> ThrowableQueue:
"""Like :class:`asyncio.Queue`, but can be suspended/resumed."""
return ThrowableQueue(
maxsize=maxsize,
flow_control=self.flow_control,
clear_on_resume=clear_on_resume,
loop=loop or self.loop,
)

def Worker(self, **kwargs: Any) -> _Worker:
Expand Down
1 change: 0 additions & 1 deletion faust/channels.py
Expand Up @@ -141,7 +141,6 @@ def queue(self) -> ThrowableQueue:
maxsize = self.app.conf.stream_buffer_maxsize
self._queue = self.app.FlowControlQueue(
maxsize=maxsize,
loop=self.loop,
clear_on_resume=True,
)
return self._queue
Expand Down
19 changes: 8 additions & 11 deletions faust/streams.py
Expand Up @@ -134,10 +134,9 @@ def __init__(
self.channel = channel
self.outbox = self.app.FlowControlQueue(
maxsize=self.app.conf.stream_buffer_maxsize,
loop=self.loop,
clear_on_resume=True,
)
self._passive_started = asyncio.Event(loop=self.loop)
self._passive_started = asyncio.Event()
self.join_strategy = join_strategy
self.combined = combined if combined is not None else []
self.concurrency_index = concurrency_index
Expand Down Expand Up @@ -318,8 +317,8 @@ async def take(self, max_: int, within: Seconds) -> AsyncIterable[Sequence[T_co]
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -415,8 +414,8 @@ async def take_with_timestamp(
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -518,8 +517,8 @@ async def noack_take(
buffer_add = buffer.append
event_add = events.append
buffer_size = buffer.__len__
buffer_full = asyncio.Event(loop=self.loop)
buffer_consumed = asyncio.Event(loop=self.loop)
buffer_full = asyncio.Event()
buffer_consumed = asyncio.Event()
timeout = want_seconds(within) if within else None
stream_enable_acks: bool = self.enable_acks

Expand Down Expand Up @@ -691,7 +690,6 @@ def echo(self, *channels: Union[str, ChannelT]) -> StreamT:
async def echoing(value: T) -> T:
await asyncio.wait(
[maybe_forward(value, channel) for channel in _channels],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)
return value
Expand Down Expand Up @@ -997,7 +995,6 @@ def _set_current_event(self, event: Optional[EventT] = None) -> None:

async def _py_aiter(self) -> AsyncIterator[T_co]:
self._finalized = True
loop = self.loop
started_by_aiter = await self.maybe_start()
on_merge = self.on_merge
on_stream_event_out = self._on_stream_event_out
Expand Down Expand Up @@ -1049,7 +1046,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
value: Any = None
# we iterate until on_merge gives value.
while value is None and event is None:
await sleep(0, loop=loop)
await sleep(0)
# get message from channel
# This inlines ThrowableQueue.get for performance:
# We selectively call `await Q.put`/`Q.put_nowait`,
Expand Down
7 changes: 3 additions & 4 deletions faust/tables/manager.py
Expand Up @@ -48,9 +48,9 @@ def __init__(self, app: AppT, **kwargs: Any) -> None:
self._changelog_queue = None
self._channels = {}
self._changelogs = {}
self._tables_finalized = asyncio.Event(loop=self.loop)
self._tables_registered = asyncio.Event(loop=self.loop)
self._recovery_started = asyncio.Event(loop=self.loop)
self._tables_finalized = asyncio.Event()
self._tables_registered = asyncio.Event()
self._recovery_started = asyncio.Event()

self.actives_ready = False
self.standbys_ready = False
Expand Down Expand Up @@ -111,7 +111,6 @@ def changelog_queue(self) -> ThrowableQueue:
if self._changelog_queue is None:
self._changelog_queue = self.app.FlowControlQueue(
maxsize=self.app.conf.stream_buffer_maxsize,
loop=self.loop,
clear_on_resume=True,
)
return self._changelog_queue
Expand Down
4 changes: 2 additions & 2 deletions faust/tables/recovery.py
Expand Up @@ -195,14 +195,14 @@ def __init__(self, app: AppT, tables: TableManagerT, **kwargs: Any) -> None:
def signal_recovery_start(self) -> Event:
"""Event used to signal that recovery has started."""
if self._signal_recovery_start is None:
self._signal_recovery_start = Event(loop=self.loop)
self._signal_recovery_start = Event()
return self._signal_recovery_start

@property
def signal_recovery_end(self) -> Event:
"""Event used to signal that recovery has ended."""
if self._signal_recovery_end is None:
self._signal_recovery_end = Event(loop=self.loop)
self._signal_recovery_end = Event()
return self._signal_recovery_end

async def on_stop(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/conductor.py
Expand Up @@ -294,7 +294,7 @@ async def _subscriber(self) -> None: # pragma: no cover
notify(self._subscription_done)

# Now we wait for changes
ev = self._subscription_changed = asyncio.Event(loop=self.loop)
ev = self._subscription_changed = asyncio.Event()
while not self.should_stop:
# Wait for something to add/remove topics from subscription.
await ev.wait()
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/consumer.py
Expand Up @@ -832,7 +832,7 @@ async def _wait_for_ack(self, timeout: float) -> None:
self._waiting_for_ack = asyncio.Future(loop=self.loop)
try:
# wait for `ack()` to wake us up
await asyncio.wait_for(self._waiting_for_ack, loop=self.loop, timeout=1)
await asyncio.wait_for(self._waiting_for_ack, timeout=1)
except (asyncio.TimeoutError, asyncio.CancelledError): # pragma: no cover
pass
finally:
Expand Down
25 changes: 12 additions & 13 deletions faust/types/app.py
Expand Up @@ -141,7 +141,7 @@ def __init__(
enable_kafka: bool = True,
enable_kafka_producer: Optional[bool] = None,
enable_kafka_consumer: Optional[bool] = None,
enable_sensors: bool = True
enable_sensors: bool = True,
) -> None:
...

Expand Down Expand Up @@ -248,7 +248,7 @@ def discover(
self,
*extra_modules: str,
categories: Iterable[str] = ("a", "b", "c"),
ignore: Iterable[Any] = ("foo", "bar")
ignore: Iterable[Any] = ("foo", "bar"),
) -> None:
...

Expand All @@ -273,7 +273,7 @@ def topic(
maxsize: Optional[int] = None,
allow_empty: bool = False,
has_prefix: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> TopicT:
...

Expand All @@ -285,7 +285,7 @@ def channel(
key_type: _ModelArg = None,
value_type: _ModelArg = None,
maxsize: Optional[int] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> ChannelT:
...

Expand All @@ -300,7 +300,7 @@ def agent(
sink: Iterable[SinkT] = None,
isolated_partitions: bool = False,
use_reply_headers: bool = True,
**kwargs: Any
**kwargs: Any,
) -> Callable[[AgentFun[_T]], AgentT[_T]]:
...

Expand Down Expand Up @@ -329,7 +329,7 @@ def crontab(
*,
timezone: tzinfo = None,
on_leader: bool = False,
traced: bool = True
traced: bool = True,
) -> Callable:
...

Expand All @@ -352,7 +352,7 @@ def Table(
window: Optional[WindowT] = None,
partitions: Optional[int] = None,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -365,7 +365,7 @@ def GlobalTable(
window: Optional[WindowT] = None,
partitions: Optional[int] = None,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -378,7 +378,7 @@ def SetTable(
partitions: Optional[int] = None,
start_manager: bool = False,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -391,7 +391,7 @@ def SetGlobalTable(
partitions: Optional[int] = None,
start_manager: bool = False,
help: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> TableT:
...

Expand All @@ -402,7 +402,7 @@ def page(
*,
base: Type[View] = View,
cors_options: Mapping[str, ResourceOptions] = None,
name: Optional[str] = None
name: Optional[str] = None,
) -> Callable[[PageArg], Type[View]]:
...

Expand All @@ -414,7 +414,7 @@ def table_route(
*,
query_param: Optional[str] = None,
match_info: Optional[str] = None,
exact_key: Optional[str] = None
exact_key: Optional[str] = None,
) -> ViewDecorator:
...

Expand Down Expand Up @@ -479,7 +479,6 @@ def FlowControlQueue(
maxsize: Optional[int] = None,
*,
clear_on_resume: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> ThrowableQueue:
...

Expand Down
2 changes: 1 addition & 1 deletion faust/types/settings/params.py
Expand Up @@ -196,7 +196,7 @@ class Param(Generic[IT, OT], property):
#: Default template.
#: If set the default value will be generated from this format string
#: template.
#: For exmaple the :setting:`canonical_url` setting uses
#: For example the :setting:`canonical_url` setting uses
#: ``default_template='http://{conf.web_host}:{conf.web_port}' to
#: generate a default value from the :setting:`web_host` and
#: :setting:`web_port` settings.
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements.txt
Expand Up @@ -2,7 +2,7 @@ aiohttp>=3.5.2,<4.0
aiohttp_cors>=0.7,<2.0
aiokafka>=0.7.1,<0.8.0
click>=6.7,<8.1
mode-streaming==0.1.0
mode-streaming>=0.2.0
opentracing>=1.3.0,<=2.4.0
terminaltables>=3.1,<4.0
yarl>=1.0,<2.0
Expand Down
5 changes: 1 addition & 4 deletions tests/consistency/test_consistency.py
Expand Up @@ -16,7 +16,7 @@ def __init__(self, num_workers, num_producers, loop):
self.producers = set(range(num_producers))
self._producer_procs = {}
self.loop = loop
self._stop_stresser = asyncio.Event(loop=loop)
self._stop_stresser = asyncio.Event()

@property
def _stopped(self):
Expand Down Expand Up @@ -75,7 +75,6 @@ async def start(self, stopped_at_start=0):
)
await asyncio.wait(
[self._start_worker(worker) for worker in start_workers],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)
asyncio.ensure_future(self._run_stresser(), loop=self.loop)
Expand Down Expand Up @@ -120,14 +119,12 @@ async def _exec_worker(
async def stop_all(self):
await asyncio.wait(
[self._stop_worker(worker) for worker in self._running],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)

async def stop_all_producers(self):
await asyncio.wait(
[self._stop_producer(producer) for producer in self._running_producers],
loop=self.loop,
return_when=asyncio.ALL_COMPLETED,
)

Expand Down
8 changes: 4 additions & 4 deletions tests/functional/agents/helpers.py
Expand Up @@ -67,10 +67,10 @@ def __init__(

super().__init__(**kwargs)

self.agent_started = asyncio.Event(loop=self.loop)
self.agent_started_processing = asyncio.Event(loop=self.loop)
self.agent_stopped_processing = asyncio.Event(loop=self.loop)
self.finished = asyncio.Event(loop=self.loop)
self.agent_started = asyncio.Event()
self.agent_started_processing = asyncio.Event()
self.agent_stopped_processing = asyncio.Event()
self.finished = asyncio.Event()

async def on_start(self) -> None:
app = self.app
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
@@ -1,5 +1,5 @@
[tox]
envlist = 3.9,3.8,3.7,3.6,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell
envlist = 3.10,3.9,3.8,3.7,3.6,flake8,apicheck,configcheck,typecheck,docstyle,bandit,spell

[testenv]
deps=
Expand All @@ -17,6 +17,7 @@ recreate = False
commands = py.test --random-order --open-files -xvv --cov=faust tests/unit tests/functional tests/integration tests/meticulous/ tests/regression

basepython =
3.10: python3.10
3.9,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.9
3.8,flake8,apicheck,linkcheck,configcheck,typecheck,docstyle,bandit,spell: python3.8
3.7: python3.7
Expand Down