Skip to content

Commit

Permalink
Add crc32c-checksum verification on message header-payload
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Oct 11, 2016
1 parent 95b8239 commit 51cd914
Show file tree
Hide file tree
Showing 20 changed files with 723 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.util.Date;
import java.util.Iterator;
Expand All @@ -36,6 +37,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -137,6 +139,11 @@ public ChannelPromise sendMessages(final List<Entry> entries) {

ByteBuf metadataAndPayload = entry.getDataBuffer();

// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
readChecksum(metadataAndPayload);
}

// stats
msgOut.recordEvent(metadataAndPayload.readableBytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;

import java.util.Date;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,11 +31,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.yahoo.pulsar.broker.service.Topic.PublishCallback;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.util.XXHashChecksum;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -110,7 +112,7 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
if (!verifyChecksum(headersAndPayload)) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(
Commands.newSendError(producerId, sequenceId, new Exception("Checksum failed on the broker")));
Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
cnx.completedSendOperation();
});
return;
Expand All @@ -122,42 +124,28 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
}

private boolean verifyChecksum(ByteBuf headersAndPayload) {
MessageMetadata metadata = null;
int readerIndex = headersAndPayload.readerIndex();
try {
metadata = Commands.parseMessageMetadata(headersAndPayload);

if (!metadata.hasChecksum()) {
// if we do not have the checksum, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}

if (metadata.hasCompression()) {
// if the message is compressed, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload is compressed, not verifying checksum", topic, producerName);

if (hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload).intValue();
int readerIndex = headersAndPayload.readerIndex();
try {
long computedChecksum = computeChecksum(headersAndPayload);
if (checksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
return false;
}
return true;
}
long storedChecksum = metadata.getChecksum();
long computedChecksum = XXHashChecksum.computeChecksum(headersAndPayload);
if (storedChecksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
} finally {
headersAndPayload.readerIndex(readerIndex);
}
} catch (Throwable t) {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName, t);
} finally {
headersAndPayload.readerIndex(readerIndex);
if (metadata != null) {
metadata.recycle();
} else {
// ignore if checksum is not available
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}
return false;
}

private void startPublishOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,7 @@ public void testBrokerTopicStats() throws Exception {
assertTrue(msgInRate > 0);
}

// TODO: Re-enable once header+payload checksum changes are merged
@Test(enabled = false)
@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";

Expand Down Expand Up @@ -1066,10 +1065,10 @@ public void testPayloadCorruptionDetection() throws Exception {
}

// We should only receive msg1
Message msg = consumer.receive(10, TimeUnit.SECONDS);
Message msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");

while ((msg = consumer.receive(5, TimeUnit.SECONDS)) != null) {
while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
assertEquals(new String(msg.getData()), "message-1");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
Expand Down Expand Up @@ -519,7 +520,7 @@ public void testSendCommand() throws Exception {
.setProducerName("prod-name").setSequenceId(0).build();
ByteBuf data = Unpooled.buffer(1024);

clientCommand = Commands.newSend(1, 0, 1, messageMetadata, data);
clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ private void receiveAsync(Consumer consumer, int totalMessage, int currentMessag
* 2. Consumer has receive size (10) and receive message without acknowledging
* 3. Consumer will stop receiving message after unAckThreshold = 500
* 4. Consumer acks messages and starts consuming remanining messages
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* @throws Exception
*/
Expand Down Expand Up @@ -1496,5 +1497,42 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
}
}

@Test
public void testEnabledChecksumClient() throws Exception {
log.info("-- Starting {} test --", methodName);

final int totalMsg = 10;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();
final int batchMessageDelayMs = 300;
if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerConf.setBatchingMaxMessages(5);
}

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Loading

0 comments on commit 51cd914

Please sign in to comment.