
Commit 625da65

dlg99 authored and nicoloboschi committed
KCA to use index (if available) instead of sequence and to handle batched messages' non-unique sequenceIds (apache#16098)
(cherry picked from commit a18c01d)
1 parent 8281aca commit 625da65

3 files changed

Lines changed: 260 additions & 4 deletions
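The problem this commit fixes is easiest to see in isolation: all messages in one Pulsar batch share the same (ledgerId, entryId), so a sequence id derived only from those two values, as in the FunctionCommon.getSequenceId() scheme the sink previously consumed via getRecordSequence(), collides for every message in the batch. A minimal standalone sketch (hypothetical demo class, not part of this commit):

```java
public class SequenceIdCollisionDemo {
    // Mirrors the FunctionCommon.getSequenceId() layout referenced in the diff:
    // ledger id in the high bits, entry id in the low 28 bits, batch index ignored.
    static long sequenceId(long ledgerId, long entryId) {
        return (ledgerId << 28) | entryId;
    }

    public static void main(String[] args) {
        long ledgerId = 5, entryId = 17;
        // Three messages from the same batch: batch indexes 0, 1, 2.
        for (int batchIdx = 0; batchIdx < 3; batchIdx++) {
            // batchIdx never enters the computation, so all three values are identical.
            System.out.println("batchIdx=" + batchIdx + " -> " + sequenceId(ledgerId, entryId));
        }
    }
}
```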


pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java

Lines changed: 66 additions & 3 deletions
@@ -48,9 +48,13 @@
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
@@ -93,6 +97,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             CacheBuilder.newBuilder().maximumSize(1000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
 
+    private int maxBatchBitsForOffset = 12;
+    private boolean useIndexAsOffset = true;
+
     @Override
     public void write(Record<GenericObject> sourceRecord) {
         if (log.isDebugEnabled()) {
@@ -149,6 +156,11 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
         unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
         sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
 
+        useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
+        maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
+        Preconditions.checkArgument(maxBatchBitsForOffset <= 20,
+                "Cannot use more than 20 bits for maxBatchBitsForOffset");
+
     String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass();
     kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
 
@@ -242,6 +254,58 @@ private void ackUntil(Record<GenericObject> lastNotFlushed, java.util.function.C
         }
     }
 
+    private long getMessageOffset(Record<GenericObject> sourceRecord) {
+
+        if (sourceRecord.getMessage().isPresent()) {
+            // Use index added by org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor if present.
+            // Requires enableExposingBrokerEntryMetadataToClient=true on brokers.
+            if (useIndexAsOffset && sourceRecord.getMessage().get().hasIndex()) {
+                return sourceRecord.getMessage().get()
+                        .getIndex().orElse(-1L);
+            }
+
+            MessageId messageId = sourceRecord.getMessage().get().getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
+                    ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+                    : messageId);
+
+            // sourceRecord.getRecordSequence() is not unique
+            // for the messages from the same batch.
+            // Special case for FunctionCommon.getSequenceId()
+            if (maxBatchBitsForOffset > 0 && msgId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+                long ledgerId = batchMsgId.getLedgerId();
+                long entryId = batchMsgId.getEntryId();
+
+                if (entryId > (1 << (28 - maxBatchBitsForOffset))) {
+                    log.error("EntryId of the message {} over max, chance of duplicate offsets", entryId);
+                }
+
+                int batchIdx = batchMsgId.getBatchIndex();
+
+                if (batchIdx < 0) {
+                    // Should not happen unless data corruption
+                    log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", batchIdx);
+                    batchIdx = 0;
+                }
+                if (batchIdx > (1 << maxBatchBitsForOffset)) {
+                    log.error("BatchIdx of the message {} over max, chance of duplicate offsets", batchIdx);
+                }
+                // Combine entry id and batchIdx
+                entryId = (entryId << maxBatchBitsForOffset) | batchIdx;
+
+                // The same as FunctionCommon.getSequenceId():
+                // Combine ledger id and entry id to form offset
+                // Use less than 32 bits to represent entry id since it will get
+                // rolled over way before overflowing the max int range
+                long offset = (ledgerId << 28) | entryId;
+                return offset;
+            }
+        }
+        return sourceRecord.getRecordSequence()
+                .orElse(-1L);
+    }
+
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -284,8 +348,7 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
             value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
         }
 
-        long offset = sourceRecord.getRecordSequence()
-                .orElse(-1L);
+        long offset = getMessageOffset(sourceRecord);
         if (offset < 0) {
             log.error("Message without sequenceId. Key: {} Value: {}", key, value);
             throw new IllegalStateException("Message without sequenceId");
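For reference, the packing performed by getMessageOffset() above can be exercised standalone. This sketch (hypothetical class name; the real method first prefers the broker-provided index and falls back to getRecordSequence() for non-batch ids) reproduces the layout with the default maxBatchBitsForOffset = 12: the low 12 bits carry the batch index, the next 16 bits the entry id, and the remaining high bits the ledger id.

```java
public class OffsetPackingDemo {
    static long packOffset(long ledgerId, long entryId, int batchIdx, int batchBits) {
        long entryWithBatch = (entryId << batchBits) | batchIdx; // combine entry id and batch index
        return (ledgerId << 28) | entryWithBatch;                // same 28-bit split as FunctionCommon
    }

    public static void main(String[] args) {
        // Matches the assertion in KafkaConnectSinkTest below: (3 << 12) | 2 == 12290.
        System.out.println(packOffset(0, 3, 2, 12)); // 12290
        // Distinct batch indexes now yield distinct offsets.
        System.out.println(packOffset(0, 3, 3, 12)); // 12291
    }
}
```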

pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java

Lines changed: 14 additions & 0 deletions
@@ -74,6 +74,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
         help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+        defaultValue = "true",
+        help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                + "Requires AppendIndexMetadataInterceptor and "
+                + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+    private boolean useIndexAsOffset = true;
+
+    @FieldDoc(
+        defaultValue = "12",
+        help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+                + "0 to disable this behavior (Messages from the same batch will have the same "
+                + "offset which can affect some connectors.)")
+    private int maxBatchBitsForOffset = 12;
+
     @FieldDoc(
         defaultValue = "false",
         help = "Some connectors cannot handle pulsar topic names like persistent://a/b/topic"
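A minimal sketch of wiring the two new options, modeled on how the tests below populate the config map (the class name is hypothetical, and a real setup also needs the connector class and its properties via kafkaConnectorSinkClass and kafkaConnectorConfigProperties):

```java
import java.util.HashMap;
import java.util.Map;

public class SinkConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("useIndexAsOffset", "true");    // prefer the broker-provided index when exposed
        props.put("maxBatchBitsForOffset", "12"); // low 12 bits of the offset hold the batch index
        System.out.println(props);
        // sink.open(props, sinkContext); // as done in KafkaConnectSinkTest below
    }
}
```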

pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java

Lines changed: 180 additions & 1 deletion
@@ -46,6 +46,7 @@
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -75,9 +76,12 @@
 import java.nio.file.Paths;
 import java.util.AbstractMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,6 +95,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -929,11 +934,16 @@ public void wrongKeyValueSchemaTest() throws Exception {
 
     @Test
     public void offsetTest() throws Exception {
+        props.put("useIndexAsOffset", "true");
+        props.put("maxBatchBitsForOffset", "12");
+
+        final AtomicLong ledgerId = new AtomicLong(0L);
         final AtomicLong entryId = new AtomicLong(0L);
         final GenericRecord rec = getGenericRecord("value", Schema.STRING);
         Message msg = mock(MessageImpl.class);
         when(msg.getValue()).thenReturn(rec);
-        when(msg.getMessageId()).then(x -> new MessageIdImpl(0, entryId.getAndIncrement(), 0));
+        when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+        when(msg.hasIndex()).thenReturn(false);
 
         final String topicName = "testTopic";
         final int partition = 1;
@@ -960,6 +970,7 @@ public void offsetTest() throws Exception {
         // offset is 0 for the first written record
         assertEquals(0, sink.currentOffset(topicName, partition));
 
+        entryId.set(1);
         sink.write(record);
         sink.flush();
         // offset is 1 for the second written record
@@ -977,11 +988,179 @@
         // offset is 1 after reopening the producer
         assertEquals(1, sink.currentOffset(topicName, partition));
 
+        entryId.set(2);
         sink.write(record);
         sink.flush();
         // offset is 2 for the next written record
         assertEquals(2, sink.currentOffset(topicName, partition));
 
+
+        // use index
+        entryId.set(999);
+        when(msg.hasIndex()).thenReturn(true);
+        when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+        sink.write(record);
+        sink.flush();
+        // offset is 777 for the next written record according to index
+        assertEquals(sink.currentOffset(topicName, partition), 777);
+
+        final AtomicInteger batchIdx = new AtomicInteger(2);
+
+        entryId.set(3);
+        when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+        when(msg.hasIndex()).thenReturn(false);
+        sink.write(record);
+        sink.flush();
+        // offset is derived from the batch message id and includes the batch index:
+        // (3 << 12) | 2
+        assertEquals(sink.currentOffset(topicName, partition), 12290);
+
+        // batch too large
+        batchIdx.set(Integer.MAX_VALUE);
+        sink.write(record);
+        sink.flush();
+        assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+        // batch too large, entryId changed,
+        // offset stays the same
+        entryId.incrementAndGet();
+        sink.write(record);
+        sink.flush();
+        assertEquals(sink.currentOffset(topicName, partition), 2147483647L);
+
+        // max usable bits for ledger: 64 - 28 used for entry + batch
+        long lastLedger = 1L << (64 - 28);
+        // max usable bits for entry: 28 - 12 used for batch
+        long lastEntry = 1L << (28 - 12);
+        ledgerId.set(lastLedger);
+        entryId.set(lastEntry);
+        Set<Long> seenOffsets = new HashSet<>(4096);
+        // offsets are unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            assertFalse(seenOffsets.contains(offset));
+            seenOffsets.add(offset);
+        }
+
+        ledgerId.set(0);
+        entryId.set(0);
+        seenOffsets.clear();
+        // offsets are unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            assertFalse(seenOffsets.contains(offset));
+            seenOffsets.add(offset);
+        }
+
+        sink.close();
+    }
+
+    @Test
+    public void offsetNoIndexNoBatchTest() throws Exception {
+        props.put("useIndexAsOffset", "false");
+        props.put("maxBatchBitsForOffset", "0");
+
+        final AtomicLong ledgerId = new AtomicLong(0L);
+        final AtomicLong entryId = new AtomicLong(0L);
+        final GenericRecord rec = getGenericRecord("value", Schema.STRING);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        when(msg.getMessageId()).then(x -> new MessageIdImpl(ledgerId.get(), entryId.get(), 0));
+        when(msg.hasIndex()).thenReturn(false);
+
+        final String topicName = "testTopic";
+        final int partition = 1;
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName(topicName)
+                .partition(partition)
+                .message(msg)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .schema(Schema.STRING)
+                .build();
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        when(context.getSubscriptionType()).thenReturn(SubscriptionType.Exclusive);
+        sink.open(props, context);
+
+        // offset is -1 before any data is written (aka no offset)
+        assertEquals(sink.currentOffset(topicName, partition), -1L);
+
+        sink.write(record);
+        sink.flush();
+
+        // offset is 0 for the first written record
+        assertEquals(sink.currentOffset(topicName, partition), 0);
+
+        entryId.set(1);
+        sink.write(record);
+        sink.flush();
+        // offset is 1 for the second written record
+        assertEquals(sink.currentOffset(topicName, partition), 1);
+
+        sink.close();
+
+        // close the producer, open again
+        sink = new KafkaConnectSink();
+        when(context.getPulsarClient()).thenReturn(PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build());
+        sink.open(props, context);
+
+        // offset is 1 after reopening the producer
+        assertEquals(sink.currentOffset(topicName, partition), 1);
+
+        entryId.set(2);
+        sink.write(record);
+        sink.flush();
+        // offset is 2 for the next written record
+        assertEquals(sink.currentOffset(topicName, partition), 2);
+
+        // use of index is disabled
+        entryId.set(999);
+        when(msg.hasIndex()).thenReturn(true);
+        when(msg.getIndex()).thenReturn(Optional.of(777L));
+
+        sink.write(record);
+        sink.flush();
+        // offset is 999 for the next written record, index is disabled
+        assertEquals(sink.currentOffset(topicName, partition), 999);
+
+        final AtomicInteger batchIdx = new AtomicInteger(2);
+
+        entryId.set(3);
+        when(msg.getMessageId()).then(x -> new BatchMessageIdImpl(0, entryId.get(), 0, batchIdx.get()));
+        when(msg.hasIndex()).thenReturn(false);
+        sink.write(record);
+        sink.flush();
+        // offset does not include the batch index - it is disabled
+        assertEquals(sink.currentOffset(topicName, partition), 3);
+
+        // max usable bits for ledger: 64 - 28 used for entry + batch
+        long lastLedger = 1L << (64 - 28);
+        // max usable bits for entry: 28 - 12 used for batch
+        long lastEntry = 1L << (28 - 12);
+        ledgerId.set(lastLedger);
+        entryId.set(lastEntry);
+        Set<Long> seenOffsets = new HashSet<>(4096);
+        // offsets are not unique
+        for (int i = 0; i < 4096; i++) {
+            batchIdx.set(i);
+            sink.write(record);
+            sink.flush();
+            long offset = sink.currentOffset(topicName, partition);
+            seenOffsets.add(offset);
+        }
+        assertEquals(seenOffsets.size(), 1);
+
         sink.close();
     }
 
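The two tests end with mirrored loops over 4096 batch indexes: offsetTest asserts every offset is unique, while offsetNoIndexNoBatchTest asserts they all collapse to one value. A compact standalone comparison (hypothetical demo class; the maxBatchBitsForOffset = 0 path is simplified here to the plain ledger/entry sequence id it falls back to):

```java
import java.util.HashSet;
import java.util.Set;

public class BatchBitsComparison {
    static long pack(long ledgerId, long entryId, int batchIdx, int batchBits) {
        // With batch bits disabled, the batch index is dropped entirely.
        long entry = batchBits > 0 ? (entryId << batchBits) | batchIdx : entryId;
        return (ledgerId << 28) | entry;
    }

    static int distinctOffsets(int batchBits) {
        Set<Long> seen = new HashSet<>();
        for (int batchIdx = 0; batchIdx < 4096; batchIdx++) {
            seen.add(pack(0, 3, batchIdx, batchBits));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctOffsets(12)); // 4096 distinct offsets, as offsetTest asserts
        System.out.println(distinctOffsets(0));  // 1 shared offset, as offsetNoIndexNoBatchTest asserts
    }
}
```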