Skip to content

Commit

Permalink
Improve logic which processes duplicated peers
Browse files Browse the repository at this point in the history
  • Loading branch information
mkalinin committed Oct 1, 2015
1 parent f1c759c commit e56c8f6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
Expand Up @@ -314,13 +314,16 @@ public boolean equals(Object o) {

Channel channel = (Channel) o;

if (inetSocketAddress != null ? !inetSocketAddress.equals(channel.inetSocketAddress) : channel.inetSocketAddress != null) return false;
return !(node != null ? !node.equals(channel.node) : channel.node != null);

}

@Override
public int hashCode() {
return node != null ? node.hashCode() : 0;
int result = inetSocketAddress != null ? inetSocketAddress.hashCode() : 0;
result = 31 * result + (node != null ? node.hashCode() : 0);
return result;
}

@Override
Expand Down
@@ -1,10 +1,10 @@
package org.ethereum.net.server;

import org.ethereum.core.Transaction;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.facade.Ethereum;
import org.ethereum.manager.WorldManager;

import org.ethereum.net.message.ReasonCode;
import org.ethereum.sync.SyncManager;
import org.ethereum.net.rlpx.discover.NodeManager;
import org.slf4j.Logger;
Expand Down Expand Up @@ -33,7 +33,7 @@ public class ChannelManager {
private static final Logger logger = LoggerFactory.getLogger("net");

private List<Channel> newPeers = new CopyOnWriteArrayList<>();
private List<Channel> activePeers = new CopyOnWriteArrayList<>();
private final Map<ByteArrayWrapper, Channel> activePeers = Collections.synchronizedMap(new HashMap<ByteArrayWrapper, Channel>());

private ScheduledExecutorService mainWorker = Executors.newSingleThreadScheduledExecutor();

Expand Down Expand Up @@ -64,11 +64,18 @@ private void processNewPeers() {
for(Channel peer : newPeers) {

if(peer.isProtocolsInitialized()) {
process(peer);

if (!activePeers.containsKey(peer.getNodeIdWrapper())) {
process(peer);
} else {
peer.disconnect(DUPLICATE_PEER);
}

processed.add(peer);
}

}

newPeers.removeAll(processed);
}

Expand All @@ -79,37 +86,41 @@ private void process(Channel peer) {
peer.prohibitTransactionProcessing();
}
syncManager.addPeer(peer);
activePeers.add(peer);
activePeers.put(peer.getNodeIdWrapper(), peer);
}
}

public void sendTransaction(Transaction tx) {
for (Channel channel : activePeers) {
channel.sendTransaction(tx);

synchronized (activePeers) {
for (Channel channel : activePeers.values())
channel.sendTransaction(tx);
}
}

public void add(Channel channel) {
public void add(Channel peer) {

if (activePeers.contains(channel) || newPeers.contains(channel)) {
channel.disconnect(DUPLICATE_PEER);
if (newPeers.contains(peer)) {
peer.disconnect(DUPLICATE_PEER);
return;
}

newPeers.add(channel);
newPeers.add(peer);
}

public void notifyDisconnect(Channel channel) {
logger.debug("Peer {}: notifies about disconnect", channel.getPeerIdShort());
channel.onDisconnect();
syncManager.onDisconnect(channel);
activePeers.remove(channel);
activePeers.values().remove(channel);
newPeers.remove(channel);
}

public void onSyncDone() {
for (Channel channel : activePeers) {
channel.onSyncDone();

synchronized (activePeers) {
for (Channel channel : activePeers.values())
channel.onSyncDone();
}
}
}
4 changes: 2 additions & 2 deletions ethereumj-core/src/main/java/org/ethereum/sync/PeersPool.java
Expand Up @@ -81,7 +81,7 @@ public void add(Channel peer) {

public void remove(Channel peer) {
synchronized (activePeers) {
activePeers.remove(peer.getNodeIdWrapper());
activePeers.values().remove(peer);
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ public void onDisconnect(Channel peer) {
}

synchronized (activePeers) {
activePeers.remove(peer.getNodeIdWrapper());
activePeers.values().remove(peer);
bannedPeers.remove(peer);
}

Expand Down

0 comments on commit e56c8f6

Please sign in to comment.