Skip to content

Commit

Permalink
Clean up the rlpx pipeline code.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nashatyrev committed Oct 15, 2015
1 parent 17dcd5f commit d5a0bc1
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,30 @@
import java.util.List;

/**
* The Netty handler responsible for decrypting/encrypting RLPx frames
* with the FrameCodec crated during HandshakeHandler initial work
*
* Created by Anton Nashatyrev on 15.10.2015.
*/
public class MedianFrameCodec extends ByteToMessageCodec<FrameCodec.Frame> {
public class FrameCodecHandler extends ByteToMessageCodec<FrameCodec.Frame> {
private static final Logger loggerWire = LoggerFactory.getLogger("wire");
private static final Logger loggerNet = LoggerFactory.getLogger("net");

public FrameCodec frameCodec;
public Channel channel;

public MedianFrameCodec(FrameCodec frameCodec, Channel channel) {
public FrameCodecHandler(FrameCodec frameCodec, Channel channel) {
this.frameCodec = frameCodec;
this.channel = channel;
}

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
if (in.readableBytes() == 0) {
loggerWire.debug("in.readableBytes() == 0");
loggerWire.trace("in.readableBytes() == 0");
return;
}

loggerWire.debug("Decoding frame (" + in.readableBytes() + " bytes)");
loggerWire.trace("Decoding frame (" + in.readableBytes() + " bytes)");
List<FrameCodec.Frame> frames = frameCodec.readFrames(in);


Expand All @@ -41,13 +44,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
for (int i = 0; i < frames.size(); i++) {
FrameCodec.Frame frame = frames.get(i);

if (loggerWire.isDebugEnabled())
loggerWire.debug("Recv: Encoded: (" + (i + 1) + " of " + frames.size() + ") " +
frame.getType() + " [size: " + frame.getStream().available() + "]");
channel.getNodeStatistics().rlpxInMessages.add();
}

out.addAll(frames);
channel.getNodeStatistics().rlpxInMessages.add();
}

@Override
Expand All @@ -57,4 +57,19 @@ protected void encode(ChannelHandlerContext ctx, FrameCodec.Frame frame, ByteBuf

channel.getNodeStatistics().rlpxOutMessages.add();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (channel.isDiscoveryMode()) {
loggerNet.debug("FrameCodec failed: ", cause);
} else {
if (cause instanceof IOException) {
loggerNet.info("FrameCodec failed: " + ctx.channel().remoteAddress() + "(" + cause.getMessage() + ")");
loggerNet.debug("FrameCodec failed: " + ctx.channel().remoteAddress(), cause);
} else {
loggerNet.error("FrameCodec failed: ", cause);
}
}
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,116 +31,65 @@
import static org.ethereum.net.rlpx.FrameCodec.Frame;

/**
* The PacketDecoder parses every valid Ethereum packet to a Message object
* The Netty handler which manages initial negotiation with peer
* (when either we initiating connection or remote peer initiates)
*
* The initial handshake includes:
* - first AuthInitiate -> AuthResponse messages when peers exchange with secrets
* - second P2P Hello messages when P2P protocol and subprotocol capabilities are negotiated
*
* After the handshake is done this handler reports secrets and other data to the Channel
* which installs further handlers depending on the protocol parameters.
* This handler is finally removed from the pipeline.
*/
@Component
@Scope("prototype")
public class HandshakeHandler extends ByteToMessageDecoder /*ChannelInboundHandlerAdapter*/ {
public class HandshakeHandler extends ByteToMessageDecoder {

@Autowired
SystemProperties config;

private static final Logger loggerWire = LoggerFactory.getLogger("wire");
private static final Logger loggerNet = LoggerFactory.getLogger("net");

public FrameCodec frameCodec;
private FrameCodec frameCodec;
private ECKey myKey;
private byte[] nodeId;
private byte[] remoteId;
private EncryptionHandshake handshake;
private byte[] initiatePacket;
private Channel channel;
private boolean isHandshakeDone;
private final InitiateHandler initiator = new InitiateHandler();

public InitiateHandler getInitiator() {
return initiator;
}

public class InitiateHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel.setInetSocketAddress((InetSocketAddress) ctx.channel().remoteAddress());
if (remoteId.length == 64) {
channel.setNode(remoteId);
initiate(ctx);
} else {
handshake = new EncryptionHandshake();
nodeId = myKey.getNodeId();
}
}
}

public HandshakeHandler() {
}

// for testing purposes
HandshakeHandler(ECKey myKey, EncryptionHandshake.Secrets secrets) {
this.myKey = myKey;
nodeId = myKey.getNodeId();
frameCodec = new FrameCodec(secrets);
}

@PostConstruct
private void init() {
myKey = config.getMyKey();
}


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isHandshakeDone) {
loggerWire.debug("=== ctx.fireChannelRead: " + ((ByteBuf) msg).readableBytes());
ctx.fireChannelRead(msg);
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel.setInetSocketAddress((InetSocketAddress) ctx.channel().remoteAddress());
if (remoteId.length == 64) {
channel.setNode(remoteId);
initiate(ctx);
} else {
super.channelRead(ctx, msg);
if (isHandshakeDone) {
loggerWire.debug("=== ctx.fireChannelRead: " + ((ByteBuf) msg).readableBytes());
ctx.fireChannelRead(msg);
}
handshake = new EncryptionHandshake();
nodeId = myKey.getNodeId();
}
}

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
loggerWire.debug("Received packet bytes: " + in.readableBytes());
if (!isHandshakeDone) {
loggerWire.debug("Decoding handshake... (" + in.readableBytes() + " bytes available)");
decodeHandshake(ctx, in);
loggerWire.debug("Decoded handshake (" + in.readableBytes() + " bytes available)");
in.retain();
}
// decodeMessage(ctx, in, out);
}

public void decodeMessage(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException {
if (in.readableBytes() == 0) return;

List<Frame> frames = frameCodec.readFrames(in);


// Check if a full frame was available. If not, we'll try later when more bytes come in.
if (frames == null || frames.isEmpty()) return;

for (int i = 0; i < frames.size(); i++) {
Frame frame = frames.get(i);

if (loggerWire.isDebugEnabled())
loggerWire.debug("Recv: Encoded: (" + (i + 1) + " of " + frames.size() + ") " +
frame.getType() + " [size: " + frame.getStream().available() + "]");
loggerWire.debug("Decoding handshake... (" + in.readableBytes() + " bytes available)");
decodeHandshake(ctx, in);
if (isHandshakeDone) {
loggerWire.debug("Handshake done, removing HandshakeHandler from pipeline.");
ctx.pipeline().remove(this);
}

out.addAll(frames);
channel.getNodeStatistics().rlpxInMessages.add();
}

public void encode(ChannelHandlerContext ctx, Frame frame, ByteBuf out) throws Exception {

frameCodec.writeFrame(frame, out);

channel.getNodeStatistics().rlpxOutMessages.add();
}


public void initiate(ChannelHandlerContext ctx) throws Exception {

loggerNet.info("RLPX protocol activated");
Expand Down Expand Up @@ -183,17 +132,6 @@ private void decodeHandshake(final ChannelHandlerContext ctx, ByteBuf buffer) th
this.frameCodec = new FrameCodec(secrets);

loggerNet.info("auth exchange done");
// new Thread(new Runnable() {
// @Override
// public void run() {
// try {
// Thread.sleep(500);
// channel.sendHelloMessage(ctx, frameCodec, Hex.toHexString(nodeId));
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }).start();
channel.sendHelloMessage(ctx, frameCodec, Hex.toHexString(nodeId), null);
} else {
loggerWire.info("MessageCodec: Buffer bytes: " + buffer.readableBytes());
Expand All @@ -207,7 +145,7 @@ private void decodeHandshake(final ChannelHandlerContext ctx, ByteBuf buffer) th
if (loggerNet.isInfoEnabled())
loggerNet.info("From: \t{} \tRecv: \t{}", ctx.channel().remoteAddress(), helloMessage);
isHandshakeDone = true;
this.channel.publicRLPxHandshakeFinished(ctx, helloMessage);
this.channel.publicRLPxHandshakeFinished(ctx, frameCodec, helloMessage);
} else {
DisconnectMessage message = new DisconnectMessage(payload);
if (loggerNet.isInfoEnabled())
Expand Down Expand Up @@ -282,7 +220,7 @@ private void decodeHandshake(final ChannelHandlerContext ctx, ByteBuf buffer) th
// Secret authentication finish here
channel.sendHelloMessage(ctx, frameCodec, Hex.toHexString(nodeId), inboundHelloMessage);
isHandshakeDone = true;
this.channel.publicRLPxHandshakeFinished(ctx, inboundHelloMessage);
this.channel.publicRLPxHandshakeFinished(ctx, frameCodec, inboundHelloMessage);
}
}
channel.getNodeStatistics().rlpxInHello.add();
Expand All @@ -308,13 +246,13 @@ public byte[] getRemoteId() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (channel.isDiscoveryMode()) {
loggerNet.debug("MultiFrameCodec handling failed", cause);
loggerNet.debug("Handshake failed: ", cause);
} else {
if (cause instanceof IOException) {
loggerNet.info("Connection with peer terminated: " + ctx.channel().remoteAddress() + "(" + cause.getMessage() + ")");
loggerNet.debug("Connection with peer terminated: " + ctx.channel().remoteAddress(), cause);
loggerNet.info("Handshake failed: " + ctx.channel().remoteAddress() + "(" + cause.getMessage() + ")");
loggerNet.debug("Handshake failed: " + ctx.channel().remoteAddress(), cause);
} else {
loggerNet.error("MultiFrameCodec handling failed", cause);
loggerNet.error("Handshake failed: ", cause);
}
}
ctx.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static org.ethereum.net.rlpx.FrameCodec.Frame;

