Compression must be applied during deferred schema preparation when enableBatching is enabled #9396

Merged
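For context, a condensed sketch of the scenario this change addresses, drawn from the new test further down (the topic name, the MyBean schema class, and the producer settings all come from that test; the sketch is an illustration, not part of the diff): the producer is created without a schema but with LZ4 compression, and the schema is supplied only at send time, so schema registration is deferred. The fix ensures the resumed send still applies the configured compression.

// Illustrative sketch only; names are taken from the new test below.
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic("persistent://my-property/my-ns/deferredSchemaCompressed")
        .enableBatching(enableBatching)           // exercised with both true and false
        .compressionType(CompressionType.LZ4)     // compression configured up front
        .create();                                // no schema configured at this point

MyBean payload = new MyBean();
payload.setField("some value");

// Supplying the schema only at send time forces the producer to register the
// schema first and then resume the send; that resumed path must still
// compress the payload.
producer.newMessage(Schema.AVRO(MyBean.class))
        .value(payload)
        .send();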
@@ -70,12 +70,15 @@
import java.util.stream.Collectors;

import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -3854,4 +3857,49 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
Assert.assertEquals(size, 0);
});
}


@Data
@EqualsAndHashCode
public static class MyBean {
private String field;
}

@DataProvider(name = "enableBatching")
public static Object[] isEnableBatching() {
return new Object[]{false, true};
}

@Test(dataProvider = "enableBatching")
public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {
log.info("-- Starting {} test --", methodName);

final String topic = "persistent://my-property/my-ns/deferredSchemaCompressed";
Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionName("testsub")
.subscribe();

// initially we are not setting a Schema in the producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.enableBatching(enableBatching)
.compressionType(CompressionType.LZ4)
.create();
MyBean payload = new MyBean();
payload.setField("aaaaaaaaaaaaaaaaaaaaaaaaa");

// now we send with a schema, while compression is enabled (and batching may be, depending on the data provider)
// the producer will have to set up the schema first and then resume the send
producer.newMessage(Schema.AVRO(MyBean.class)).value(payload).send();
producer.close();

GenericRecord res = consumer.receive().getValue();
consumer.close();
for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
log.info("field {} {}", f.getName(), res.getField(f));
assertEquals("field", f.getName());
assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
}
assertEquals(1, res.getFields().size());
}
}
@@ -365,6 +365,17 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
}
}

/**
* Compress the payload if compression is configured.
* @param payload the original payload; this buffer is released by this method
* @return a new buffer containing the (possibly compressed) payload
*/
private ByteBuf applyCompression(ByteBuf payload) {
ByteBuf compressedPayload = compressor.encode(payload);
payload.release();
return compressedPayload;
}

public void sendAsync(Message<?> message, SendCallback callback) {
checkArgument(message instanceof MessageImpl);

@@ -383,11 +394,12 @@ public void sendAsync(Message<?> message, SendCallback callback) {

// If compression is enabled, the payload is compressed here; otherwise the same buffer is used as-is
ByteBuf compressedPayload = payload;
boolean compressed = false;
// Batch will be compressed when closed
// If a message has a delayed delivery time, we'll always send it individually
if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
compressedPayload = compressor.encode(payload);
payload.release();
compressedPayload = applyCompression(payload);
compressed = true;

// validate msg-size (for batching this will be checked at batch completion)
int compressedSize = compressedPayload.readableBytes();
@@ -442,7 +454,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
}
@@ -457,7 +469,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {

private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
long sequenceId, String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload,
int compressedPayloadSize,
boolean compressed, int compressedPayloadSize,
int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
ByteBuf chunkPayload = compressedPayload;
MessageMetadata msgMetadata = msg.getMessageBuilder();
@@ -524,6 +536,11 @@ private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload,
doBatchSendAndAdd(msg, callback, payload);
}
} else {
// In this case compression has not been applied by the caller,
// so compress the payload here if compression is configured.
if (!compressed) {
chunkPayload = applyCompression(chunkPayload);
}
ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload);

// When publishing during replication, we need to set the correct number of message in batch