Skip to content

Commit

Permalink
Discard messages from Agent when protocol version is incorrect instea…
Browse files Browse the repository at this point in the history
…d of closing the connection (criteo#20)
  • Loading branch information
Willymontaz committed Nov 16, 2018
1 parent b364218 commit 63f891f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.criteo.hadoop.garmadon.forwarder.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

//TODO the following class has to be removed when no more agent suffering
//https://github.com/criteo/garmadon/issues/17 run on the cluster
public class Discarder extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//release the bytebuffer just acts as reading and dropping the data
((ByteBuf) msg).release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
public class GreetingHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger LOGGER = LoggerFactory.getLogger(GreetingHandler.class);

private byte[] lastAgentGreetings = null;

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
PrometheusHttpMetrics.greetingsReceived.inc();

byte[] greetings = new byte[msg.readableBytes()];
msg.readBytes(greetings);

lastAgentGreetings = greetings;

ProtocolVersion.checkVersion(greetings);

ByteBuf pGreeting = Unpooled.buffer();
pGreeting.writeBytes(ProtocolVersion.GREETINGS);
ctx.writeAndFlush(pGreeting);
sendValidGreetings(ctx, ProtocolVersion.GREETINGS);

ctx.pipeline().addBefore(KafkaHandler.class.getSimpleName(), CloseHandler.class.getSimpleName(), new CloseHandler());
ctx.pipeline().remove(this);
Expand All @@ -33,6 +35,25 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
PrometheusHttpMetrics.greetingsInError.inc();
LOGGER.error("",cause);
ctx.close();

//TODO the following code has to be removed when no more agent suffering
//https://github.com/criteo/garmadon/issues/17 run on the cluster
if(cause instanceof ProtocolVersion.InvalidProtocolVersionException) {
//We make the agent think it can talk to us. For that we need to return its protocol version
sendValidGreetings(ctx, lastAgentGreetings);

//Don't close the connection but ignore sent data to the forwarder and handle close
ctx.pipeline().addFirst(CloseHandler.class.getSimpleName(), new CloseHandler());
ctx.pipeline().addFirst(Discarder.class.getSimpleName(), new Discarder());
} else {
//otherwise it could be another process so close the connection
ctx.close();
}
}

private void sendValidGreetings(ChannelHandlerContext ctx, byte[] greeting) {
ByteBuf pGreeting = Unpooled.buffer();
pGreeting.writeBytes(greeting);
ctx.writeAndFlush(pGreeting);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import java.util.Iterator;
import java.util.Map;

import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.*;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

public class GreetingHandlerTest {

public static final byte[] BAD_VERSION_GREETING = {0, 0, 'V', 0};

@Rule
public WithEmbeddedChannel channel = new WithEmbeddedChannel();
private ChannelHandler mockedKafkaHandler;
Expand Down Expand Up @@ -70,4 +71,39 @@ public void GreetingHandler_should_install_CloseHandler_before_KafkaHandler_afte
assertThat(handlerIterator.next().getValue(), equalTo(mockedKafkaHandler));
}

//TODO the following tests have to be removed when no more agent suffering
//https://github.com/criteo/garmadon/issues/17 run on the cluster
@Test
public void GreetingHandler_should_not_close_cnx_on_bad_version() {
ByteBuf byteBuf = Unpooled.wrappedBuffer(BAD_VERSION_GREETING);
channel.get().writeInbound(byteBuf);

assertTrue(channel.get().isOpen());
}

@Test
public void GreetingHandler_should_send_back_a_greeting_with_agent_version_on_bad_version() {
ByteBuf byteBuf = Unpooled.wrappedBuffer(BAD_VERSION_GREETING);
channel.get().writeInbound(byteBuf);

ByteBuf output = channel.get().readOutbound();
byte[] greetings = new byte[4];
output.readBytes(greetings); //when reading output we must copy bytes because it is a direct bytebuf

assertEquals(0, greetings[0]);
assertEquals(0, greetings[1]);
assertEquals('V', greetings[2]);
assertEquals(0, greetings[3]);
}

@Test
public void GreetingHandler_should_install_Discarder_first_and_then_CloseHandler_after_receiving_bad_version(){
ByteBuf byteBuf = Unpooled.wrappedBuffer(BAD_VERSION_GREETING);
channel.get().writeInbound(byteBuf);

Iterator<Map.Entry<String, ChannelHandler>> handlerIterator = channel.get().pipeline().iterator();
assertThat(handlerIterator.next().getValue().getClass(), equalTo(Discarder.class));
assertThat(handlerIterator.next().getValue().getClass(), equalTo(CloseHandler.class));
}

}

0 comments on commit 63f891f

Please sign in to comment.