Skip to content

Commit

Permalink
chore: flake8 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cbrand committed Mar 24, 2022
1 parent 221689f commit 92d415f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 2 additions & 2 deletions faust/agents/manager.py
Expand Up @@ -125,7 +125,7 @@ async def on_rebalance(self, revoked: Set[TP], newly_assigned: Set[TP]) -> None:

def _collect_agents_for_update(self, tps: Set[TP]) -> Dict[AgentT, Set[TP]]:
by_agent: Dict[AgentT, Set[TP]] = defaultdict(set)
for topic, tps in tp_set_to_map(tps).items():
for topic, tps_of_topic in tp_set_to_map(tps).items():
for agent in self._by_topic[topic]:
by_agent[agent].update(tps)
by_agent[agent].update(tps_of_topic)
return by_agent
14 changes: 12 additions & 2 deletions faust/tables/base.py
Expand Up @@ -4,7 +4,6 @@
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from functools import lru_cache
from heapq import heappop, heappush
from typing import (
Any,
Expand Down Expand Up @@ -169,6 +168,8 @@ def __init__(
self._sensor_on_set = self.app.sensors.on_table_set
self._sensor_on_del = self.app.sensors.on_table_del

self._verified_source_topics_for_partitions: Set[str] = set()

def _serializer_from_type(self, typ: Optional[ModelArg]) -> Optional[CodecArg]:
if typ is bytes:
return "raw"
Expand Down Expand Up @@ -312,8 +313,15 @@ def partition_for_key(self, key: Any) -> Optional[int]:
self._verify_source_topic_partitions(event.message.topic)
return event.message.partition

@lru_cache()
def _verify_source_topic_partitions(self, source_topic: str) -> None:
# This was formerly wrapped in an lru_cache. The linter sees issues with this
# as an instance stays cached.
# This is why we implement a non lru_cached lookup which checks if the
# function has already been executed once on the instance level.

if source_topic in self._verified_source_topics_for_partitions:
return

change_topic = self.changelog_topic_name
source_n = self.app.consumer.topic_partitions(source_topic)
if source_n is not None:
Expand All @@ -330,6 +338,8 @@ def _verify_source_topic_partitions(self, source_topic: str) -> None:
),
)

self._verified_source_topics_for_partitions.add(source_topic)

def _on_changelog_sent(self, fut: FutureMessage) -> None:
# This is what keeps the offset in RocksDB so that at startup
# we know what offsets we already have data for in the database.
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/test_app.py
Expand Up @@ -668,7 +668,7 @@ def assert_config_equivalent(
web_transport="udp://",
web_in_thread=True,
web_cors_options={ # noqa: B006
"http://example.com": ResourceOptions(
"http://example.com": ResourceOptions( # noqa: B008
allow_credentials=True,
expose_headers="*",
allow_headers="*",
Expand Down

0 comments on commit 92d415f

Please sign in to comment.