Skip to content

Commit

Permalink
Revert "KAFKA-13598: enable idempotence producer by default and valid…
Browse files Browse the repository at this point in the history
…ate the configs (#11691)"

This reverts commit 43cbf17.
  • Loading branch information
jeffkbkim committed Mar 31, 2022
1 parent 63b76b8 commit f96b021
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 296 deletions.
Expand Up @@ -194,9 +194,8 @@ public class CommonClientConfigs {
public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
Map<String, Object> parsedValues) {
HashMap<String, Object> rval = new HashMap<>();
Map<String, Object> originalConfig = config.originals();
if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
Expand Down
Expand Up @@ -445,7 +445,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali

// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Expand All @@ -468,8 +468,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
apiVersions,
throttleTimeSensor,
logContext);

short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
Expand Down Expand Up @@ -515,8 +514,15 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {

private TransactionManager configureTransactionState(ProducerConfig config,
LogContext logContext) {

TransactionManager transactionManager = null;

final boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
final boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (userConfiguredTransactions && !userConfiguredIdempotence)
log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
ProducerConfig.TRANSACTIONAL_ID_CONFIG);

if (config.idempotenceEnabled()) {
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
Expand All @@ -537,6 +543,28 @@ private TransactionManager configureTransactionState(ProducerConfig config,
return transactionManager;
}

private static int configureInflightRequests(ProducerConfig config) {
if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}

private static short configureAcks(ProducerConfig config, Logger log) {
boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));

if (config.idempotenceEnabled()) {
if (!userConfiguredAcks)
log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
else if (acks != -1)
throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}
return acks;
}

/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*
Expand Down
Expand Up @@ -207,8 +207,6 @@ public class ProducerConfig extends AbstractConfig {
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;

/** <code>retries</code> */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
Expand Down Expand Up @@ -271,8 +269,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE
+ " (with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5 "
+ "(with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+ ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
+ "values are set, a <code>ConfigException</code> will be thrown.";

Expand Down Expand Up @@ -440,8 +438,9 @@ public class ProducerConfig extends AbstractConfig {
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
postProcessAndValidateIdempotenceConfigs(refinedConfigs);
maybeOverrideEnableIdempotence(refinedConfigs);
maybeOverrideClientId(refinedConfigs);
maybeOverrideAcksAndRetries(refinedConfigs);
return refinedConfigs;
}

Expand All @@ -457,30 +456,33 @@ private void maybeOverrideClientId(final Map<String, Object> configs) {
configs.put(CLIENT_ID_CONFIG, refinedClientId);
}

private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object> configs) {
final Map<String, Object> originalConfigs = this.originals();
private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);

if (userConfiguredTransactions && !userConfiguredIdempotence) {
configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
}
}

private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
configs.put(ACKS_CONFIG, acksStr);

// For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
// For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
if (idempotenceEnabled()) {
boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
if (this.getInt(RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
}
configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);

boolean userConfiguredAcks = originalConfigs.containsKey(ACKS_CONFIG);
boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
final short acks = Short.valueOf(acksStr);
if (userConfiguredAcks && acks != (short) -1) {
throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}

boolean userConfiguredInflightRequests = originalConfigs.containsKey(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
if (userConfiguredInflightRequests && MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE < this.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
configs.put(ACKS_CONFIG, "-1");
}
}

Expand Down Expand Up @@ -512,12 +514,13 @@ public ProducerConfig(Map<String, Object> props) {
}

boolean idempotenceEnabled() {
boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);

return idempotenceEnabled;
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
return userConfiguredTransactions || idempotenceEnabled;
}

ProducerConfig(Map<?, ?> props, boolean doLog) {
Expand Down

0 comments on commit f96b021

Please sign in to comment.