Skip to content

Commit

Permalink
KAFKA-5259; TransactionalId auth implies ProducerId auth
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #3075 from hachikuji/KAFKA-5259-FIXED
  • Loading branch information
hachikuji committed May 24, 2017
1 parent 8820093 commit 38f6cae
Show file tree
Hide file tree
Showing 42 changed files with 1,280 additions and 833 deletions.
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;

Expand All @@ -31,7 +32,7 @@ public class ClientResponse {
private final long receivedTimeMs;
private final long latencyMs;
private final boolean disconnected;
private final RuntimeException versionMismatch;
private final UnsupportedVersionException versionMismatch;
private final AbstractResponse responseBody;

/**
Expand All @@ -51,7 +52,7 @@ public ClientResponse(RequestHeader requestHeader,
long createdTimeMs,
long receivedTimeMs,
boolean disconnected,
RuntimeException versionMismatch,
UnsupportedVersionException versionMismatch,
AbstractResponse responseBody) {
this.requestHeader = requestHeader;
this.callback = callback;
Expand All @@ -71,7 +72,7 @@ public boolean wasDisconnected() {
return disconnected;
}

public RuntimeException versionMismatch() {
public UnsupportedVersionException versionMismatch() {
return versionMismatch;
}

Expand Down
Expand Up @@ -83,7 +83,12 @@ public enum AclOperation {
/**
* ALTER_CONFIGS operation.
*/
ALTER_CONFIGS((byte) 11);
ALTER_CONFIGS((byte) 11),

/**
* IDEMPOTENT_WRITE operation.
*/
IDEMPOTENT_WRITE((byte) 12);

private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();

Expand Down
Expand Up @@ -416,7 +416,7 @@ private void resetOffsets(final Set<TopicPartition> partitions) {
}
// we might lose the assignment while fetching the offset, so check it is still active
if (subscriptions.isAssigned(partition)) {
log.debug("Resetting offset for partition {} to {} offset.", partition, offsetData.offset);
log.debug("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
this.subscriptions.seek(partition, offsetData.offset);
}
}
Expand Down
Expand Up @@ -51,7 +51,6 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
Expand Down Expand Up @@ -607,7 +606,9 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
ensureProperTransactionalState();
if (transactionManager != null)
ensureProperTransactionalState();

TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
Expand Down Expand Up @@ -642,9 +643,9 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager);
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null)
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
Expand Down Expand Up @@ -690,27 +691,17 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
}

private void ensureProperTransactionalState() {
if (transactionManager == null)
return;

if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");

if (transactionManager.isFenced())
throw Errors.INVALID_PRODUCER_EPOCH.exception();
throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
"when transactions are enabled.");

if (transactionManager.isInErrorState()) {
String errorMessage =
"Cannot perform send because at least one previous transactional or idempotent request has failed with errors.";
Exception lastError = transactionManager.lastError();
if (lastError != null)
throw new KafkaException(errorMessage, lastError);
else
throw new KafkaException(errorMessage);
throw new KafkaException("Cannot perform send because at least one previous transactional or " +
"idempotent request has failed with errors.", lastError);
}
if (transactionManager.isCompletingTransaction())
throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");

}

private void setReadOnly(Headers headers) {
Expand Down Expand Up @@ -1013,14 +1004,11 @@ private static class InterceptorCallback<K, V> implements Callback {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
private final TopicPartition tp;
private final TransactionManager transactionManager;

public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors,
TopicPartition tp, TransactionManager transactionManager) {
private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
this.userCallback = userCallback;
this.interceptors = interceptors;
this.tp = tp;
this.transactionManager = transactionManager;
}

public void onCompletion(RecordMetadata metadata, Exception exception) {
Expand All @@ -1034,9 +1022,6 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);

if (exception != null && transactionManager != null)
transactionManager.setError(exception);
}
}
}

0 comments on commit 38f6cae

Please sign in to comment.