diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index f2a56ea54..003f0170f 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -533,6 +533,7 @@ def _create_worker_consumer( heartbeat_interval_ms=int(conf.broker_heartbeat_interval * 1000.0), isolation_level=isolation_level, metadata_max_age_ms=conf.consumer_metadata_max_age_ms, + connections_max_idle_ms=conf.consumer_connections_max_idle_ms, # traced_from_parent_span=self.traced_from_parent_span, # start_rebalancing_span=self.start_rebalancing_span, # start_coordinator_span=self.start_coordinator_span, @@ -1103,6 +1104,7 @@ def _settings_default(self) -> Mapping[str, Any]: "request_timeout_ms": int(self.request_timeout * 1000), "api_version": self._api_version, "metadata_max_age_ms": self.app.conf.producer_metadata_max_age_ms, + "connections_max_idle_ms": self.app.conf.producer_connections_max_idle_ms, } def _settings_auth(self) -> Mapping[str, Any]: diff --git a/faust/types/settings/settings.py b/faust/types/settings/settings.py index 07b02d5ac..f2203fff5 100644 --- a/faust/types/settings/settings.py +++ b/faust/types/settings/settings.py @@ -113,6 +113,7 @@ def __init__( consumer_auto_offset_reset: Optional[str] = None, consumer_group_instance_id: Optional[str] = None, consumer_metadata_max_age_ms: Optional[int] = None, + consumer_connections_max_idle_ms: Optional[int] = None, # Topic serialization settings: key_serializer: CodecArg = None, value_serializer: CodecArg = None, @@ -130,6 +131,7 @@ def __init__( producer_request_timeout: Optional[Seconds] = None, producer_threaded: bool = False, producer_metadata_max_age_ms: Optional[int] = None, + producer_connections_max_idle_ms: Optional[int] = None, # RPC settings: reply_create_topic: Optional[bool] = None, reply_expires: Optional[Seconds] = None, @@ -1132,6 +1134,21 @@ def consumer_metadata_max_age_ms(self) -> int: Default: 300000 """ + @sections.Consumer.setting( + params.Int, + version_introduced="0.8.5", + env_name="CONSUMER_CONNECTIONS_MAX_IDLE_MS", + default=9 * 60 * 1000, + ) + def consumer_connections_max_idle_ms(self) -> int: + """Consumer connections max idle milliseconds. + + Close idle connections after the number of milliseconds + specified by this config. + + Default: 540000 (9 minutes). + """ + @sections.Serialization.setting( params.Codec, env_name="APP_KEY_SERIALIZER", @@ -1379,6 +1396,21 @@ def producer_metadata_max_age_ms(self) -> int: """ + @sections.Producer.setting( + params.Int, + version_introduced="0.8.5", + env_name="PRODUCER_CONNECTIONS_MAX_IDLE_MS", + default=9 * 60 * 1000, + ) + def producer_connections_max_idle_ms(self) -> int: + """Producer connections max idle milliseconds. + + Close idle connections after the number of milliseconds + specified by this config. + + Default: 540000 (9 minutes). + """ + @sections.Stream.setting( params.Bool, version_introduced="0.4.7",