Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out common client settings into a shared trait #913

Draft
wants to merge 3 commits into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 1 addition & 98 deletions modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,7 @@ import scala.concurrent.duration._
* Use [[AdminClientSettings#apply]] for the default settings, and
* then apply any desired modifications on top of that instance.
*/
sealed abstract class AdminClientSettings {

/**
* Properties which can be provided when creating a Java `KafkaAdminClient`
* instance. Numerous functions in [[AdminClientSettings]] add properties
* here if the settings are used by the Java `KafkaAdminClient`.
*/
def properties: Map[String, String]

/**
* Returns a new [[AdminClientSettings]] instance with the specified
* bootstrap servers. This is equivalent to setting the following
* property using the [[withProperty]] function.
*
* {{{
* AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
* }}}
*/
def withBootstrapServers(bootstrapServers: String): AdminClientSettings

/**
* Returns a new [[AdminClientSettings]] instance with the specified
* client id. This is equivalent to setting the following property
* using the [[withProperty]] function.
*
* {{{
* AdminClientConfig.CLIENT_ID_CONFIG
* }}}
*/
def withClientId(clientId: String): AdminClientSettings
sealed abstract class AdminClientSettings extends KafkaClientSettings[AdminClientSettings] {

/**
* Returns a new [[AdminClientSettings]] instance with the specified
Expand Down Expand Up @@ -103,18 +74,6 @@ sealed abstract class AdminClientSettings {
*/
def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings

/**
* Returns a new [[AdminClientSettings]] instance with the specified
* request timeout. This is equivalent to setting the following
* property using the [[withProperty]] function, except you can
* specify it with a `FiniteDuration` instead of a `String`.
*
* {{{
* AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG
* }}}
*/
def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings

/**
* Returns a new [[AdminClientSettings]] instance with the specified
* max metadata age. This is equivalent to setting the following
Expand All @@ -138,44 +97,6 @@ sealed abstract class AdminClientSettings {
* }}}
*/
def withRetries(retries: Int): AdminClientSettings

/**
* Includes a property with the specified `key` and `value`.
* The key should be one of the keys in `AdminClientConfig`,
* and the value should be a valid choice for the key.
*/
def withProperty(key: String, value: String): AdminClientSettings

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `AdminClientConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: (String, String)*): AdminClientSettings

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `AdminClientConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: Map[String, String]): AdminClientSettings

/**
* The time to wait for the Java `KafkaAdminClient` to shutdown.<br>
* <br>
* The default value is 20 seconds.
*/
def closeTimeout: FiniteDuration

/**
* Creates a new [[AdminClientSettings]] with the specified [[closeTimeout]].
*/
def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings

/**
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings
}

object AdminClientSettings {
Expand All @@ -184,12 +105,6 @@ object AdminClientSettings {
override val closeTimeout: FiniteDuration
) extends AdminClientSettings {

override def withBootstrapServers(bootstrapServers: String): AdminClientSettings =
withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

override def withClientId(clientId: String): AdminClientSettings =
withProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)

override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings =
withProperty(
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG,
Expand All @@ -215,30 +130,18 @@ object AdminClientSettings {
connectionsMaxIdle.toMillis.toString
)

override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings =
withProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString)

override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings =
withProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAge.toMillis.toString)

override def withRetries(retries: Int): AdminClientSettings =
withProperty(AdminClientConfig.RETRIES_CONFIG, retries.toString)

override def withProperty(key: String, value: String): AdminClientSettings =
copy(properties = properties.updated(key, value))

override def withProperties(properties: (String, String)*): AdminClientSettings =
copy(properties = this.properties ++ properties.toMap)

override def withProperties(properties: Map[String, String]): AdminClientSettings =
copy(properties = this.properties ++ properties)

override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings =
copy(closeTimeout = closeTimeout)

override def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings =
withProperties(credentialsStore.properties)

override def toString: String =
s"AdminClientSettings(closeTimeout = $closeTimeout)"
}
Expand Down
120 changes: 2 additions & 118 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package fs2.kafka

import cats.{Applicative, Show}
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse

Expand Down Expand Up @@ -35,7 +34,8 @@ import scala.concurrent.duration._
* <br>
* Use `ConsumerSettings#apply` to create a new instance.
*/
sealed abstract class ConsumerSettings[F[_], K, V] {
sealed abstract class ConsumerSettings[F[_], K, V]
extends KafkaClientSettings[ConsumerSettings[F, K, V]] {

/**
* The `Deserializer` to use for deserializing record keys.
Expand Down Expand Up @@ -65,24 +65,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V]

/**
* Properties which can be provided when creating a Java `KafkaConsumer`
* instance. Numerous functions in [[ConsumerSettings]] add properties
* here if the settings are used by the Java `KafkaConsumer`.
*/
def properties: Map[String, String]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* bootstrap servers. This is equivalent to setting the following
* property using the [[withProperty]] function.
*
* {{{
* ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
* }}}
*/
def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* auto offset reset. This is equivalent to setting the following
Expand All @@ -95,17 +77,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* client id. This is equivalent to setting the following property
* using the [[withProperty]] function.
*
* {{{
* ConsumerConfig.CLIENT_ID_CONFIG
* }}}
*/
def withClientId(clientId: String): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* group id. This is equivalent to setting the following property
Expand Down Expand Up @@ -202,18 +173,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAutoCommitInterval(autoCommitInterval: FiniteDuration): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* request timeout. This is equivalent to setting the following
* property using the [[withProperty]] function, except you can
* specify it with a `FiniteDuration` instead of a `String`.
*
* {{{
* ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
* }}}
*/
def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* default api timeout. This is equivalent to setting the following
Expand Down Expand Up @@ -250,50 +209,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAllowAutoCreateTopics(allowAutoCreateTopics: Boolean): ConsumerSettings[F, K, V]

/**
* Returns a new [[ConsumerSettings]] instance with the specified
* client rack. This is equivalent to setting the following
* property using the [[withProperty]] function.
*
* {{{
* ConsumerConfig.CLIENT_RACK_CONFIG
* }}}
*/
def withClientRack(clientRack: String): ConsumerSettings[F, K, V]

/**
* Includes a property with the specified `key` and `value`.
* The key should be one of the keys in `ConsumerConfig`,
* and the value should be a valid choice for the key.
*/
def withProperty(key: String, value: String): ConsumerSettings[F, K, V]

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `ConsumerConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V]

/**
* Includes the specified keys and values as properties. The
* keys should be part of the `ConsumerConfig` keys, and
* the values should be valid choices for the keys.
*/
def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V]

/**
* The time to wait for the Java `KafkaConsumer` to shutdown.<br>
* <br>
* The default value is 20 seconds.
*/
def closeTimeout: FiniteDuration

/**
* Creates a new [[ConsumerSettings]] with the specified [[closeTimeout]].
*/
def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[F, K, V]

/**
* The time to wait for offset commits to complete. If an offset commit
* doesn't complete within this time, a [[CommitTimeoutException]] will
Expand Down Expand Up @@ -386,11 +301,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
* instead be set to `2` and not the specified value.
*/
def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V]

/**
* Includes the credentials properties from the provided [[KafkaCredentialStore]]
*/
def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]
}

object ConsumerSettings {
Expand All @@ -410,9 +320,6 @@ object ConsumerSettings {
override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))

override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

override def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V] =
withProperty(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
Expand All @@ -423,9 +330,6 @@ object ConsumerSettings {
}
)

override def withClientId(clientId: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

override def withGroupId(groupId: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)

Expand Down Expand Up @@ -457,12 +361,6 @@ object ConsumerSettings {
autoCommitInterval.toMillis.toString
)

override def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
withProperty(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toMillis.toString
)

override def withDefaultApiTimeout(
defaultApiTimeout: FiniteDuration
): ConsumerSettings[F, K, V] =
Expand All @@ -485,15 +383,6 @@ object ConsumerSettings {
): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, allowAutoCreateTopics.toString)

override def withClientRack(clientRack: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.CLIENT_RACK_CONFIG, clientRack)

override def withProperty(key: String, value: String): ConsumerSettings[F, K, V] =
copy(properties = properties.updated(key, value))

override def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V] =
copy(properties = this.properties ++ properties.toMap)

override def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V] =
copy(properties = this.properties ++ properties)

Expand All @@ -520,11 +409,6 @@ object ConsumerSettings {
override def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] =
copy(maxPrefetchBatches = Math.max(2, maxPrefetchBatches))

override def withCredentials(
credentialsStore: KafkaCredentialStore
): ConsumerSettings[F, K, V] =
withProperties(credentialsStore.properties)

override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"
}
Expand Down
Loading