Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ def _create_worker_consumer(
rebalance_timeout_ms=int(rebalance_timeout * 1000.0),
heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0),
isolation_level=isolation_level,
metadata_max_age_ms=int(conf.broker_metadata_max_age * 1000.0),
connections_max_idle_ms=int(conf.broker_connections_max_idle * 1000.0),
# traced_from_parent_span=self.traced_from_parent_span,
# start_rebalancing_span=self.start_rebalancing_span,
# start_coordinator_span=self.start_coordinator_span,
Expand Down Expand Up @@ -1101,6 +1103,8 @@ def _settings_default(self) -> Mapping[str, Any]:
"partitioner": self.partitioner,
"request_timeout_ms": int(self.request_timeout * 1000),
"api_version": self._api_version,
"metadata_max_age_ms": self.metadata_max_age_ms,
"connections_max_idle_ms": self.connections_max_idle_ms,
}

def _settings_auth(self) -> Mapping[str, Any]:
Expand Down
2 changes: 2 additions & 0 deletions faust/transport/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ def __init__(
self.ssl_context = conf.ssl_context
self.credentials = conf.broker_credentials
self.partitioner = conf.producer_partitioner
self.metadata_max_age_ms = int(conf.broker_metadata_max_age * 1000.0)
self.connections_max_idle_ms = int(conf.broker_connections_max_idle * 1000.0)
api_version = self._api_version = conf.producer_api_version
assert api_version is not None
super().__init__(loop=loop or self.transport.loop, **kwargs)
Expand Down
26 changes: 26 additions & 0 deletions faust/types/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def __init__(
broker_rebalance_timeout: Optional[Seconds] = None,
broker_request_timeout: Optional[Seconds] = None,
broker_session_timeout: Optional[Seconds] = None,
broker_metadata_max_age: Optional[Seconds] = None,
broker_connections_max_idle: Optional[Seconds] = None,
ssl_context: ssl.SSLContext = None,
# Consumer settings:
consumer_api_version: Optional[str] = None,
Expand Down Expand Up @@ -1036,6 +1038,30 @@ def broker_session_timeout(self) -> float:
:setting:`broker_request_timeout`.
"""

@sections.Broker.setting(
params.Seconds,
env_name="BROKER_METADATA_MAX_AGE",
default=300.0,
)
def broker_metadata_max_age(self) -> float:
"""Broker metadata max age.

The period of time in seconds after which we force a refresh of metadata even
if we haven't seen any partition leadership changes to proactively discover any new
brokers or partitions.
"""

@sections.Broker.setting(
params.Seconds,
env_name="BROKER_CONNECTIONS_MAX_IDLE",
default=540.0,
)
def broker_connections_max_idle(self) -> float:
"""Broker connections max idle.

Close idle connections after the number of seconds specified by this config.
"""

@sections.Common.setting(
params.SSLContext,
default=None,
Expand Down