Skip to content

Commit

Permalink
Added connections_max_idle_ms for consumer and producer (#281)
Browse files Browse the repository at this point in the history
Added connections_max_idle_ms for both aiokafka Consumer
and Producer. This is needed in order to comply with
recommended configs for use with Azure Event Hubs.

Co-authored-by: Magnus Zotterman <magnus.zotterman@zenseact.com>
  • Loading branch information
magzot and Magnus Zotterman committed Mar 8, 2022
1 parent 3b7f079 commit 84302f5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
2 changes: 2 additions & 0 deletions faust/transport/drivers/aiokafka.py
Expand Up @@ -533,6 +533,7 @@ def _create_worker_consumer(
heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0),
isolation_level=isolation_level,
metadata_max_age_ms=conf.consumer_metadata_max_age_ms,
connections_max_idle_ms=conf.consumer_connections_max_idle_ms,
# traced_from_parent_span=self.traced_from_parent_span,
# start_rebalancing_span=self.start_rebalancing_span,
# start_coordinator_span=self.start_coordinator_span,
Expand Down Expand Up @@ -1103,6 +1104,7 @@ def _settings_default(self) -> Mapping[str, Any]:
"request_timeout_ms": int(self.request_timeout * 1000),
"api_version": self._api_version,
"metadata_max_age_ms": self.app.conf.producer_metadata_max_age_ms,
"connections_max_idle_ms": self.app.conf.producer_connections_max_idle_ms,
}

def _settings_auth(self) -> Mapping[str, Any]:
Expand Down
32 changes: 32 additions & 0 deletions faust/types/settings/settings.py
Expand Up @@ -113,6 +113,7 @@ def __init__(
consumer_auto_offset_reset: Optional[str] = None,
consumer_group_instance_id: Optional[str] = None,
consumer_metadata_max_age_ms: Optional[int] = None,
consumer_connections_max_idle_ms: Optional[int] = None,
# Topic serialization settings:
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
Expand All @@ -130,6 +131,7 @@ def __init__(
producer_request_timeout: Optional[Seconds] = None,
producer_threaded: bool = False,
producer_metadata_max_age_ms: Optional[int] = None,
producer_connections_max_idle_ms: Optional[int] = None,
# RPC settings:
reply_create_topic: Optional[bool] = None,
reply_expires: Optional[Seconds] = None,
Expand Down Expand Up @@ -1132,6 +1134,21 @@ def consumer_metadata_max_age_ms(self) -> int:
Default: 300000
"""

@sections.Consumer.setting(
params.Int,
version_introduced="0.8.5",
env_name="CONSUMER_CONNECTIONS_MAX_IDLE_MS",
default=9 * 60 * 1000,
)
def consumer_connections_max_idle_ms(self) -> int:
"""Consumer connections max idle milliseconds.
Close idle connections after the number of milliseconds
specified by this config.
Default: 540000 (9 minutes).
"""

@sections.Serialization.setting(
params.Codec,
env_name="APP_KEY_SERIALIZER",
Expand Down Expand Up @@ -1379,6 +1396,21 @@ def producer_metadata_max_age_ms(self) -> int:
"""

@sections.Producer.setting(
params.Int,
version_introduced="0.8.5",
env_name="PRODUCER_CONNECTIONS_MAX_IDLE_MS",
default=9 * 60 * 1000,
)
def producer_connections_max_idle_ms(self) -> int:
"""Producer connections max idle milliseconds.
Close idle connections after the number of milliseconds
specified by this config.
Default: 540000 (9 minutes).
"""

@sections.Stream.setting(
params.Bool,
version_introduced="0.4.7",
Expand Down

0 comments on commit 84302f5

Please sign in to comment.