diff --git a/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java b/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java index b600438053..bada918104 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/server/Channel.java @@ -314,13 +314,16 @@ public boolean equals(Object o) { Channel channel = (Channel) o; + if (inetSocketAddress != null ? !inetSocketAddress.equals(channel.inetSocketAddress) : channel.inetSocketAddress != null) return false; return !(node != null ? !node.equals(channel.node) : channel.node != null); } @Override public int hashCode() { - return node != null ? node.hashCode() : 0; + int result = inetSocketAddress != null ? inetSocketAddress.hashCode() : 0; + result = 31 * result + (node != null ? node.hashCode() : 0); + return result; } @Override diff --git a/ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java b/ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java index 4d172f089a..97a0e5ee6a 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/server/ChannelManager.java @@ -1,10 +1,10 @@ package org.ethereum.net.server; import org.ethereum.core.Transaction; +import org.ethereum.db.ByteArrayWrapper; import org.ethereum.facade.Ethereum; import org.ethereum.manager.WorldManager; -import org.ethereum.net.message.ReasonCode; import org.ethereum.sync.SyncManager; import org.ethereum.net.rlpx.discover.NodeManager; import org.slf4j.Logger; @@ -33,7 +33,7 @@ public class ChannelManager { private static final Logger logger = LoggerFactory.getLogger("net"); private List newPeers = new CopyOnWriteArrayList<>(); - private List activePeers = new CopyOnWriteArrayList<>(); + private final Map activePeers = Collections.synchronizedMap(new HashMap()); private ScheduledExecutorService mainWorker = Executors.newSingleThreadScheduledExecutor(); @@ -64,11 +64,18 @@ private void processNewPeers() { for(Channel peer : newPeers) { if(peer.isProtocolsInitialized()) { - process(peer); + + if (!activePeers.containsKey(peer.getNodeIdWrapper())) { + process(peer); + } else { + peer.disconnect(DUPLICATE_PEER); + } + processed.add(peer); } } + newPeers.removeAll(processed); } @@ -79,37 +86,41 @@ private void process(Channel peer) { peer.prohibitTransactionProcessing(); } syncManager.addPeer(peer); - activePeers.add(peer); + activePeers.put(peer.getNodeIdWrapper(), peer); } } public void sendTransaction(Transaction tx) { - for (Channel channel : activePeers) { - channel.sendTransaction(tx); + + synchronized (activePeers) { + for (Channel channel : activePeers.values()) + channel.sendTransaction(tx); } } - public void add(Channel channel) { + public void add(Channel peer) { - if (activePeers.contains(channel) || newPeers.contains(channel)) { - channel.disconnect(DUPLICATE_PEER); + if (newPeers.contains(peer)) { + peer.disconnect(DUPLICATE_PEER); return; } - newPeers.add(channel); + newPeers.add(peer); } public void notifyDisconnect(Channel channel) { logger.debug("Peer {}: notifies about disconnect", channel.getPeerIdShort()); channel.onDisconnect(); syncManager.onDisconnect(channel); - activePeers.remove(channel); + activePeers.values().remove(channel); newPeers.remove(channel); } public void onSyncDone() { - for (Channel channel : activePeers) { - channel.onSyncDone(); + + synchronized (activePeers) { + for (Channel channel : activePeers.values()) + channel.onSyncDone(); } } } diff --git a/ethereumj-core/src/main/java/org/ethereum/sync/PeersPool.java b/ethereumj-core/src/main/java/org/ethereum/sync/PeersPool.java index 3e1e3a3ea5..8dd400981d 100644 --- a/ethereumj-core/src/main/java/org/ethereum/sync/PeersPool.java +++ b/ethereumj-core/src/main/java/org/ethereum/sync/PeersPool.java @@ -81,7 +81,7 @@ public void add(Channel peer) { public void remove(Channel peer) { synchronized (activePeers) { - activePeers.remove(peer.getNodeIdWrapper()); + activePeers.values().remove(peer); } } @@ -122,7 +122,7 @@ public void onDisconnect(Channel peer) { } synchronized (activePeers) { - activePeers.remove(peer.getNodeIdWrapper()); + activePeers.values().remove(peer); bannedPeers.remove(peer); }