Skip to content

Commit

Permalink
MINOR: Cleanup handling of mixed transactional/idempotent records (#6172
Browse files Browse the repository at this point in the history
)

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>, Colin Patrick McCabe <colin@cmccabe.xyz>
  • Loading branch information
hachikuji committed Jan 22, 2019
1 parent df96b7a commit 59cdb69
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public String toString() {
// put in the purgatory (due to client throttling, it can take a while before the response is sent).
// Care should be taken in methods that use this field.
private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
private boolean transactional = false;
private boolean idempotent = false;
private boolean hasTransactionalRecords = false;
private boolean hasIdempotentRecords = false;

private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
super(version);
Expand Down Expand Up @@ -167,8 +167,8 @@ private void validateRecords(short version, MemoryRecords records) {
if (iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain exactly one record batch");
idempotent = entry.hasProducerId();
transactional = entry.isTransactional();
hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
}

// Note that we do not do similar validation for older versions to ensure compatibility with
Expand Down Expand Up @@ -269,12 +269,12 @@ public String transactionalId() {
return transactionalId;
}

public boolean isTransactional() {
return transactional;
public boolean hasTransactionalRecords() {
return hasTransactionalRecords;
}

public boolean isIdempotent() {
return idempotent;
public boolean hasIdempotentRecords() {
return hasIdempotentRecords;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
}
}, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));

Expand Down Expand Up @@ -514,7 +514,7 @@ public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
}
}, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));

Expand Down Expand Up @@ -546,7 +546,7 @@ public void testUnsupportedVersionInProduceRequest() throws Exception {
client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
return body instanceof ProduceRequest && ((ProduceRequest) body).hasIdempotentRecords();
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.Test;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -49,38 +51,84 @@ public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exce
10,
Collections.singletonMap(
new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.isTransactional());
assertTrue(request.hasTransactionalRecords());
}

@Test
public void shouldNotBeFlaggedAsTransactionalWhenNoRecords() throws Exception {
final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
assertFalse(request.isTransactional());
assertFalse(request.hasTransactionalRecords());
}

@Test
public void shouldNotBeFlaggedAsIdempotentWhenRecordsNotIdempotent() throws Exception {
final ProduceRequest request = createNonIdempotentNonTransactionalRecords();
assertFalse(request.isTransactional());
assertFalse(request.hasTransactionalRecords());
}

@Test
public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1,
CompressionType.NONE,
1L,
(short) 1,
1,
1,
simpleRecord);
CompressionType.NONE,
1L,
(short) 1,
1,
1,
simpleRecord);

final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
(short) -1,
10,
Collections.singletonMap(
new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.isIdempotent());
(short) -1,
10,
Collections.singletonMap(
new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.hasIdempotentRecords());
}

@Test
public void testMixedTransactionalData() {
final long producerId = 15L;
final short producerEpoch = 5;
final int sequence = 10;
final String transactionalId = "txnlId";

final MemoryRecords nonTxnRecords = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("foo".getBytes()));
final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
producerEpoch, sequence, new SimpleRecord("bar".getBytes()));

final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);

final ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) -1, 5000,
recordsByPartition, transactionalId);

final ProduceRequest request = builder.build();
assertTrue(request.hasTransactionalRecords());
assertTrue(request.hasIdempotentRecords());
}

@Test
public void testMixedIdempotentData() {
final long producerId = 15L;
final short producerEpoch = 5;
final int sequence = 10;

final MemoryRecords nonTxnRecords = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("foo".getBytes()));
final MemoryRecords txnRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId,
producerEpoch, sequence, new SimpleRecord("bar".getBytes()));

final Map<TopicPartition, MemoryRecords> recordsByPartition = new LinkedHashMap<>();
recordsByPartition.put(new TopicPartition("foo", 0), txnRecords);
recordsByPartition.put(new TopicPartition("foo", 1), nonTxnRecords);

final ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) -1, 5000,
recordsByPartition, null);

final ProduceRequest request = builder.build();
assertFalse(request.hasTransactionalRecords());
assertTrue(request.hasIdempotentRecords());
}

private ProduceRequest createNonIdempotentNonTransactionalRecords() {
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,16 @@ class KafkaApis(val requestChannel: RequestChannel,
produceRequest.getErrorResponse(requestThrottleMs, error.exception))
}

if (produceRequest.isTransactional) {
if (!authorize(request.session, Write, new Resource(TransactionalId, produceRequest.transactionalId))) {
if (produceRequest.hasTransactionalRecords) {
val isAuthorizedTransactional = produceRequest.transactionalId != null &&
authorize(request.session, Write, Resource(TransactionalId, produceRequest.transactionalId))
if (!isAuthorizedTransactional) {
sendErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
return
}
// Note that authorization to a transactionalId implies ProducerId authorization

} else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
} else if (produceRequest.hasIdempotentRecords && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
sendErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
return
}
Expand Down

0 comments on commit 59cdb69

Please sign in to comment.