/**
* The PacketDecoder parses every valid Ethereum packet to a Message object
* The Netty codec which encodes/decodes RPLx frames to subprotocol Messages
*/
@Component
@Scope("prototype")
Expand Down Expand Up @@ -243,7 +243,7 @@ private Message createMessage(byte code, byte[] payload) {
throw new IllegalArgumentException("No such message: " + code + " [" + Hex.toHexString(payload) + "]");
}

public void setRemoteId(String remoteId, Channel channel){
public void setChannel(Channel channel){
this.channel = channel;
}

Expand Down
24 changes: 8 additions & 16 deletions ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,9 @@ public class Channel {

public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMode) {

// medianFrameCodec = new MedianFrameCodec(null, this);
// medianFrameCodec.multiFrameCodec = multiFrameCodec;
pipeline.addLast("readTimeoutHandler",
new ReadTimeoutHandler(config.peerChannelReadTimeout(), TimeUnit.SECONDS));
pipeline.addLast("initiator", handshakeHandler.getInitiator());
pipeline.addLast("multiFrameCodec", handshakeHandler);
// pipeline.addLast("medianFrameCodec", medianFrameCodec);
// pipeline.addLast("messageCodec", messageCodec);
// pipeline.addLast("messageCodec", messageCodec);
pipeline.addLast("handshakeHandler", handshakeHandler);

this.discoveryMode = discoveryMode;

Expand All @@ -113,9 +107,10 @@ public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMod
handshakeHandler.generateTempKey();
}

messageCodec.setRemoteId(remoteId, this);
handshakeHandler.setRemoteId(remoteId, this);

messageCodec.setChannel(this);

p2pHandler.setMsgQueue(msgQueue);
messageCodec.setP2pMessageFactory(new P2pMessageFactory());

Expand All @@ -126,22 +121,19 @@ public void init(ChannelPipeline pipeline, String remoteId, boolean discoveryMod
messageCodec.setBzzMessageFactory(new BzzMessageFactory());
}

public void publicRLPxHandshakeFinished(ChannelHandlerContext ctx, HelloMessage helloRemote) throws IOException, InterruptedException {
public void publicRLPxHandshakeFinished(ChannelHandlerContext ctx, FrameCodec frameCodec,
HelloMessage helloRemote) throws IOException, InterruptedException {

logger.debug("publicRLPxHandshakeFinished with " + ctx.channel().remoteAddress());
if (P2pHandler.isProtocolVersionSupported(helloRemote.getP2PVersion())) {

if (helloRemote.getP2PVersion() < 5) {
messageCodec.setSupportChunkedFrames(false);
}

MedianFrameCodec medianFrameCodec = new MedianFrameCodec(handshakeHandler.frameCodec, this);
logger.debug("=== publicRLPxHandshakeFinished");
ctx.pipeline().addLast("medianFrameCodec", medianFrameCodec);
FrameCodecHandler frameCodecHandler = new FrameCodecHandler(frameCodec, this);
ctx.pipeline().addLast("medianFrameCodec", frameCodecHandler);
ctx.pipeline().addLast("messageCodec", messageCodec);
// logger.debug("=== remove(\"multiFrameCodec\")");
// ctx.pipeline().remove("multiFrameCodec");
// logger.debug("=== removed");
// medianFrameCodec.frameCodec = multiFrameCodec.frameCodec;
ctx.pipeline().addLast(Capability.P2P, p2pHandler);

p2pHandler.setChannel(this);
Expand Down
47 changes: 47 additions & 0 deletions ethereumj-core/src/test/java/org/ethereum/net/NettyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.ethereum.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.junit.Ignore;
import org.junit.Test;

import java.util.List;

/**
* Created by Anton Nashatyrev on 16.10.2015.
*/
@Ignore
public class NettyTest {
@Test
public void pipelineTest() {

ByteToMessageDecoder decoder1 = new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int i = in.readInt();
System.out.println("decoder1 read int (4 bytes). Needs no more: " + Integer.toHexString(i));
ctx.pipeline().remove(this);
}
};

ByteToMessageDecoder decoder2 = new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int i = in.readInt();
System.out.println("decoder2 read int (4 bytes): " + Integer.toHexString(i));
}
};

EmbeddedChannel channel = new EmbeddedChannel(decoder1, decoder2);
ByteBuf buffer = Unpooled.buffer();
buffer.writeInt(0x12345678);
buffer.writeInt(0xabcdefff);
channel.writeInbound(buffer);

}

}

0 comments on commit d5a0bc1

Please sign in to comment.