Skip to content

Commit

Permalink
12984 use peers for Network component
Browse files Browse the repository at this point in the history
Signed-off-by: Kore Aguda <kore@swirldslabs.com>
  • Loading branch information
kfa-aguda committed Apr 24, 2024
1 parent 683dd5a commit f23c2ac
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,10 @@ public SwirldsPlatform(@NonNull final PlatformComponentBuilder builder) {
}
};

if (!currentAddressBook.contains(selfId)) {
throw new IllegalArgumentException("Address book does not contain selfId");
}

gossip = new SyncGossip(
platformContext,
blocks.randomBuilder().buildNonCryptographicRandom(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -370,7 +372,7 @@ public static boolean hasAnyCauseSuppliedType(
* our ID
* @return a list of PeerInfo
*/
public static @NonNull List<PeerInfo> createPeerInfoList(
public static @NonNull Set<PeerInfo> createPeerInfoList(
@NonNull final AddressBook addressBook, @NonNull final NodeId selfId) {
Objects.requireNonNull(addressBook);
Objects.requireNonNull(selfId);
Expand All @@ -382,6 +384,6 @@ public static boolean hasAnyCauseSuppliedType(
address.getSelfName(),
Objects.requireNonNull(address.getHostnameExternal()),
Objects.requireNonNull(address.getSigCert())))
.toList();
.collect(Collectors.toUnmodifiableSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -590,7 +591,7 @@ public static Map<NodeId, KeysAndCerts> initNodeSecurity(
* @return a trust store containing the public keys of all the members
* @throws KeyStoreException if there is no provider that supports {@link CryptoConstants#KEYSTORE_TYPE}
*/
public static @NonNull KeyStore createPublicKeyStore(@NonNull final List<PeerInfo> peers) throws KeyStoreException {
public static @NonNull KeyStore createPublicKeyStore(@NonNull final Set<PeerInfo> peers) throws KeyStoreException {
Objects.requireNonNull(peers);
final KeyStore store = CryptoStatic.createEmptyTrustStore();
for (final PeerInfo peer : peers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -189,8 +190,9 @@ protected SyncGossip(

final BasicConfig basicConfig = platformContext.getConfiguration().getConfigData(BasicConfig.class);

topology = new StaticTopology(random, addressBook, selfId, basicConfig.numConnections());
final List<PeerInfo> peers = Utilities.createPeerInfoList(addressBook, selfId);
final Set<PeerInfo> peers = Utilities.createPeerInfoList(addressBook, selfId);

topology = new StaticTopology(random, peers, selfId, basicConfig.numConnections());
final NetworkPeerIdentifier peerIdentifier = new NetworkPeerIdentifier(platformContext, peers);
final SocketFactory socketFactory =
NetworkUtils.createSocketFactory(selfId, peers, keysAndCerts, platformContext.getConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.security.auth.x500.X500Principal;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -54,7 +54,7 @@ public class NetworkPeerIdentifier {
* @param platformContext the platform context
* @param peers list of peers
*/
public NetworkPeerIdentifier(@NonNull final PlatformContext platformContext, @NonNull final List<PeerInfo> peers) {
public NetworkPeerIdentifier(@NonNull final PlatformContext platformContext, @NonNull final Set<PeerInfo> peers) {
Objects.requireNonNull(platformContext);
Objects.requireNonNull(peers);
noPeerFoundLogger = new RateLimitedLogger(logger, platformContext.getTime(), Duration.ofMinutes(5));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.net.ssl.SSLException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -132,14 +132,14 @@ public static String formatException(final Throwable e) {
* NOTE: This method is a stepping stone to decoupling the networking from the platform.
*
* @param selfId the ID of the node
* @param peers the list of peers
* @param peers the set of peers
* @param keysAndCerts the keys and certificates to use for the TLS connections
* @param configuration the configuration of the network
* @return the created {@link SocketFactory}
*/
public static @NonNull SocketFactory createSocketFactory(
@NonNull final NodeId selfId,
@NonNull final List<PeerInfo> peers,
@NonNull final Set<PeerInfo> peers,
@NonNull final KeysAndCerts keysAndCerts,
@NonNull final Configuration configuration) {
Objects.requireNonNull(selfId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.swirlds.common.threading.locks.locked.LockedResource;
import com.swirlds.platform.network.connection.NotConnectedConnection;
import com.swirlds.platform.network.connectivity.OutboundConnectionCreator;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
* Manages a connection that is initiated by this node. If the connection in use is broken, it will try to establish a
Expand All @@ -35,9 +37,10 @@ public class OutboundConnectionManager implements ConnectionManager {
/** locks the connection managed by this instance */
private final AutoClosableResourceLock<Connection> lock = Locks.createResourceLock(currentConn);

public OutboundConnectionManager(final NodeId peerId, final OutboundConnectionCreator connectionCreator) {
this.peerId = peerId;
this.connectionCreator = connectionCreator;
public OutboundConnectionManager(
@NonNull final NodeId peerId, @NonNull final OutboundConnectionCreator connectionCreator) {
this.peerId = Objects.requireNonNull(peerId);
this.connectionCreator = Objects.requireNonNull(connectionCreator);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
* Creates, binds and connects server and client sockets
Expand Down Expand Up @@ -134,5 +134,5 @@ static void configureAndConnect(
*
* @param peers the updated list of peers
*/
void reload(@NonNull final List<PeerInfo> peers);
void reload(@NonNull final Set<PeerInfo> peers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class TlsFactory implements SocketFactory {
public TlsFactory(
@NonNull final Certificate agrCert,
@NonNull final PrivateKey agrKey,
@NonNull final List<PeerInfo> peers,
@NonNull final Set<PeerInfo> peers,
@NonNull final SocketConfig socketConfig,
@NonNull final CryptoConfig cryptoConfig)
throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException,
Expand Down Expand Up @@ -133,7 +133,7 @@ public TlsFactory(
* {@inheritDoc}
*/
@Override
public void reload(@NonNull final List<PeerInfo> peers) {
public void reload(@NonNull final Set<PeerInfo> peers) {
try {
// we just reset the list for now, until the work to calculate diffs is done
// then, we will have two lists of peers to add and to remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.swirlds.common.platform.NodeId;
import com.swirlds.platform.network.RandomGraph;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/**
Expand All @@ -44,14 +44,14 @@ public interface NetworkTopology {
boolean shouldConnectToMe(NodeId nodeId);

/**
* @return a list of all peers this node should be connected to
* @return a set of all peers this node should be connected to
*/
List<NodeId> getNeighbors();
Set<NodeId> getNeighbors();

/**
* @return a list of peers this node should be connected to with the applied filter
* @return a set of peers this node should be connected to with the applied filter
*/
List<NodeId> getNeighbors(final Predicate<NodeId> filter);
Set<NodeId> getNeighbors(final Predicate<NodeId> filter);

/**
* @return the underlying graph on which this topology is based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package com.swirlds.platform.network.topology;

import com.swirlds.common.platform.NodeId;
import com.swirlds.platform.network.PeerInfo;
import com.swirlds.platform.network.RandomGraph;
import com.swirlds.platform.system.address.AddressBook;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* A bidirectional topology that never changes.
Expand All @@ -34,9 +34,9 @@ public class StaticTopology implements NetworkTopology {

private final NodeId selfId;
/**
* Two nodes are neighbors if their indexes in the address book are neighbors in the connection graph.
* Two nodes are neighbors if their node indexes are neighbors in the connection graph.
*/
private final AddressBook addressBook;
private final Set<NodeId> peerNodes;

private final RandomGraph connectionGraph;

Expand All @@ -45,50 +45,43 @@ public class StaticTopology implements NetworkTopology {
*
* @param random a source of randomness, used to chose random neighbors, does not need to be
* cryptographically secure
* @param addressBook the current address book
* @param peers the set of peers in the network
* @param selfId the ID of this node
* @param numberOfNeighbors the number of neighbors each node should have
*/
public StaticTopology(
@NonNull final Random random,
@NonNull final AddressBook addressBook,
@NonNull final Set<PeerInfo> peers,
@NonNull final NodeId selfId,
final int numberOfNeighbors) {
this.addressBook = Objects.requireNonNull(addressBook, "addressBook must not be null");
this.peerNodes = Objects.requireNonNull(
peers.stream().map(PeerInfo::nodeId).collect(Collectors.toUnmodifiableSet()), "peers must not be null");
this.selfId = Objects.requireNonNull(selfId, "selfId must not be null");
this.connectionGraph = new RandomGraph(random, addressBook.getSize(), numberOfNeighbors, SEED);

if (!addressBook.contains(selfId)) {
throw new IllegalArgumentException("Address book does not contain selfId");
}
this.connectionGraph = new RandomGraph(random, peers.size() + 1, numberOfNeighbors, SEED);
}

/**
* {@inheritDoc}
*/
@Override
public List<NodeId> getNeighbors() {
public Set<NodeId> getNeighbors() {
return getNeighbors((nodeId -> true));
}

/**
* {@inheritDoc}
*/
@Override
public List<NodeId> getNeighbors(final Predicate<NodeId> filter) {
final int selfIndex = addressBook.getIndexOfNodeId(selfId);
return Arrays.stream(connectionGraph.getNeighbors(selfIndex))
.mapToObj(addressBook::getNodeId)
.filter(filter)
.toList();
public Set<NodeId> getNeighbors(final Predicate<NodeId> filter) {
return peerNodes;
}

/**
* {@inheritDoc}
*/
@Override
public boolean shouldConnectToMe(final NodeId nodeId) {
return isNeighbor(nodeId) && addressBook.getIndexOfNodeId(nodeId) < addressBook.getIndexOfNodeId(selfId);
return isNeighbor(nodeId) && nodeId.id() < selfId.id();
}

/**
Expand All @@ -98,20 +91,20 @@ public boolean shouldConnectToMe(final NodeId nodeId) {
* @return true if this node is my neighbor, false if not
*/
private boolean isNeighbor(final NodeId nodeId) {
if (!addressBook.contains(nodeId)) {
if (!peerNodes.contains(nodeId)) {
return false;
}
final int selfIndex = addressBook.getIndexOfNodeId(selfId);
final int nodeIndex = addressBook.getIndexOfNodeId(nodeId);
return connectionGraph.isAdjacent(selfIndex, nodeIndex);
final long selfIndex = selfId.id();
final long nodeIndex = nodeId.id();
return connectionGraph.isAdjacent((int) selfIndex, (int) nodeIndex);
}

/**
* {@inheritDoc}
*/
@Override
public boolean shouldConnectTo(final NodeId nodeId) {
return isNeighbor(nodeId) && addressBook.getIndexOfNodeId(nodeId) > addressBook.getIndexOfNodeId(selfId);
return isNeighbor(nodeId) && nodeId.id() > selfId.id();
}

/**
Expand Down

0 comments on commit f23c2ac

Please sign in to comment.