Permalink
Browse files

Check CRC only if protection is turned on

  • Loading branch information...
1 parent 7dcefe5 commit 88c4e636e856c9bc1ae4b30422eea4781bb743c0 @dgomezferro committed Oct 18, 2012
@@ -94,7 +94,7 @@
*
* @throws IOException
*/
- public PaxosClientHandler(PascRuntime<ClientState> runtime, ClientInterface clientInterface, String[] servers,
+ public PaxosClientHandler(final PascRuntime<ClientState> runtime, ClientInterface clientInterface, String[] servers,
int clientId, int clients, int timeout, String zkConnection, final ExecutionHandler executor)
throws IOException {
this.clientId = clientId;
@@ -106,7 +106,8 @@ public PaxosClientHandler(PascRuntime<ClientState> runtime, ClientInterface clie
this.clientInterface.setInterface(this);
this.channelPipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(new ManualEncoder(), new ManualDecoder(), executor, PaxosClientHandler.this);
+ return Channels.pipeline(new ManualEncoder(), new ManualDecoder(runtime.isProtected()), executor,
+ PaxosClientHandler.this);
}
};
this.servers = servers;
@@ -50,6 +50,13 @@
private static final Logger LOG = LoggerFactory.getLogger(ManualDecoder.class);
+ private boolean protection = false;
+
+ public ManualDecoder(boolean protection) {
+ super();
+ this.protection = protection;
+ }
+
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
Object result = decode2(ctx, channel, buf);
if (result != null) {
@@ -84,36 +91,10 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
LOG.warn("Length is 0.");
}
- byte[] bytearray = new byte[length];
- buf.markReaderIndex();
- buf.readBytes(bytearray, 0, length);
- Checksum crc32 = CRC32Pool.getCRC32();
- crc32.reset();
-
- crc32.update(bytearray, 0, bytearray.length);
-
- long result = crc32.getValue();
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Decoding message with bytes {} computed CRC {} received CRC {}", new Object[] { bytearray,
- result, crc });
- }
-
- if (result != crc) {
- MessageType type = null;
- if (length > 0) {
- byte b = bytearray[0];
- type = MessageType.values()[b];
- }
- LOG.error("Invalid CRC for {}. Expected {} Actual {}", new Object[] { type, crc, result });
+ if (protection && !checkCRC(buf, length, crc)) {
return new InvalidMessage();
}
- // If CRC matches reset reader index
- buf.resetReaderIndex();
-
- CRC32Pool.pushCRC32(crc32);
-
byte b = buf.readByte();
int len;
MessageType type = MessageType.values()[b];
@@ -275,4 +256,37 @@ protected Object decode2(ChannelHandlerContext ctx, Channel channel, ChannelBuff
}
}
+ private boolean checkCRC(ChannelBuffer buf, int length, long crc) {
+ byte[] bytearray = new byte[length];
+ buf.markReaderIndex();
+ buf.readBytes(bytearray, 0, length);
+ Checksum crc32 = CRC32Pool.getCRC32();
+ crc32.reset();
+
+ crc32.update(bytearray, 0, bytearray.length);
+
+ long result = crc32.getValue();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Decoding message with bytes {} computed CRC {} received CRC {}", new Object[] { bytearray,
+ result, crc });
+ }
+
+ if (result != crc) {
+ MessageType type = null;
+ if (length > 0) {
+ byte b = bytearray[0];
+ type = MessageType.values()[b];
+ }
+ LOG.error("Invalid CRC for {}. Expected {} Actual {}", new Object[] { type, crc, result });
+ return false;
+ }
+
+ // If CRC matches reset reader index
+ buf.resetReaderIndex();
+
+ CRC32Pool.pushCRC32(crc32);
+ return true;
+ }
+
}
@@ -30,6 +30,7 @@
private ExecutionHandler executionHandler;
private boolean twoStages;
+ private boolean protection;
/**
* Constructor
@@ -42,16 +43,17 @@
* @param shared
* The shared state among handlers
*/
- public PipelineFactory(ChannelHandler handler, ExecutionHandler executionHandler, boolean twoStages) {
+ public PipelineFactory(ChannelHandler handler, ExecutionHandler executionHandler, boolean twoStages, boolean protection) {
super();
this.executionHandler = executionHandler;
this.channelHandler = handler;
this.twoStages = twoStages;
+ this.protection = protection;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("decoder", new ManualDecoder());
+ pipeline.addLast("decoder", new ManualDecoder(protection));
if (twoStages) {
pipeline.addLast("executor", executionHandler);
}
@@ -116,7 +116,7 @@ public Thread newThread(Runnable r) {
}
}));
this.channelHandler = new ServerHandler(runtime, sm, controlHandler, this);
- this.channelPipelineFactory = new PipelineFactory(channelHandler, executionHandler, twoStages);
+ this.channelPipelineFactory = new PipelineFactory(channelHandler, executionHandler, twoStages, runtime.isProtected());
this.leaderElection = new LeaderElection(zk, id, this.channelHandler);
this.barrier = new Barrier(new ZooKeeper(zk, 2000, leaderElection), "/paxos_srv_barrier", "" + id,
servers.length);
@@ -54,7 +54,7 @@
public UdpServer(PascRuntime<PaxosState> runtime, String servers[], String clients[], int port, int threads, int id) {
this.channelFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool());
- this.channelPipelineFactory = new PipelineFactory(null, null, true);
+ this.channelPipelineFactory = new PipelineFactory(null, null, true, true);
this.servers = servers;
this.clients = clients;
this.port = port;

0 comments on commit 88c4e63

Please sign in to comment.