From 51cd9144e82b9a31aa5ca4a8d5cf48ce1a505d6c Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Mon, 1 Aug 2016 18:13:14 -0700 Subject: [PATCH] Add crc32c-checksum verification on message header-payload --- .../yahoo/pulsar/broker/service/Consumer.java | 7 + .../yahoo/pulsar/broker/service/Producer.java | 58 ++--- .../service/PersistentTopicE2ETest.java | 7 +- .../pulsar/broker/service/ServerCnxTest.java | 3 +- .../api/SimpleProducerConsumerTest.java | 38 ++++ .../pulsar/client/impl/MessageIdTest.java | 215 +++++++++++++++++- .../pulsar/checksum/utils/Crc32cChecksum.java | 15 +- .../client/api/PulsarClientException.java | 6 + .../client/impl/BatchMessageContainer.java | 7 - .../yahoo/pulsar/client/impl/ClientCnx.java | 12 +- .../pulsar/client/impl/ConsumerImpl.java | 51 ++--- .../pulsar/client/impl/ProducerImpl.java | 210 ++++++++++++++++- pulsar-common/pom.xml | 1 - .../com/yahoo/pulsar/common/api/Commands.java | 87 +++++-- .../pulsar/common/api/DoubleByteBuf.java | 8 + .../pulsar/common/api/proto/PulsarApi.java | 75 +----- .../pulsar/common/util/XXHashChecksum.java | 37 --- pulsar-common/src/main/proto/PulsarApi.proto | 7 +- .../common/compression/CommandsTest.java | 95 ++++++++ .../compression/Crc32cChecksumTest.java | 1 - 20 files changed, 723 insertions(+), 217 deletions(-) delete mode 100644 pulsar-common/src/main/java/com/yahoo/pulsar/common/util/XXHashChecksum.java create mode 100644 pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/CommandsTest.java diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 735e424fa8dfa..425c5b2b0050e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; import java.util.Date; import java.util.Iterator; @@ -36,6 +37,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.ConsumerStats; import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -137,6 +139,11 @@ public ChannelPromise sendMessages(final List entries) { ByteBuf metadataAndPayload = entry.getDataBuffer(); + // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) { + readChecksum(metadataAndPayload); + } + // stats msgOut.recordEvent(metadataAndPayload.readableBytes()); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java index ffa1607275b49..896a8675b08ce 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Producer.java @@ -17,6 +17,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; +import static com.yahoo.pulsar.common.api.Commands.hasChecksum; import java.util.Date; import java.util.concurrent.CompletableFuture; @@ -29,11 +31,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.yahoo.pulsar.broker.service.Topic.PublishCallback; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import com.yahoo.pulsar.common.api.Commands; -import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.PublisherStats; -import com.yahoo.pulsar.common.util.XXHashChecksum; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -110,7 +112,7 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP if (!verifyChecksum(headersAndPayload)) { cnx.ctx().channel().eventLoop().execute(() -> { cnx.ctx().writeAndFlush( - Commands.newSendError(producerId, sequenceId, new Exception("Checksum failed on the broker"))); + Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker")); cnx.completedSendOperation(); }); return; @@ -122,42 +124,28 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP } private boolean verifyChecksum(ByteBuf headersAndPayload) { - MessageMetadata metadata = null; - int readerIndex = headersAndPayload.readerIndex(); - try { - metadata = Commands.parseMessageMetadata(headersAndPayload); - - if (!metadata.hasChecksum()) { - // if we do not have the checksum, we do not verify checksum and return true - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName); - } - return true; - } - - if (metadata.hasCompression()) { - // if the message is compressed, we do not verify checksum and return true - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Payload is compressed, not verifying checksum", topic, producerName); + + if (hasChecksum(headersAndPayload)) { + int checksum = readChecksum(headersAndPayload).intValue(); + int readerIndex = headersAndPayload.readerIndex(); + try { + long computedChecksum = computeChecksum(headersAndPayload); + if (checksum == computedChecksum) { + return true; + } else { + log.error("[{}] [{}] Failed to verify checksum", topic, producerName); + return false; } - return true; - } - long storedChecksum = metadata.getChecksum(); - long computedChecksum = XXHashChecksum.computeChecksum(headersAndPayload); - if (storedChecksum == computedChecksum) { - return true; - } else { - log.error("[{}] [{}] Failed to verify checksum", topic, producerName); + } finally { + headersAndPayload.readerIndex(readerIndex); } - } catch (Throwable t) { - log.error("[{}] [{}] Failed to verify checksum", topic, producerName, t); - } finally { - headersAndPayload.readerIndex(readerIndex); - if (metadata != null) { - metadata.recycle(); + } else { + // ignore if checksum is not available + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName); } + return true; } - return false; } private void startPublishOperation() { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java index 9032b359d0f0b..32f41f1f0d689 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1028,8 +1028,7 @@ public void testBrokerTopicStats() throws Exception { assertTrue(msgInRate > 0); } - // TODO: Re-enable once header+payload checksum changes are merged - @Test(enabled = false) + @Test public void testPayloadCorruptionDetection() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic1"; @@ -1066,10 +1065,10 @@ public void testPayloadCorruptionDetection() throws Exception { } // We should only receive msg1 - Message msg = consumer.receive(10, TimeUnit.SECONDS); + Message msg = consumer.receive(1, TimeUnit.SECONDS); assertEquals(new String(msg.getData()), "message-1"); - while ((msg = consumer.receive(5, TimeUnit.SECONDS)) != null) { + while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) { assertEquals(new String(msg.getData()), "message-1"); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 8cff414f2b9dd..2cc9dbd29216c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -71,6 +71,7 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper; import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.Commands.ChecksumType; import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected; @@ -519,7 +520,7 @@ public void testSendCommand() throws Exception { .setProducerName("prod-name").setSequenceId(0).build(); ByteBuf data = Unpooled.buffer(1024); - clientCommand = Commands.newSend(1, 0, 1, messageMetadata, data); + clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data); channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); clientCommand.release(); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 7828d82b09a07..f8b813bcaf3c2 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -967,6 +967,7 @@ private void receiveAsync(Consumer consumer, int totalMessage, int currentMessag * 2. Consumer has receive size (10) and receive message without acknowledging * 3. Consumer will stop receiving message after unAckThreshold = 500 * 4. Consumer acks messages and starts consuming remanining messages + * This testcase enables checksum sending while producing message and broker verifies the checksum for the message. * * @throws Exception */ @@ -1496,5 +1497,42 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception { } } + @Test + public void testEnabledChecksumClient() throws Exception { + log.info("-- Starting {} test --", methodName); + + final int totalMsg = 10; + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Exclusive); + Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", + conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + final int batchMessageDelayMs = 300; + if (batchMessageDelayMs != 0) { + producerConf.setBatchingEnabled(true); + producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); + producerConf.setBatchingMaxMessages(5); + } + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < totalMsg; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java index f88d75abafeb9..6d0bdae4eed14 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/MessageIdTest.java @@ -15,37 +15,64 @@ */ package com.yahoo.pulsar.client.impl; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.yahoo.pulsar.broker.service.BrokerTestBase; import com.yahoo.pulsar.client.admin.PulsarAdminException; import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.MessageBuilder; import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.Producer; +import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClientException; +import com.yahoo.pulsar.client.impl.ProducerImpl.OpSendMsg; +import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.Commands.ChecksumType; +import com.yahoo.pulsar.common.api.DoubleByteBuf; +import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ResourceLeakDetector; public class MessageIdTest extends BrokerTestBase { private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class); - @BeforeClass + @BeforeMethod @Override public void setup() throws Exception { baseSetup(); } - @AfterClass + @AfterMethod @Override protected void cleanup() throws Exception { internalCleanup(); @@ -55,7 +82,7 @@ protected void cleanup() throws Exception { public void producerSendAsync() throws PulsarClientException { // 1. Basic Config String key = "producerSendAsync"; - final String topicName = "persistent://property/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/cluster/namespace/topic-" + key; final String subscriptionName = "my-subscription-" + key; final String messagePredicate = "my-message-" + key + "-"; final int numberOfMessages = 30; @@ -108,7 +135,7 @@ public void producerSendAsync() throws PulsarClientException { public void producerSend() throws PulsarClientException { // 1. Basic Config String key = "producerSend"; - final String topicName = "persistent://property/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/cluster/namespace/topic-" + key; final String subscriptionName = "my-subscription-" + key; final String messagePredicate = "my-message-" + key + "-"; final int numberOfMessages = 30; @@ -143,7 +170,7 @@ public void producerSend() throws PulsarClientException { public void partitionedProducerSendAsync() throws PulsarClientException, PulsarAdminException { // 1. Basic Config String key = "partitionedProducerSendAsync"; - final String topicName = "persistent://property/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/cluster/namespace/topic-" + key; final String subscriptionName = "my-subscription-" + key; final String messagePredicate = "my-message-" + key + "-"; final int numberOfMessages = 30; @@ -190,7 +217,7 @@ public void partitionedProducerSendAsync() throws PulsarClientException, PulsarA public void partitionedProducerSend() throws PulsarClientException, PulsarAdminException { // 1. Basic Config String key = "partitionedProducerSend"; - final String topicName = "persistent://property/cluster/namespace/topic-" + key; + final String topicName = "persistent://prop/cluster/namespace/topic-" + key; final String subscriptionName = "my-subscription-" + key; final String messagePredicate = "my-message-" + key + "-"; final int numberOfMessages = 30; @@ -223,4 +250,176 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE // it // consumer.unsubscribe();; } + + /** + * Verifies: different versions of broker-deployment (one broker understands Checksum and other + * doesn't in that case remove checksum before sending to broker-2) + * + * client first produce message with checksum and then retries to send message due to connection unavailable. But this time, if + * broker doesn't understand checksum: then client should remove checksum from the message before sending to broker. + * + * 1. stop broker + * 2. client compute checksum and add into message + * 3. produce 2 messages and corrupt 1 message + * 4. start broker with lower version (which doesn't support checksum) + * 5. client reconnects to broker and due to incompatibility of version: removes checksum from message + * 6. broker doesn't do checksum validation and persist message + * 7. client receives ack + * + * @throws Exception + */ + @Test + public void testChecksumVersionComptability() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/topic1"; + + // 1. producer connect + Producer prod = pulsarClient.createProducer(topicName); + ProducerImpl producer = spy((ProducerImpl) prod); + // return higher version compare to broker : so, it forces client-producer to remove checksum from payload + doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion(); + + Consumer consumer = pulsarClient.subscribe(topicName, "my-sub"); + + // Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums + // would have already been computed. If we change the message content at that point, it should result in a + // checksum validation error + stopBroker(); + + // stop timer to auto-reconnect as let spy-Producer connect to broker manually so, spy-producer object can get + // mock-value from brokerChecksumSupportedVersion + ((PulsarClientImpl) pulsarClient).timer().stop(); + + Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build(); + CompletableFuture future1 = producer.sendAsync(msg1); + + Message msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build(); + CompletableFuture future2 = producer.sendAsync(msg2); + + // corrupt the message + msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be 'message-3' + + // Restart the broker to have the messages published + startBroker(); + + // grab broker connection with mocked producer which has higher version compare to broker + producer.grabCnx(); + + try { + // it should not fail: as due to unsupported version of broker: client removes checksum and broker should + // ignore the checksum validation + future1.get(); + future2.get(); + } catch (Exception e) { + fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail"); + } + + ((ConsumerImpl) consumer).grabCnx(); + // We should only receive msg1 + Message msg = consumer.receive(1, TimeUnit.SECONDS); + assertEquals(new String(msg.getData()), "message-1"); + msg = consumer.receive(1, TimeUnit.SECONDS); + assertEquals(new String(msg.getData()), "message-3"); + + } + + /** + * Verifies: if message is corrupted before sending to broker and if broker gives checksum error: then + * 1. Client-Producer recomputes checksum with modified data + * 2. Retry message-send again + * 3. Broker verifies checksum + * 4. client receives send-ack success + * + * @throws Exception + */ + @Test + public void testCorruptMessageRemove() throws Exception { + + final String topicName = "persistent://prop/use/ns-abc/retry-topic"; + + ProducerConfiguration config = new ProducerConfiguration(); + config.setSendTimeout(10, TimeUnit.MINUTES); + // 1. producer connect + Producer prod = pulsarClient.createProducer(topicName, config); + ProducerImpl producer = spy((ProducerImpl) prod); + Field producerIdField = ProducerImpl.class.getDeclaredField("producerId"); + producerIdField.setAccessible(true); + long producerId = (long) producerIdField.get(producer); + producer.cnx().registerProducer(producerId, producer); // registered spy ProducerImpl + Consumer consumer = pulsarClient.subscribe(topicName, "my-sub"); + + // 2. Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're + // checksums + // would have already been computed. If we change the message content at that point, it should result in a + // checksum validation error + // enable checksum at producer + stopBroker(); + + Message msg = MessageBuilder.create().setContent("message-1".getBytes()).build(); + CompletableFuture future = producer.sendAsync(msg); + + // 3. corrupt the message + msg.getData()[msg.getData().length - 1] = '2'; // new content would be 'message-3' + + // 4. Restart the broker to have the messages published + startBroker(); + + try { + future.get(); + fail("send message should have failed with checksum excetion"); + } catch (Exception e) { + if (e.getCause() instanceof PulsarClientException.ChecksumException) { + //ok (callback should get checksum exception as message was modified and corrupt) + } else { + fail("Callback should have only failed with ChecksumException", e); + } + } + + // 5. Verify + + // (5.1) Verify: producer's recoverChecksumError and updateChecksum invoked + verify(producer, times(1)).recoverChecksumError(any(), anyLong()); + verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any()); + + + /** + * (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message + * is corrupt + */ + MessageImpl msg2 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build(); + ByteBuf payload = msg2.getDataBuffer(); + Builder metadataBuilder = ((MessageImpl) msg).getMessageBuilder(); + MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1).setPublishTime(10L) + .build(); + ByteBuf cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload); + // (a) create OpSendMsg with message-data : "message-1" + OpSendMsg op = OpSendMsg.create(((MessageImpl) msg), cmd, 1, null); + // a.verify: as message is not corrupt: no need to update checksum + assertTrue(producer.verifyLocalBufferIsNotCorrupted(op)); + // (b) corrupt message + msg2.getData()[msg2.getData().length - 1] = '2'; // new content would be 'message-2' + // b. verify: as message is corrupt: update checksum + assertFalse(producer.verifyLocalBufferIsNotCorrupted(op)); + + assertEquals(producer.getPendingQueueSize(), 0); + + // [2] test-recoverChecksumError functionality + stopBroker(); + MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build(); + future = producer.sendAsync(msg1); + ClientCnx cnx = spy(new ClientCnx((PulsarClientImpl)pulsarClient) {}); + String exc = "broker is already stopped"; + // when client-try to recover checksum by resending to broker: throw exception as broker is stopped + doThrow(new IllegalStateException(exc)).when(cnx).ctx(); + try { + producer.recoverChecksumError(cnx, 1); + fail("it should call : resendMessages() => which should throw above mocked exception"); + }catch(IllegalStateException e) { + assertEquals(exc, e.getMessage()); + } + + producer.close(); + consumer.close(); + producer = null; // clean reference of mocked producer + } + } diff --git a/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java b/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java index 8fe02cada2d42..3dd8213390641 100644 --- a/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java +++ b/pulsar-checksum/src/main/java/com/yahoo/pulsar/checksum/utils/Crc32cChecksum.java @@ -17,21 +17,25 @@ import static com.scurrilous.circe.params.CrcParameters.CRC32C; -import java.nio.ByteBuffer; - import com.scurrilous.circe.IncrementalIntHash; import com.scurrilous.circe.crc.Sse42Crc32C; import com.scurrilous.circe.crc.StandardCrcProvider; import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Crc32cChecksum { + private static final Logger log = LoggerFactory.getLogger(Crc32cChecksum.class); private final static IncrementalIntHash CRC32C_HASH; static { if (Sse42Crc32C.isSupported()) { CRC32C_HASH = new Crc32cSse42Provider().getIncrementalInt(CRC32C); + if (log.isDebugEnabled()) { + log.debug("SSE4.2 CRC32C provider initialized"); + } } else { CRC32C_HASH = new StandardCrcProvider().getIncrementalInt(CRC32C); } @@ -56,6 +60,13 @@ public static int computeChecksum(ByteBuf payload) { } + /** + * Computes incremental checksum with input previousChecksum and input payload + * + * @param previousChecksum : previously computed checksum + * @param payload + * @return + */ public static int resumeChecksum(int previousChecksum, ByteBuf payload) { if (payload.hasMemoryAddress() && (CRC32C_HASH instanceof Sse42Crc32C)) { return CRC32C_HASH.resume(previousChecksum, payload.memoryAddress() + payload.readerIndex(), diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java index 367c81ed26ed8..d8c0a13091153 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClientException.java @@ -163,4 +163,10 @@ public ProducerBlockedQuotaExceededException(String msg) { super(msg); } } + + public static class ChecksumException extends PulsarClientException { + public ChecksumException(String msg) { + super(msg); + } + } } \ No newline at end of file diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BatchMessageContainer.java index a027911a5dbc2..ea9d471e09aaf 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BatchMessageContainer.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BatchMessageContainer.java @@ -28,7 +28,6 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.compression.CompressionCodec; import com.yahoo.pulsar.common.compression.CompressionCodecProvider; -import com.yahoo.pulsar.common.util.XXHashChecksum; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; @@ -114,12 +113,6 @@ ByteBuf getCompressedBatchMetadataAndPayload() { return compressedPayload; } - void setChecksum() { - checkArgument(!messageMetadata.hasChecksum()); - long checksum = XXHashChecksum.computeChecksum(batchedMessageMetadataAndPayload); - messageMetadata.setChecksum(checksum); - } - PulsarApi.MessageMetadata setBatchAndBuild() { messageMetadata.setNumMessagesInBatch(numMessagesInBatch); if (log.isDebugEnabled()) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java index 1f9bb765b5a05..c7f6e921bd50b 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ClientCnx.java @@ -28,7 +28,7 @@ import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.PulsarHandler; -import com.yahoo.pulsar.common.api.proto.PulsarApi; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected; @@ -195,7 +195,13 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { @Override protected void handleSendError(CommandSendError sendError) { log.warn("{} Received send error from server: {}", ctx.channel(), sendError); - ctx.close(); + if (ServerError.ChecksumError.equals(sendError.getError())) { + long producerId = sendError.getProducerId(); + long sequenceId = sendError.getSequenceId(); + producers.get(producerId).recoverChecksumError(this, sequenceId); + } else { + ctx.close(); + } } @Override @@ -204,7 +210,7 @@ protected void handleError(CommandError error) { log.warn("{} Received error from server: {}", ctx.channel(), error.getMessage()); long requestId = error.getRequestId(); - if (error.getError() == PulsarApi.ServerError.ProducerBlockedQuotaExceededError) { + if (error.getError() == ServerError.ProducerBlockedQuotaExceededError) { log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", ctx.channel()); } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 25de6defd975d..88c993a0070c5 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -52,13 +52,15 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.compression.CompressionCodec; import com.yahoo.pulsar.common.compression.CompressionCodecProvider; -import com.yahoo.pulsar.common.util.XXHashChecksum; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import io.netty.util.Timeout; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; +import static com.yahoo.pulsar.common.api.Commands.hasChecksum; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; public class ConsumerImpl extends ConsumerBase { @@ -584,6 +586,13 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC MessageMetadata msgMetadata = null; ByteBuf payload = headersAndPayload; + + if (!verifyChecksum(headersAndPayload, messageId)) { + // discard message with checksum error + discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); + return; + } + try { msgMetadata = Commands.parseMessageMetadata(payload); } catch (Throwable t) { @@ -597,11 +606,6 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC return; } - if (!verifyChecksum(messageId, msgMetadata, uncompressedPayload, cnx)) { - // Message discarded for checksum error - return; - } - final int numMessages = msgMetadata.getNumMessagesInBatch(); if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) { @@ -810,28 +814,21 @@ private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetada } } - private boolean verifyChecksum(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx) { - if (!msgMetadata.hasChecksum()) { - // No checksum to validate - return true; - } - - long storedChecksum = msgMetadata.getChecksum(); - long computedChecksum = XXHashChecksum.computeChecksum(payload); + private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) { - if (storedChecksum == computedChecksum) { - return true; - } else { - log.error( - "[{}][{}] Checksum mismatch for message at {}:{}. Received content:\n{}" - + "\nReceived checksum: 0x{} -- Computed checksum: 0x{}", - topic, subscription, messageId.getLedgerId(), messageId.getEntryId(), - ByteBufUtil.prettyHexDump(payload), Long.toHexString(storedChecksum), - Long.toHexString(computedChecksum)); - discardCorruptedMessage(messageId, currentCnx, ValidationError.ChecksumMismatch); - return false; + if(hasChecksum(headersAndPayload)) { + int checksum = readChecksum(headersAndPayload).intValue(); + int computedChecksum = computeChecksum(headersAndPayload); + if (checksum != computedChecksum) { + log.error( + "[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", + topic, subscription, messageId.getLedgerId(), messageId.getEntryId(), + Long.toHexString(checksum), Integer.toHexString(computedChecksum)); + return false; + } } + + return true; } private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index 8b460262f5b1e..4d51ee438ba0a 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -16,7 +16,10 @@ package com.yahoo.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @@ -38,11 +41,13 @@ import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.Commands.ChecksumType; +import com.yahoo.pulsar.common.api.DoubleByteBuf; import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.compression.CompressionCodec; import com.yahoo.pulsar.common.compression.CompressionCodecProvider; -import com.yahoo.pulsar.common.util.XXHashChecksum; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -50,6 +55,8 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import static com.yahoo.pulsar.common.api.Commands.hasChecksum; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; public class ProducerImpl extends ProducerBase implements TimerTask { @@ -183,10 +190,6 @@ public void sendAsync(Message message, SendCallback callback) { MessageMetadata.Builder msgMetadata = msg.getMessageBuilder(); ByteBuf payload = msg.getDataBuffer(); - if (!msgMetadata.hasChecksum()) { - msgMetadata.setChecksum(XXHashChecksum.computeChecksum(payload)); - } - // If compression is enabled, we are compressing, otherwise it will simply use the same buffer int uncompressedSize = payload.readableBytes(); ByteBuf compressedPayload = payload; @@ -233,7 +236,7 @@ public void sendAsync(Message message, SendCallback callback) { doBatchSendAndAdd(msg, callback, payload); } } else { - ByteBuf cmd = Commands.newSend(producerId, sequenceId, 1, msgMetadata.build(), compressedPayload); + ByteBuf cmd = sendMessage(producerId, sequenceId, 1, msgMetadata.build(), compressedPayload); msgMetadata.recycle(); final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback); @@ -266,6 +269,18 @@ public void sendAsync(Message message, SendCallback callback) { } } + private ByteBuf sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata, + ByteBuf compressedPayload) throws IOException { + ChecksumType checksumType; + if (clientCnx.get() == null + || clientCnx.get().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) { + checksumType = ChecksumType.Crc32c; + } else { + checksumType = ChecksumType.None; + } + return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload); + } + private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName, @@ -484,7 +499,102 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { } } - private static final class OpSendMsg { + /** + * Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the + * message header-payload again. + *
    + *
  • if matches with existing checksum: it means message was corrupt while sending to broker. So, resend message
  • + *
  • if doesn't match with existing checksum: it means message is already corrupt and can't retry again. So, fail + * send-message by failing callback
  • + *
+ * + * @param cnx + * @param sequenceId + */ + protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) { + OpSendMsg op = pendingMessages.peek(); + if (op == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Got send failure for timed out msg {}", topic, producerName, sequenceId); + } + } else { + long expectedSequenceId = op.sequenceId; + if (sequenceId == expectedSequenceId) { + boolean corrupted = !verifyLocalBufferIsNotCorrupted(op); + if (corrupted) { + // remove message from pendingMessages queue and fail callback + pendingMessages.remove(); + semaphore.release(op.numMessagesInBatch); + try { + op.callback.sendComplete( + new PulsarClientException.ChecksumException("Checksum failded on corrupt message")); + } catch (Throwable t) { + log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, + sequenceId, t); + } + ReferenceCountUtil.safeRelease(op.cmd); + op.recycle(); + return; + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", topic, + producerName, sequenceId); + } + } + + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Corrupt message is already timed out {}", topic, producerName, sequenceId); + } + } + } + // as msg is not corrupted : let producer resend pending-messages again including checksum failed message + resendMessages(cnx); + } + + /** + * Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that + * message is corrupt. + * + * @param op + * @return returns true only if message is not modified and computed-checksum is same as previous checksum else + * return false that means that message is corrupted. Returns true if checksum is not present. + */ + protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { + DoubleByteBuf msg = getDoubleByteBuf(op.cmd); + + if (msg != null) { + ByteBuf headerFrame = msg.getFirst(); + msg.markReaderIndex(); + headerFrame.markReaderIndex(); + try { + // skip bytes up to checksum index + headerFrame.skipBytes(4); // skip [total-size] + int cmdSize = (int) headerFrame.readUnsignedInt(); + headerFrame.skipBytes(cmdSize); + // verify if checksum present + if (hasChecksum(headerFrame)) { + int checksum = readChecksum(headerFrame).intValue(); + // msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload + int metadataChecksum = computeChecksum(headerFrame); + long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond()); + return checksum == computedChecksum; + } else { + log.warn("[{}] [{}] checksum is not present into message with id {}", topic, producerName, + op.sequenceId); + } + } finally { + headerFrame.resetReaderIndex(); + msg.resetReaderIndex(); + } + return true; + } else { + log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName()); + return false; + } + } + + protected static final class OpSendMsg { MessageImpl msg; List msgs; ByteBuf cmd; @@ -681,6 +791,10 @@ private void resendMessages(ClientCnx cnx) { log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend); for (OpSendMsg op : pendingMessages) { + + if (cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion()) { + stripChecksum(op); + } op.cmd.retain(); if (log.isDebugEnabled()) { log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName, @@ -701,6 +815,60 @@ private void resendMessages(ClientCnx cnx) { }); } + /** + * Strips checksum from {@link OpSendMsg} command if present else ignore it. + * + * @param op + */ + private void stripChecksum(OpSendMsg op) { + op.cmd.markReaderIndex(); + int totalMsgBufSize = op.cmd.readableBytes(); + DoubleByteBuf msg = getDoubleByteBuf(op.cmd); + if (msg != null) { + ByteBuf headerFrame = msg.getFirst(); + ByteBuf payloadFrame = msg.getSecond(); + msg.markReaderIndex(); + headerFrame.markReaderIndex(); + payloadFrame.markReaderIndex(); + try { + headerFrame.skipBytes(4); // skip [total-size] + int cmdSize = (int) headerFrame.readUnsignedInt(); + + // verify if checksum present + headerFrame.skipBytes(cmdSize); + + if (!hasChecksum(headerFrame)) { + return; + } + + int headerSize = 4 + 4 + cmdSize; // [total-size] [cmd-length] [cmd-size] + int checksumSize = 4 + 2; // [magic-number] [checksum-size] + int checksumMark = (headerSize + checksumSize); // [header-size] [checksum-size] + int metaPayloadSize = (totalMsgBufSize - checksumMark); // metadataPayload = totalSize - checksumMark + int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize; // new total-size without checksum + headerFrame.resetReaderIndex(); + int headerFrameSize = headerFrame.readableBytes(); + + headerFrame.setInt(0, newTotalFrameSizeLength); // rewrite new [total-size] + ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark); // sliced only + // metadata + headerFrame.writerIndex(headerSize); // set headerFrame write-index to overwrite metadata over checksum + metadata.readBytes(headerFrame, metadata.readableBytes()); + headerFrame.capacity(headerFrameSize - checksumSize); // reduce capacity by removed checksum bytes + headerFrame.resetReaderIndex(); + + } finally { + op.cmd.resetReaderIndex(); + } + } else { + log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName()); + } + } + + public int brokerChecksumSupportedVersion() { + return ProtocolVersion.v6.getNumber(); + } + @Override String getHandlerName() { return producerName; @@ -836,11 +1004,9 @@ private void batchMessageAndSend() { try { if (!batchMessageContainer.isEmpty()) { numMessagesInBatch = batchMessageContainer.numMessagesInBatch; - // checksum is on uncompressed payload for batch - batchMessageContainer.setChecksum(); ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload(); long sequenceId = batchMessageContainer.sequenceId; - ByteBuf cmd = Commands.newSend(producerId, sequenceId, batchMessageContainer.numMessagesInBatch, + ByteBuf cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch, batchMessageContainer.setBatchAndBuild(), compressedPayload); op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId, @@ -881,6 +1047,30 @@ private void batchMessageAndSend() { } } + /** + * Casts input cmd to {@link DoubleByteBuf} + * + * Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type of {@link io.netty.buffer.WrappedByteBuf}) + * So, this method casts input cmd to {@link DoubleByteBuf} else retrieves it from LeakAwareByteBuf. + * + * @param cmd + * @return DoubleByteBuf or null in case failed to cast input {@link ByteBuf} + */ + private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) { + DoubleByteBuf msg = null; + if (cmd instanceof DoubleByteBuf) { + msg = (DoubleByteBuf) cmd; + } else { + try { + msg = (DoubleByteBuf) cmd.unwrap(); + } catch (Exception e) { + log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(), + e); + } + } + return msg; + } + public long getDelayInMillis() { OpSendMsg firstMsg = pendingMessages.peek(); if (firstMsg != null) { diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 3dfb9793a672c..64b32be534a9b 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -66,7 +66,6 @@ com.yahoo.pulsar pulsar-checksum ${project.version} - test diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java index 76fa6569e371d..64f6ab3532060 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java @@ -18,10 +18,15 @@ import java.io.IOException; import com.google.protobuf.ByteString; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod; import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand; +import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand.Type; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck; +import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect; @@ -38,16 +43,13 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendError; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe; +import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSuccess; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; -import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand.Type; -import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; -import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError; -import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedInputStream; import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStream; @@ -60,6 +62,10 @@ import io.netty.util.Recycler.Handle; public class Commands { + + public static final short magicCrc32c = 0x0e01; + private static final int checksumSize = 4; + public static ByteBuf newConnect(String authMethodName, String authData) { return newConnect(authMethodName, authData, getCurrentProtocolVersion()); } @@ -184,20 +190,41 @@ public static ByteBuf newSendReceipt(long producerId, long sequenceId, long ledg } public static ByteBuf newSendError(long producerId, long sequenceId, Throwable t) { + return newSendError(producerId, sequenceId, ServerError.PersistenceError, t.getMessage()); + } + + public static ByteBuf newSendError(long producerId, long sequenceId, ServerError error, String errorMsg) { CommandSendError.Builder sendErrorBuilder = CommandSendError.newBuilder(); sendErrorBuilder.setProducerId(producerId); sendErrorBuilder.setSequenceId(sequenceId); - sendErrorBuilder.setError(ServerError.PersistenceError); - sendErrorBuilder.setMessage(t.getMessage()); + sendErrorBuilder.setError(error); + sendErrorBuilder.setMessage(errorMsg); CommandSendError sendError = sendErrorBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SEND_ERROR).setSendError(sendError)); sendErrorBuilder.recycle(); sendError.recycle(); return res; } + + + public static boolean hasChecksum(ByteBuf buffer) { + return buffer.getShort(buffer.readerIndex()) == magicCrc32c; + } + + public static Long readChecksum(ByteBuf buffer) { + if(hasChecksum(buffer)) { + buffer.skipBytes(2); //skip magic bytes + return buffer.readUnsignedInt(); + } else{ + return null; + } + } public static MessageMetadata parseMessageMetadata(ByteBuf buffer) { try { + // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata to parse + // metadata + readChecksum(buffer); int metadataSize = (int) buffer.readUnsignedInt(); int writerIndex = buffer.writerIndex(); @@ -230,8 +257,8 @@ public static ByteBuf newMessage(long consumerId, MessageIdData messageId, ByteB return res; } - public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, MessageMetadata messageData, - ByteBuf payload) { + public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, ChecksumType checksumType, + MessageMetadata messageData, ByteBuf payload) { CommandSend.Builder sendBuilder = CommandSend.newBuilder(); sendBuilder.setProducerId(producerId); sendBuilder.setSequenceId(sequenceId); @@ -241,7 +268,7 @@ public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, CommandSend send = sendBuilder.build(); ByteBuf res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send), - messageData, payload); + checksumType, messageData, payload); send.recycle(); sendBuilder.recycle(); return res; @@ -406,18 +433,24 @@ private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) { return buf; } - private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, MessageMetadata msgMetadata, + private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, ChecksumType checksumType, MessageMetadata msgMetadata, ByteBuf payload) { // / Wire format - // [TOTAL_SIZE] [CMD_SIZE][CMD] [METADATA_SIZE][METADATA] [PAYLOAD] + // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] BaseCommand cmd = cmdBuilder.build(); int cmdSize = cmd.getSerializedSize(); int msgMetadataSize = msgMetadata.getSerializedSize(); int payloadSize = payload.readableBytes(); - int totalSize = 4 + cmdSize + 4 + msgMetadataSize + payloadSize; - int headersSize = 4 + 4 + cmdSize + 4 + msgMetadataSize; - + int magicAndChecksumLength = ChecksumType.Crc32c.equals(checksumType) ? (2 + 4 /* magic + checksumLength*/) : 0; + boolean includeChecksum = magicAndChecksumLength > 0; + int headerContentSize = 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; // cmdLength + cmdSize + magicLength + + // checksumSize + msgMetadataLength + + // msgMetadataSize + int totalSize = headerContentSize + payloadSize; + int headersSize = 4 + headerContentSize; // totalSize + headerLength + int checksumReaderIndex = -1; + ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize); headers.writeInt(totalSize); // External frame @@ -429,18 +462,36 @@ private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuild cmd.writeTo(outStream); cmd.recycle(); cmdBuilder.recycle(); + + //Create checksum placeholder + if (includeChecksum) { + headers.writeShort(magicCrc32c); + checksumReaderIndex = headers.writerIndex(); + headers.writerIndex(headers.writerIndex() + checksumSize); //skip 4 bytes of checksum + } // Write metadata headers.writeInt(msgMetadataSize); msgMetadata.writeTo(outStream); - outStream.recycle(); } catch (IOException e) { // This is in-memory serialization, should not fail throw new RuntimeException(e); } - return DoubleByteBuf.get(headers, payload); + ByteBuf command = DoubleByteBuf.get(headers, payload); + + // write checksum at created checksum-placeholder + if (includeChecksum) { + headers.markReaderIndex(); + headers.readerIndex(checksumReaderIndex + checksumSize); + int metadataChecksum = computeChecksum(headers); + int computedChecksum = resumeChecksum(metadataChecksum, payload); + // set computed checksum + headers.setInt(checksumReaderIndex, computedChecksum); + headers.resetReaderIndex(); + } + return command; } public static long initBatchMessageMetadata(PulsarApi.MessageMetadata.Builder messageMetadata, @@ -566,4 +617,8 @@ public void recycle() { } } + public static enum ChecksumType { + Crc32c, + None; + } } diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java index bcec00ad066cf..c2b20cf8a86ed 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/DoubleByteBuf.java @@ -87,6 +87,14 @@ public static ByteBuf get(ByteBuf b1, ByteBuf b2) { return toLeakAwareBuffer(buf); } + public ByteBuf getFirst() { + return b1; + } + + public ByteBuf getSecond() { + return b2; + } + @Override public boolean isDirect() { return b1.isDirect() && b2.isDirect(); diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index eef3511bbd1e9..82e262c0cf98a 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -63,6 +63,7 @@ public enum ServerError ServiceNotReady(6, 6), ProducerBlockedQuotaExceededError(7, 7), ProducerBlockedQuotaExceededException(8, 8), + ChecksumError(9, 9), ; public static final int UnknownError_VALUE = 0; @@ -74,6 +75,7 @@ public enum ServerError public static final int ServiceNotReady_VALUE = 6; public static final int ProducerBlockedQuotaExceededError_VALUE = 7; public static final int ProducerBlockedQuotaExceededException_VALUE = 8; + public static final int ChecksumError_VALUE = 9; public final int getNumber() { return value; } @@ -89,6 +91,7 @@ public static ServerError valueOf(int value) { case 6: return ServiceNotReady; case 7: return ProducerBlockedQuotaExceededError; case 8: return ProducerBlockedQuotaExceededException; + case 9: return ChecksumError; default: return null; } } @@ -166,6 +169,7 @@ public enum ProtocolVersion v3(3, 3), v4(4, 4), v5(5, 5), + v6(6, 6), ; public static final int v0_VALUE = 0; @@ -174,6 +178,7 @@ public enum ProtocolVersion public static final int v3_VALUE = 3; public static final int v4_VALUE = 4; public static final int v5_VALUE = 5; + public static final int v6_VALUE = 6; public final int getNumber() { return value; } @@ -186,6 +191,7 @@ public static ProtocolVersion valueOf(int value) { case 3: return v3; case 4: return v4; case 5: return v5; + case 6: return v6; default: return null; } } @@ -1227,10 +1233,6 @@ public interface MessageMetadataOrBuilder boolean hasUncompressedSize(); int getUncompressedSize(); - // optional sfixed64 checksum = 10; - boolean hasChecksum(); - long getChecksum(); - // optional int32 num_messages_in_batch = 11 [default = 1]; boolean hasNumMessagesInBatch(); int getNumMessagesInBatch(); @@ -1441,21 +1443,11 @@ public int getUncompressedSize() { return uncompressedSize_; } - // optional sfixed64 checksum = 10; - public static final int CHECKSUM_FIELD_NUMBER = 10; - private long checksum_; - public boolean hasChecksum() { - return ((bitField0_ & 0x00000080) == 0x00000080); - } - public long getChecksum() { - return checksum_; - } - // optional int32 num_messages_in_batch = 11 [default = 1]; public static final int NUM_MESSAGES_IN_BATCH_FIELD_NUMBER = 11; private int numMessagesInBatch_; public boolean hasNumMessagesInBatch() { - return ((bitField0_ & 0x00000100) == 0x00000100); + return ((bitField0_ & 0x00000080) == 0x00000080); } public int getNumMessagesInBatch() { return numMessagesInBatch_; @@ -1471,7 +1463,6 @@ private void initFields() { replicateTo_ = com.google.protobuf.LazyStringArrayList.EMPTY; compression_ = com.yahoo.pulsar.common.api.proto.PulsarApi.CompressionType.NONE; uncompressedSize_ = 0; - checksum_ = 0L; numMessagesInBatch_ = 1; } private byte memoizedIsInitialized = -1; @@ -1537,9 +1528,6 @@ public void writeTo(com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStre output.writeUInt32(9, uncompressedSize_); } if (((bitField0_ & 0x00000080) == 0x00000080)) { - output.writeSFixed64(10, checksum_); - } - if (((bitField0_ & 0x00000100) == 0x00000100)) { output.writeInt32(11, numMessagesInBatch_); } } @@ -1592,10 +1580,6 @@ public int getSerializedSize() { .computeUInt32Size(9, uncompressedSize_); } if (((bitField0_ & 0x00000080) == 0x00000080)) { - size += com.google.protobuf.CodedOutputStream - .computeSFixed64Size(10, checksum_); - } - if (((bitField0_ & 0x00000100) == 0x00000100)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(11, numMessagesInBatch_); } @@ -1730,10 +1714,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000080); uncompressedSize_ = 0; bitField0_ = (bitField0_ & ~0x00000100); - checksum_ = 0L; - bitField0_ = (bitField0_ & ~0x00000200); numMessagesInBatch_ = 1; - bitField0_ = (bitField0_ & ~0x00000400); + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -1809,10 +1791,6 @@ public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata buildPartial( if (((from_bitField0_ & 0x00000200) == 0x00000200)) { to_bitField0_ |= 0x00000080; } - result.checksum_ = checksum_; - if (((from_bitField0_ & 0x00000400) == 0x00000400)) { - to_bitField0_ |= 0x00000100; - } result.numMessagesInBatch_ = numMessagesInBatch_; result.bitField0_ = to_bitField0_; return result; @@ -1861,9 +1839,6 @@ public Builder mergeFrom(com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMeta if (other.hasUncompressedSize()) { setUncompressedSize(other.getUncompressedSize()); } - if (other.hasChecksum()) { - setChecksum(other.getChecksum()); - } if (other.hasNumMessagesInBatch()) { setNumMessagesInBatch(other.getNumMessagesInBatch()); } @@ -1964,13 +1939,8 @@ public Builder mergeFrom( uncompressedSize_ = input.readUInt32(); break; } - case 81: { - bitField0_ |= 0x00000200; - checksum_ = input.readSFixed64(); - break; - } case 88: { - bitField0_ |= 0x00000400; + bitField0_ |= 0x00000200; numMessagesInBatch_ = input.readInt32(); break; } @@ -2320,43 +2290,22 @@ public Builder clearUncompressedSize() { return this; } - // optional sfixed64 checksum = 10; - private long checksum_ ; - public boolean hasChecksum() { - return ((bitField0_ & 0x00000200) == 0x00000200); - } - public long getChecksum() { - return checksum_; - } - public Builder setChecksum(long value) { - bitField0_ |= 0x00000200; - checksum_ = value; - - return this; - } - public Builder clearChecksum() { - bitField0_ = (bitField0_ & ~0x00000200); - checksum_ = 0L; - - return this; - } - // optional int32 num_messages_in_batch = 11 [default = 1]; private int numMessagesInBatch_ = 1; public boolean hasNumMessagesInBatch() { - return ((bitField0_ & 0x00000400) == 0x00000400); + return ((bitField0_ & 0x00000200) == 0x00000200); } public int getNumMessagesInBatch() { return numMessagesInBatch_; } public Builder setNumMessagesInBatch(int value) { - bitField0_ |= 0x00000400; + bitField0_ |= 0x00000200; numMessagesInBatch_ = value; return this; } public Builder clearNumMessagesInBatch() { - bitField0_ = (bitField0_ & ~0x00000400); + bitField0_ = (bitField0_ & ~0x00000200); numMessagesInBatch_ = 1; return this; diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/XXHashChecksum.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/XXHashChecksum.java deleted file mode 100644 index ffcc5db24e21b..0000000000000 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/util/XXHashChecksum.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright 2016 Yahoo Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.pulsar.common.util; - -import java.nio.ByteBuffer; - -import io.netty.buffer.ByteBuf; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; - -public class XXHashChecksum { - - private static final XXHash64 checksum = XXHashFactory.fastestInstance().hash64(); - - public static long computeChecksum(ByteBuf payload) { - if (payload.hasArray()) { - return checksum.hash(payload.array(), payload.arrayOffset() + payload.readerIndex(), - payload.readableBytes(), 0L); - } else { - ByteBuffer payloadNio = payload.nioBuffer(payload.readerIndex(), payload.readableBytes()); - return checksum.hash(payloadNio, 0, payload.readableBytes(), 0L); - } - } -} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 4844440516fea..5de4523979483 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -50,8 +50,9 @@ message MessageMetadata { repeated string replicate_to = 7; optional CompressionType compression = 8 [default = NONE]; optional uint32 uncompressed_size = 9 [default = 0]; - // XXHash64 checksum of the original message payload - optional sfixed64 checksum = 10; + // Removed below checksum field from Metadata as + // it should be part of send-command which keeps checksum of header + payload + //optional sfixed64 checksum = 10; // differentiate single and batch message metadata optional int32 num_messages_in_batch = 11 [default = 1]; } @@ -75,6 +76,7 @@ enum ServerError { ServiceNotReady = 6; // Any error that requires client retry operation with a fresh lookup ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded + ChecksumError = 9; // Error while verifying message checksum } enum AuthMethod { @@ -92,6 +94,7 @@ enum ProtocolVersion { v3 = 3; // Added compression with LZ4 and ZLib v4 = 4; // Added batch message support v5 = 5; // Added disconnect client w/o closing connection + v6 = 6; // Added checksum computation for metadata + payload } message CommandConnect { diff --git a/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/CommandsTest.java new file mode 100644 index 0000000000000..e3b570d21be90 --- /dev/null +++ b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/CommandsTest.java @@ -0,0 +1,95 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.common.compression; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.testng.annotations.Test; + +import com.yahoo.pulsar.checksum.utils.Crc32cChecksum; +import com.yahoo.pulsar.common.api.Commands; +import com.yahoo.pulsar.common.api.Commands.ChecksumType; +import com.yahoo.pulsar.common.api.DoubleByteBuf; +import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + +public class CommandsTest { + + @Test + public void testChecksumSendCommand() throws Exception { + + // test checksum in send command + String producerName = "prod-name"; + int sequenceId = 0; + ByteBuf data = Unpooled.buffer(1024); + MessageMetadata messageMetadata = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()) + .setProducerName(producerName).setSequenceId(sequenceId).build(); + int expectedChecksum = computeChecksum(messageMetadata, data); + ByteBuf clientCommand = Commands.newSend(1, 0, 1, ChecksumType.Crc32c, messageMetadata, data); + clientCommand.retain(); + ByteBuffer inputBytes = clientCommand.nioBuffer(); + ByteBuf receivedBuf = Unpooled.wrappedBuffer(inputBytes); + receivedBuf.skipBytes(4); //skip [total-size] + int cmdSize = (int) receivedBuf.readUnsignedInt(); + receivedBuf.readerIndex(8 + cmdSize); + int startMessagePos = receivedBuf.readerIndex(); + + /*** 1. verify checksum and metadataParsing ***/ + boolean hasChecksum = Commands.hasChecksum(receivedBuf); + int checksum = Commands.readChecksum(receivedBuf).intValue(); + + + // verify checksum is present + assertTrue(hasChecksum); + // verify checksum value + assertEquals(expectedChecksum, checksum); + MessageMetadata metadata = Commands.parseMessageMetadata(receivedBuf); + // verify metadata parsing + assertEquals(metadata.getProducerName(), producerName); + + /** 2. parseMessageMetadata should skip checksum if present **/ + receivedBuf.readerIndex(startMessagePos); + metadata = Commands.parseMessageMetadata(receivedBuf); + // verify metadata parsing + assertEquals(metadata.getProducerName(), producerName); + + } + + private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException { + int metadataSize = msgMetadata.getSerializedSize(); + int metadataFrameSize = 4 + metadataSize; + ByteBuf metaPayloadFrame = PooledByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize); + ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metaPayloadFrame); + metaPayloadFrame.writeInt(metadataSize); + msgMetadata.writeTo(outStream); + ByteBuf payload = compressedPayload.copy(); + ByteBuf metaPayloadBuf = DoubleByteBuf.get(metaPayloadFrame, payload); + int computedChecksum = Crc32cChecksum.computeChecksum(metaPayloadBuf); + outStream.recycle(); + metaPayloadBuf.release(); + return computedChecksum; + } + + +} diff --git a/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java index dea986f8bd8bd..ce3e9cdb504f2 100644 --- a/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java +++ b/pulsar-common/src/test/java/com/yahoo/pulsar/common/compression/Crc32cChecksumTest.java @@ -148,7 +148,6 @@ public void testCrc32cIncrementalUsingProvider() { incrementalChecksum = Crc32cChecksum.resumeChecksum(checksum, payload); assertEquals(expectedChecksum, incrementalChecksum); payload.release(); - }