Skip to content

Commit

Permalink
Discovery work
Browse files Browse the repository at this point in the history
  • Loading branch information
Nashatyrev committed Jul 20, 2015
1 parent 4bc7f7b commit 65fd770
Show file tree
Hide file tree
Showing 22 changed files with 693 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ private SystemProperties() {
}
}

public Config getConfig() {
return config;
}

/**
* Puts a new config atop of existing stack making the options
* in the supplied config overriding existing options
Expand Down Expand Up @@ -134,7 +138,7 @@ public boolean peerDiscovery() {

@ValidateMe
public String privateKey() {
return config.getString("privateKey");
return config.getString("peer.discovery.privateKey");
}

@ValidateMe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.ethereum.listener.EthereumListener;
import org.ethereum.net.client.PeerClient;
import org.ethereum.net.peerdiscovery.PeerDiscovery;
import org.ethereum.net.rlpx.discover.NodeManager;
import org.ethereum.net.rlpx.discover.PeerConnectionManager;
import org.ethereum.net.rlpx.discover.UDPListener;
import org.ethereum.net.server.ChannelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,6 +69,15 @@ public class WorldManager {
@Autowired
private EthereumListener listener;

@Autowired
private NodeManager nodeManager;

@Autowired
private PeerConnectionManager peerConnectionManager;

@Autowired
private UDPListener udpListener;

@PostConstruct
public void init() {
byte[] cowAddr = HashUtil.sha3("cow".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void connect(String host, int port, String remoteId) {
logger.debug("Connection is closed");

} catch (Exception e) {
logger.debug("Exception: {} ({})", e.getMessage(), e.getClass().getName());
e.printStackTrace();
logger.error("Exception: {} ({})", e.getMessage(), e.getClass().getName(), e);
} finally {
workerGroup.shutdownGracefully();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.ethereum.net.MessageQueue;
import org.ethereum.net.message.ReasonCode;
import org.ethereum.net.p2p.DisconnectMessage;
import org.ethereum.net.server.Channel;
import org.ethereum.util.ByteUtil;
import org.ethereum.util.FastByteComparisons;

Expand Down Expand Up @@ -187,7 +188,6 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
* @param ctx the ChannelHandlerContext
*/
public void processStatus(StatusMessage msg, ChannelHandlerContext ctx) throws InterruptedException {

this.handshakeStatusMessage = msg;
if (peerDiscoveryMode) {
msgQueue.sendMessage(new DisconnectMessage(ReasonCode.REQUESTED));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.ethereum.net.p2p;

import io.netty.buffer.ByteBuf;
import org.ethereum.core.Block;
import org.ethereum.core.Transaction;
import org.ethereum.manager.WorldManager;
Expand All @@ -13,7 +12,6 @@
import org.ethereum.net.message.ReasonCode;
import org.ethereum.net.message.StaticMessages;
import org.ethereum.net.peerdiscovery.PeerInfo;
import org.ethereum.net.rlpx.FrameCodec;
import org.ethereum.net.server.Channel;
import org.ethereum.net.shh.ShhHandler;
import org.ethereum.net.shh.ShhMessageCodes;
Expand All @@ -26,12 +24,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.spongycastle.util.encoders.Hex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

Expand Down Expand Up @@ -124,6 +120,7 @@ public void channelRead0(final ChannelHandlerContext ctx, P2pMessage msg) throws
break;
case DISCONNECT:
msgQueue.receivedMessage(msg);
channel.getNodeStatistics().nodeDisconnectedRemote((DisconnectMessage) msg);
break;
case PING:
msgQueue.receivedMessage(msg);
Expand Down Expand Up @@ -162,7 +159,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error(cause.getCause().toString());
logger.error("" + cause.getCause(), cause);
super.exceptionCaught(ctx, cause);
ctx.close();
killTimers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class AuthInitiateMessage {
public AuthInitiateMessage() {
}

static int getLength() {
public static int getLength() {
return 65+32+64+32+1;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.ethereum.net.rlpx.discover;

/**
* Created by Anton Nashatyrev on 17.07.2015.
*/
public class DiscoverNodeEvent {
NodeHandler node;

public DiscoverNodeEvent(NodeHandler node) {
this.node = node;
}

public NodeHandler getNode() {
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@
public class DiscoverTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger("discover");

Channel channel;

NodeTable table;

ECKey key;
NodeManager nodeManager;

byte[] nodeId;

DiscoverTask(byte[] nodeId, Channel channel, ECKey key, NodeTable table) {
this.nodeId = nodeId;
this.channel = channel;
this.key = key;
this.table = table;
public DiscoverTask(NodeManager nodeManager) {
this.nodeManager = nodeManager;
nodeId = nodeManager.homeNode.getId();
}

@Override
Expand All @@ -50,22 +44,17 @@ public synchronized void discover(byte[] nodeId, int round, List<Node> prevTried

if (round == KademliaOptions.MAX_STEPS) {
logger.info("{}", String.format("(KademliaOptions.MAX_STEPS) Terminating discover after %d rounds.", round));
logger.info("{}\n{}", String.format("Nodes discovered %d ", table.getNodesCount()), dumpNodes());
logger.info("{}\n{}", String.format("Nodes discovered %d ", nodeManager.getTable().getNodesCount()), dumpNodes());
return;
}

List<Node> closest = table.getClosestNodes(nodeId);
List<Node> closest = nodeManager.getTable().getClosestNodes(nodeId);
List<Node> tried = new ArrayList<>();

for (Node n : closest) {
if (!tried.contains(n) && !prevTried.contains(n)) {
try {
Message findNode = FindNodeMessage.create(nodeId, key);
DatagramPacket packet = new DatagramPacket(
Unpooled.copiedBuffer(findNode.getPacket()),
new InetSocketAddress(n.getHost(), n.getPort()));
logger.info("<=== [FIND_NODE] (to " + n.getHost() + ":" + n.getPort() + ") ");
channel.write(packet);
nodeManager.getNodeHandler(n).sendFindNode(nodeId);
tried.add(n);
Thread.sleep(50);
} catch (Exception ex) {
Expand All @@ -77,11 +66,11 @@ public synchronized void discover(byte[] nodeId, int round, List<Node> prevTried
}
}

channel.flush();
// channel.flush();

if (tried.isEmpty()) {
logger.info("{}", String.format("(tried.isEmpty()) Terminating discover after %d rounds.", round));
logger.info("{}\n{}", String.format("Nodes discovered %d ", table.getNodesCount()), dumpNodes());
logger.info("{}\n{}", String.format("Nodes discovered %d ", nodeManager.getTable().getNodesCount()), dumpNodes());
return;
}

Expand All @@ -95,7 +84,7 @@ public synchronized void discover(byte[] nodeId, int round, List<Node> prevTried

private String dumpNodes() {
String ret = "";
for (NodeEntry entry : table.getAllNodes()) {
for (NodeEntry entry : nodeManager.getTable().getAllNodes()) {
ret += " " + entry.getNode() + "\n";
}
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@
import java.util.concurrent.TimeUnit;

public class DiscoveryExecutor {
Channel channel;
NodeTable table;
ECKey key;
// Channel channel;
// NodeTable table;
// ECKey key;

ScheduledExecutorService discoverer = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();

DiscoveryExecutor(Channel channel, NodeTable table, ECKey key) {
this.channel = channel;
this.table = table;
this.key = key;
NodeManager nodeManager;

public DiscoveryExecutor(NodeManager nodeManager) {
this.nodeManager = nodeManager;
}

public void discover() {

discoverer.scheduleWithFixedDelay(
new DiscoverTask(table.getNode().getId(), channel, key, table),
new DiscoverTask(nodeManager),
0, KademliaOptions.DISCOVER_CYCLE, TimeUnit.SECONDS);

refresher.scheduleWithFixedDelay(
new RefreshTask(channel, key, table),
new RefreshTask(nodeManager),
0, KademliaOptions.BUCKET_REFRESH, TimeUnit.MILLISECONDS);

}
Expand Down
Loading

0 comments on commit 65fd770

Please sign in to comment.