Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add crc32c-checksum verification on message header-payload #43

Merged
merged 1 commit into from
Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.util.Date;
import java.util.Iterator;
Expand All @@ -36,6 +37,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -137,6 +139,11 @@ public ChannelPromise sendMessages(final List<Entry> entries) {

ByteBuf metadataAndPayload = entry.getDataBuffer();

// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
readChecksum(metadataAndPayload);
}

// stats
msgOut.recordEvent(metadataAndPayload.readableBytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;

import java.util.Date;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,11 +31,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.yahoo.pulsar.broker.service.Topic.PublishCallback;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.util.XXHashChecksum;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -110,7 +112,7 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
if (!verifyChecksum(headersAndPayload)) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(
Commands.newSendError(producerId, sequenceId, new Exception("Checksum failed on the broker")));
Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
cnx.completedSendOperation();
});
return;
Expand All @@ -122,42 +124,28 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
}

private boolean verifyChecksum(ByteBuf headersAndPayload) {
MessageMetadata metadata = null;
int readerIndex = headersAndPayload.readerIndex();
try {
metadata = Commands.parseMessageMetadata(headersAndPayload);

if (!metadata.hasChecksum()) {
// if we do not have the checksum, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}

if (metadata.hasCompression()) {
// if the message is compressed, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload is compressed, not verifying checksum", topic, producerName);

if (hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload).intValue();
int readerIndex = headersAndPayload.readerIndex();
try {
long computedChecksum = computeChecksum(headersAndPayload);
if (checksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to include message id as well at this point

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, at this point we haven't persisted the message so, we don't have message-id so, we are not logging message-id.

return false;
}
return true;
}
long storedChecksum = metadata.getChecksum();
long computedChecksum = XXHashChecksum.computeChecksum(headersAndPayload);
if (storedChecksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
} finally {
headersAndPayload.readerIndex(readerIndex);
}
} catch (Throwable t) {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName, t);
} finally {
headersAndPayload.readerIndex(readerIndex);
if (metadata != null) {
metadata.recycle();
} else {
// ignore if checksum is not available
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}
return false;
}

private void startPublishOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,7 @@ public void testBrokerTopicStats() throws Exception {
assertTrue(msgInRate > 0);
}

// TODO: Re-enable once header+payload checksum changes are merged
@Test(enabled = false)
@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";

Expand Down Expand Up @@ -1066,10 +1065,10 @@ public void testPayloadCorruptionDetection() throws Exception {
}

// We should only receive msg1
Message msg = consumer.receive(10, TimeUnit.SECONDS);
Message msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");

while ((msg = consumer.receive(5, TimeUnit.SECONDS)) != null) {
while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
assertEquals(new String(msg.getData()), "message-1");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
Expand Down Expand Up @@ -519,7 +520,7 @@ public void testSendCommand() throws Exception {
.setProducerName("prod-name").setSequenceId(0).build();
ByteBuf data = Unpooled.buffer(1024);

clientCommand = Commands.newSend(1, 0, 1, messageMetadata, data);
clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ private void receiveAsync(Consumer consumer, int totalMessage, int currentMessag
* 2. Consumer has receive size (10) and receive message without acknowledging
* 3. Consumer will stop receiving message after unAckThreshold = 500
* 4. Consumer acks messages and starts consuming remanining messages
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* @throws Exception
*/
Expand Down Expand Up @@ -1496,5 +1497,42 @@ public void testBlockUnackConsumerAckByDifferentConsumer() throws Exception {
}
}

@Test
public void testEnabledChecksumClient() throws Exception {
log.info("-- Starting {} test --", methodName);

final int totalMsg = 10;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();
final int batchMessageDelayMs = 300;
if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerConf.setBatchingMaxMessages(5);
}

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Loading