Permalink
Browse files

If CRC doesn't match, return an InvalidMessage

  • Loading branch information...
dgomezferro committed Oct 17, 2012
1 parent 020addf commit a9e6aae1efeca1af94910d36624eec92e36c162d
@@ -63,6 +63,7 @@
import com.yahoo.pasc.paxos.messages.ControlMessage;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.InlineRequest;
+import com.yahoo.pasc.paxos.messages.InvalidMessage;
import com.yahoo.pasc.paxos.messages.Request;
import com.yahoo.pasc.paxos.messages.ServerHello;
import com.yahoo.pasc.paxos.messages.serialization.ManualDecoder;
@@ -287,6 +288,9 @@ public synchronized void channelConnected(ChannelHandlerContext ctx, ChannelStat
@Override
public synchronized void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Object message = e.getMessage();
+ if (message instanceof InvalidMessage) {
+ return;
+ }
if (message instanceof ServerHello) {
ServerHello hello = (ServerHello) e.getMessage();
serverChannels[hello.getServerId()] = e.getChannel();
@@ -0,0 +1,53 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.pasc.paxos.messages;
+
+import java.io.Serializable;
+
+import com.yahoo.pasc.CloneableDeep;
+import com.yahoo.pasc.EqualsDeep;
+import com.yahoo.pasc.Message;
+
+public class InvalidMessage extends PaxosMessage implements Serializable, CloneableDeep<InvalidMessage>, EqualsDeep<InvalidMessage> {
+
+ private static final long serialVersionUID = -3781061394615967506L;
+
+ public InvalidMessage() {
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{Invalid message}",super.toString());
+ }
+
+ public InvalidMessage cloneDeep() {
+ return new InvalidMessage();
+ }
+
+ public boolean equalsDeep(InvalidMessage other) {
+ return true;
+ }
+
+ @Override
+ public void storeReplica(Message m) {
+ }
+
+ @Override
+ protected boolean verify() {
+ return true;
+ }
+}
@@ -26,7 +26,6 @@
import org.slf4j.LoggerFactory;
import com.yahoo.pasc.PascRuntime;
-import com.yahoo.pasc.exceptions.InputMessageException;
import com.yahoo.pasc.paxos.messages.Accept;
import com.yahoo.pasc.paxos.messages.Accepted;
import com.yahoo.pasc.paxos.messages.AsyncMessage;
@@ -35,6 +34,7 @@
import com.yahoo.pasc.paxos.messages.Digest;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.InlineRequest;
+import com.yahoo.pasc.paxos.messages.InvalidMessage;
import com.yahoo.pasc.paxos.messages.MessageType;
import com.yahoo.pasc.paxos.messages.PaxosMessage;
import com.yahoo.pasc.paxos.messages.Prepare;
@@ -71,7 +71,7 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
int length = buf.readInt();
length -= 4; // length has already been read
-
+
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
@@ -87,7 +87,6 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
byte[] bytearray = new byte[length];
buf.markReaderIndex();
buf.readBytes(bytearray, 0, length);
- buf.resetReaderIndex();
Checksum crc32 = CRC32Pool.getCRC32();
crc32.reset();
@@ -103,10 +102,13 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
if (result != crc) {
byte b = buf.readByte();
MessageType type = MessageType.values()[b];
- LOG.error("Invalid CRC for {}. Expected {} Actual {}", new Object[] {type, crc, result});
- throw new InputMessageException("Invalid CRC", null, null);
+ LOG.error("Invalid CRC for {}. Expected {} Actual {}", new Object[] { type, crc, result });
+ return new InvalidMessage();
}
+ // If CRC matches reset reader index
+ buf.resetReaderIndex();
+
CRC32Pool.pushCRC32(crc32);
byte b = buf.readByte();
@@ -248,7 +250,7 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
int serverId = buf.readInt();
long ts = buf.readLong();
len = buf.readInt();
- byte [] message = new byte [len];
+ byte[] message = new byte[len];
buf.readBytes(message);
AsyncMessage am = new AsyncMessage(clientId, serverId, ts, message);
am.setCRC(crc);
@@ -257,15 +259,17 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
case CONTROL: {
int clientId = buf.readInt();
len = buf.readInt();
- byte [] message = new byte [len];
+ byte[] message = new byte[len];
buf.readBytes(message);
ControlMessage cm = new ControlMessage(clientId, message);
cm.setCRC(crc);
return cm;
}
+ default: {
+ LOG.error("Unknown message type");
+ return new InvalidMessage();
+ }
}
- buf.resetReaderIndex();
- throw new IllegalArgumentException("Unknown message type " + b + " " + type);
}
}
@@ -86,6 +86,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
if (LOG.isTraceEnabled()) {
LOG.trace("[{}] Message received {}", serverConnection.getId(), message);
}
+ if (message instanceof InvalidMessage) {
+ return;
+ }
if (message instanceof ControlMessage) {
if (controlHandler != null) {
controlHandler.handleControlMessage((ControlMessage) message);

0 comments on commit a9e6aae

Please sign in to comment.