From 048973851db84262d653c8e80511cad11fd1759e Mon Sep 17 00:00:00 2001
From: Daniel Graczer
Date: Mon, 17 May 2021 11:58:35 +0900
Subject: [PATCH] Added publicly available network map

fixes: #251
---
 .github/workflows/main.yml         |   2 +-
 Dockerfile                         |   2 +-
 doc/config.example.yaml            |  14 +
 dub.json                           |   1 +
 source/agora/api/FullNode.d        |  21 +
 source/agora/common/Config.d       |  23 ++
 source/agora/network/Crawler.d     | 597 +++++++++++++++++++++++++++++
 source/agora/network/Manager.d     |   7 +-
 source/agora/network/NodeLocator.d | 277 +++++++++++++
 source/agora/node/FullNode.d       |  21 +
 source/agora/test/Base.d           |  68 +++-
 source/agora/test/Crawler.d        |  72 ++++
 12 files changed, 1099 insertions(+), 6 deletions(-)
 create mode 100644 source/agora/network/Crawler.d
 create mode 100644 source/agora/network/NodeLocator.d
 create mode 100644 source/agora/test/Crawler.d

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index fc7023e4148..a0d01e86e10 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -65,7 +65,7 @@ jobs:
         if: runner.os == 'Linux'
         run: |
           sudo apt-get update
-          sudo apt-get install libsodium-dev libsqlite3-dev clang
+          sudo apt-get install libsodium-dev libsqlite3-dev clang libmaxminddb-dev

       - name: '[Windows] Install dependencies & setup environment'
         if: runner.os == 'Windows'

diff --git a/Dockerfile b/Dockerfile
index 096d3288f17..d92e1918217 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@ COPY devel/dotgdbinit /root/.gdbinit
 COPY --from=Builder /root/packages/ /root/packages/
 RUN apk --no-cache add --allow-untrusted -X /root/packages/build/ ldc-runtime=1.26.0-r0 \
     && rm -rf /root/packages/
-RUN apk --no-cache add llvm-libunwind libgcc libsodium libstdc++ sqlite-libs
+RUN apk --no-cache add llvm-libunwind libgcc libsodium libstdc++ sqlite-libs libmaxminddb
 COPY --from=Builder /root/agora/talos/build/ /usr/share/agora/talos/
 COPY --from=Builder /root/agora/build/agora /usr/local/bin/agora
 WORKDIR /agora/

diff --git a/doc/config.example.yaml b/doc/config.example.yaml
index ffe2d2c249a..f6644c691d1 100644
--- a/doc/config.example.yaml
+++ b/doc/config.example.yaml
@@ -55,6 +55,20 @@ node:
   # after `relay_tx_cache_exp_secs`.
   relay_tx_cache_exp_secs : 1200

+  # true, if this node should collect statistics about other
+  # nodes in the network, including their geographical location and OS
+  collect_network_statistics : true
+
+  # The number of network crawlers that will be instantiated to collect
+  # statistics about other nodes in the network
+  num_of_crawlers : 3
+
+  # The number of seconds one crawler should wait after successfully
+  # contacting a node
+  crawling_interval_secs : 2
+
+  # The path to a file containing IP address -> geographical location mapping
+  ipdb_path : "data/ipdb.mmdb"
+
 # Each entry in this array is an interface Agora will listen to, allowing to
 # expose the same node on more than one network interface or with different
 # API, such as having one interface using HTTP+JSON and the other TCP+binary.
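Reviewer note: since each crawler sleeps for `crawling_interval_secs` after every
successful probe (see `Crawler.crawl` below), the aggregate crawl rate is roughly
`num_of_crawlers / crawling_interval_secs` probes per second, so the defaults
above give about 1.5 probes/s. A minimal sketch of a lighter-weight setup, with
illustrative values only:

    node:
      collect_network_statistics : true
      # one crawler probing at most every 10 seconds (~0.1 probes/s)
      num_of_crawlers : 1
      crawling_interval_secs : 10
      ipdb_path : "data/ipdb.mmdb"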
diff --git a/dub.json b/dub.json
index 6c8186dd547..a5e5e204af9 100644
--- a/dub.json
+++ b/dub.json
@@ -69,6 +69,7 @@
     "dflags": [ "-extern-std=c++14", "-preview=in" ],
     "lflags-posix": [ "-lstdc++", "-lsqlite3" ],
     "lflags-windows": [ "sqlite3.lib", "/nodefaultlib:msvcetd.lib" ],
+    "libs": [ "maxminddb" ],
     "libs-windows": [ "iphlpapi" ],
     "buildRequirements": [ "allowWarnings" ],

diff --git a/source/agora/api/FullNode.d b/source/agora/api/FullNode.d
index f1a1793c587..f9b89e1a351 100644
--- a/source/agora/api/FullNode.d
+++ b/source/agora/api/FullNode.d
@@ -21,6 +21,7 @@ import agora.consensus.data.PreImageInfo;
 import agora.common.Types;
 import agora.common.Set;
 import agora.consensus.data.Transaction;
+import agora.network.Crawler : CrawlResultHolder;

 import vibe.data.serialization;
 import vibe.http.common;
@@ -41,6 +42,9 @@ public struct NodeInfo

     /// Partial or full view of the addresses of the node's quorum (based on is_complete)
     public Set!string addresses;
+
+    /// String representation of the OS the node is running on
+    public string os;
 }

 /*******************************************************************************
@@ -227,6 +231,23 @@ public interface API

     public PreImageInfo getPreimage (Hash enroll_key);

+    /***************************************************************************
+
+        Get the information about the network and its reachable nodes.
+
+        The information includes the network address and the geographical
+        location of the nodes, including the
+        continent/country/city/latitude/longitude.
+
+        API:
+            GET /network_info
+
+        Returns: information about the network and about its nodes
+
+    ***************************************************************************/
+
+    public CrawlResultHolder getNetworkInfo ();
+
     /***************************************************************************

         Get validators' pre-image information

diff --git a/source/agora/common/Config.d b/source/agora/common/Config.d
index a5e4d480880..c9234df17ac 100644
--- a/source/agora/common/Config.d
+++ b/source/agora/common/Config.d
@@ -228,6 +228,21 @@ public struct NodeConfig

     /// Transaction put into the relay queue will expire, and will be removed
     /// after `relay_tx_cache_exp`.
     public Duration relay_tx_cache_exp;
+
+    /// true, if this node should collect statistics about other
+    /// nodes in the network, including their geographical location and OS
+    public bool collect_network_statistics;
+
+    /// The number of network crawlers that will be instantiated to collect
+    /// statistics about other nodes in the network
+    public ubyte num_of_crawlers;
+
+    /// The number of seconds one crawler should wait after successfully
+    /// contacting a node
+    public Duration crawling_interval;
+
+    /// The path to a file containing IP address -> geographical location mapping
+    public string ipdb_path;
 }

 /// Validator config
@@ -524,6 +539,10 @@ private NodeConfig parseNodeConfig (Node* node, in CommandLine cmdln)
     Duration relay_tx_interval = opt!(ulong, "node", "relay_tx_interval_secs")(cmdln, node, 15).seconds;
     const relay_tx_min_fee = Amount(opt!(ulong, "node", "relay_tx_min_fee")(cmdln, node, 0));
     Duration relay_tx_cache_exp = opt!(ulong, "node", "relay_tx_cache_exp_secs")(cmdln, node, 1200).seconds;
+    const collect_network_statistics = opt!(bool, "node", "collect_network_statistics")(cmdln, node, true);
+    const num_of_crawlers = opt!(ubyte, "node", "num_of_crawlers")(cmdln, node, 1);
+    Duration crawling_interval = opt!(ulong, "node", "crawling_interval_secs")(cmdln, node, 3).seconds;
+    const ipdb_path = opt!(string, "node", "ipdb_path")(cmdln, node, "data/ipdb.mmdb");

     NodeConfig result = {
             min_listeners : min_listeners,
@@ -545,6 +564,10 @@ private NodeConfig parseNodeConfig (Node* node, in CommandLine cmdln)
             relay_tx_interval : relay_tx_interval,
             relay_tx_min_fee : relay_tx_min_fee,
             relay_tx_cache_exp : relay_tx_cache_exp,
+            collect_network_statistics : collect_network_statistics,
+            num_of_crawlers : num_of_crawlers,
+            crawling_interval : crawling_interval,
+            ipdb_path : ipdb_path,
     };
     return result;
 }

diff --git a/source/agora/network/Crawler.d b/source/agora/network/Crawler.d
new file mode 100644
index 00000000000..741f3f52b70
--- /dev/null
+++ b/source/agora/network/Crawler.d
@@ -0,0 +1,597 @@
+/*******************************************************************************
+
+    Contains a network crawler implementation that tries to determine
+    node properties like geographical location and node OS for the reachable
+    network nodes.
+
+    The implementation uses a never-ending BFS search, where certain nodes
+    can be temporarily banned. Nodes can get temporarily banned if they become
+    unreachable for a certain period of time, or if they recommend too many
+    unreachable nodes.
+
+    The implementation contains some performance optimizations:
+
+    1. Multiple fibers can crawl the network at the same time
+    2. Additional supporting data structures are added for fast lookups of
+       already discovered nodes
+
+    The implementation tries to minimize the impact of ill-behaved nodes:
+
+    1. Nodes that are unreachable for a certain period of time, or nodes that
+       recommend too many unreachable nodes, are temporarily banned
+    2. Nodes only take a limited number of node recommendations from other nodes
+    3. Nodes do simple address sanity checks on the recommendations, before even
+       trying to contact them
+    4. Nodes ignore the port of other nodes during crawling, so an attacker
+       cannot start up multiple nodes on the same machine on different ports
+       to distort crawling results
+    5. Nodes translate domain names to IP addresses during crawling, so an
+       attacker cannot start up multiple nodes on multiple subdomains like
+       sub1.attacker.com, sub2.attacker.com with the same IP address to
+       distort crawling results
+
+    The implementation tries to recover from network failures:
+
+    Whenever a particular node experiences a local network failure, all other
+    nodes would appear as offline, and all of them would eventually be
+    temporarily banned. Every temporarily banned node is immediately removed
+    from the BFS candidate nodes, and the implementation would end up with no
+    nodes to continue the never-ending BFS search. In order to avoid this
+    situation, certain (seed) nodes can never be removed from the BFS
+    candidate search list, even if the seed node gets banned, although the
+    crawling results for those banned seed nodes are removed from the overall
+    crawling results.
+
+    The implementation handles some special cases:
+
+    1. Multiple crawling fibers will use the same instance of NodeLocator,
+       so the fiber shutdown needs to be coordinated to make sure that during
+       the shutdown no fiber would try to use an already stopped NodeLocator
+    2. The current TestNetworkManager.getClient asserts with 0 whenever a node
+       would like to create a NetworkClient to an already shut down node. As
+       nodes (and crawlers) stop at different times, the above assert could
+       trigger. In order to avoid this, a new config parameter called
+       `TestConf.use_non_assert_get_client` was added.
+
+    Copyright:
+        Copyright (c) 2019-2021 BOSAGORA Foundation
+        All rights reserved.
+
+    License:
+        MIT License. See LICENSE for details.
+
+*******************************************************************************/
+
+module agora.network.Crawler;
+
+import agora.api.FullNode : NodeInfo;
+import agora.common.BanManager;
+import agora.common.Config;
+import agora.common.Set;
+import agora.common.Task;
+import agora.common.Types;
+import agora.network.Client;
+import agora.network.Clock;
+import agora.network.Manager;
+import agora.network.NodeLocator;
+import agora.serialization.Serializer;
+import agora.utils.InetUtils;
+import agora.utils.Log;
+
+import std.algorithm;
+import std.container.dlist;
+import std.conv : to;
+import std.range : iota;
+import std.socket : AddressFamily, getAddressInfo;
+import std.traits : KeyType, ValueType;
+
+import core.time;
+
+/// Result obtained by crawling a particular node
+public static struct CrawlResult
+{
+    ///
+    public string continent;
+
+    ///
+    public string country;
+
+    ///
+    public string city;
+
+    /// Latitude coordinates of the node
+    public string latitude;
+
+    /// Longitude coordinates of the node
+    public string longitude;
+
+    /// Operating system the node is running on
+    public string os;
+
+    /// The time the crawling result was obtained
+    public TimePoint crawl_time;
+}
+
+/// Crawling results for all the nodes that are directly or indirectly reachable
+/// from this node. Special serialize/fromBinary methods are provided, because
+/// the current binary serializer cannot deal with associative arrays.
+public static struct CrawlResultHolder
+{
+    /// Results for all the nodes that are directly or indirectly reachable
+    /// from this node
+    public CrawlResult[Address] crawl_results;
+
+    ///
+    alias crawl_results this;
+
+    ///
+    public void serialize (scope SerializeDg dg) const @safe
+    {
+        serializePart(cast(ulong) crawl_results.length, dg);
+        foreach (const ref key, const ref value; crawl_results)
+        {
+            serializePart(key, dg);
+            serializePart(value, dg);
+        }
+    }
+
+    ///
+    public static CrawlResultHolder fromBinary (CrawlResultHolder)
+        (scope DeserializeDg dg, in DeserializerOptions opts) @safe
+    {
+        alias CRT = typeof(CrawlResultHolder.crawl_results);
+        CRT crawl_results;
+
+        iota(deserializeFull!(ulong)(dg, opts)).each!( _ =>
+            crawl_results[deserializeFull!(KeyType!CRT)(dg, opts)] =
+                deserializeFull!(ValueType!CRT)(dg, opts)
+        );
+        return CrawlResultHolder(crawl_results);
+    }
+
+    unittest
+    {
+        CrawlResultHolder crawl_result_holder;
+        CrawlResult crawl_result;
+        crawl_result.continent = "Asia";
+        crawl_result_holder["a"] = crawl_result;
+        crawl_result.continent = "Europe";
+        crawl_result_holder["z"] = crawl_result;
+
+        testSymmetry(crawl_result_holder);
+    }
+}
+
+/// Crawler implementation, using the MMDB to retrieve nodes' geographical location
+public class Crawler
+{
+    /// Struct to hold information about a particular network client
+    private struct NetworkClientHolder
+    {
+        /// The network address of the node that introduced the node
+        /// `this.network_client`
+        public Address recommender_address;
+
+        /// The node that was introduced by `this.recommender_address`
+        public NetworkClient network_client;
+    }
+
+    /// Task manager
+    private ITaskManager taskman;
+
+    /// Ban manager
+    private BanManager banman;
+
+    /// Config
+    private Config config;
+
+    /// Network manager
+    private NetworkManager network_manager;
+
+    /// Node locator
+    private INodeLocator node_locator;
+
+    /// Logger
+    private Logger log;
+
+    /// Clock
+    private Clock clock;
+
+    /// 1. Crawler fibers pop an already discovered node from the front of the list
+    /// 2. Crawler fibers try to get node recommendations from the popped node
+    /// 3. If the popped node answers with a list of node recommendations,
+    ///    then all those new recommendations and the popped node are pushed to
+    ///    the back of the list
+    private DList!NetworkClientHolder discovered_node_list;
+
+    /// Lookup map to decide whether the current node has discovered
+    /// a particular node. This map supplements `discovered_node_list`:
+    ///
+    /// 1. Iterating over `discovered_node_list` is slow
+    /// 2. Iterating over `discovered_node_list` cannot reliably answer whether
+    ///    a node has been discovered, as crawling fibers pop elements off of it
+    private NetworkClientHolder[Address] discovered_node_map;
+
+    /// Contains the crawling results. A particular node is removed from this
+    /// data structure after it is banned
+    private CrawlResultHolder crawl_result_holder;
+
+    /// Indicates whether the crawler is in the process of shutting down
+    private bool is_shutting_down;
+
+    /// Number of crawler fibers that have already shut down
+    private ubyte already_shutdown_cnt;
+
+    ///
+    public this (ITaskManager taskman, Clock clock, Config config, NetworkManager network_manager)
+    {
+        this.taskman = taskman;
+        this.clock = clock;
+        this.config = config;
+        this.network_manager = network_manager;
+        this.node_locator = this.makeNodeLocator();
+        this.log = Logger(__MODULE__);
+
+        BanManager.Config ban_config =
+        {
+            max_failed_requests : 25,
+            ban_duration : 30.minutes
+        };
+        this.banman = new BanManager(ban_config, clock, "banned_crawler.dat");
+    }
+
+    /// Entry point for the crawling fibers
+    private void crawl ()
+    {
+        // Fiber shutdown has to be coordinated among all fibers
+        while (!this.is_shutting_down)
+        {
+            if (discovered_node_list.empty)
+            {
+                this.taskman.wait(this.config.node.crawling_interval);
+                continue;
+            }
+
+            // Pop a node from the front of the list
+            auto network_client_holder = this.discovered_node_list.front;
+            auto client_address = network_client_holder.network_client.address;
+            this.discovered_node_list.removeFront();
+
+            NodeInfo node_info;
+            try
+                // The node might be banned due to too many wrong recommendations,
+                // even though the node itself is constantly available
+                if (this.banman.isBanned(client_address))
+                    throw new Exception("Client banned");
+                else
+                    node_info = network_client_holder.network_client.getNodeInfo();
+            catch (Exception e)
+            {
+                // Penalizing the recommender with weight 1
+                this.banman.onFailedRequest(InetUtils.extractHostAndPort(
+                    network_client_holder.recommender_address).host, 1);
+                // Penalizing the unreachable node with weight 5
+                this.banman.onFailedRequest(InetUtils.extractHostAndPort(
+                    client_address).host, 5);
+
+                // The node might be banned for being unreachable for too long
+                if (this.banman.isBanned(client_address))
+                {
+                    // Seed nodes are never entirely banned, and need to be put
+                    // back into the queue; however, the crawling results of the
+                    // seed nodes need to be removed.
+                    if (client_address in this.network_manager.seed_addresses)
+                    {
+                        this.taskman.wait(this.config.node.crawling_interval);
+                        this.discovered_node_list.insertBack(network_client_holder);
+                    }
+                    else
+                        discovered_node_map.remove(client_address);
+
+                    crawl_result_holder.crawl_results.remove(client_address);
+                }
+                else
+                    this.discovered_node_list.insertBack(network_client_holder);
+
+                // Crawling failed for this node, and should be continued
+                // immediately without waiting
+                continue;
+            }
+
+            // Simple checks on the recommended nodes; if the checks pass,
+            // then add them to the data structures
+            this.processNetworkAddresses(client_address, node_info.addresses.pickRandom(10));
+
+            // The node proved that it implements the getNodeInfo method and
+            // that it is still running, so its statistics should be added to
+            // the crawling results
+            if (auto it = client_address in this.crawl_result_holder)
+                it.crawl_time = this.clock.networkTime();
+            else
+                this.crawl_result_holder[client_address] =
+                    this.populateCrawlResult(
+                        this.node_locator.extractValues(client_address,
+                        [
+                            "continent->names->en",
+                            "country->names->en",
+                            "city->names->en",
+                            "location->latitude",
+                            "location->longitude",
+                        ]) ~ node_info.os);
+
+            // After a successful crawl, wait before starting a new one
+            this.taskman.wait(this.config.node.crawling_interval);
+            this.discovered_node_list.insertBack(network_client_holder);
+        }
+
+        // After all the fibers have exited the loop, and no fiber is trying
+        // to use the NodeLocator anymore, resources can be freed
+        if (++this.already_shutdown_cnt == this.config.node.num_of_crawlers)
+            do_stop();
+    }
+
+    /***************************************************************************
+
+        Populates a `CrawlResult` based on the string values passed in and
+        based on the current time
+
+        Params:
+            extracted_values = the values that will be used to populate the
+                `CrawlResult` object
+
+        Returns:
+            a `CrawlResult` object populated from `extracted_values`
+
+    ***************************************************************************/
+
+    private CrawlResult populateCrawlResult (string[] extracted_values) @safe nothrow
+    {
+        CrawlResult crawl_result;
+        foreach (i, ref field; crawl_result.tupleof)
+            static if (is(typeof(field) == string))
+                field = extracted_values[i];
+            else static if (is(typeof(field) == TimePoint))
+                field = this.clock.networkTime();
+
+        return crawl_result;
+    }
+
+    unittest
+    {
+        import std.algorithm : map;
+        import std.array : array;
+        import std.conv : to;
+
+        auto mock_clock = new MockClock(6);
+        auto crawler = new Crawler(null, mock_clock, Config(), null);
+        CrawlResult expected;
+        expected.continent = "0";
+        expected.country = "1";
+        expected.city = "2";
+        expected.latitude = "3";
+        expected.longitude = "4";
+        expected.os = "5";
+        expected.crawl_time = 6;
+
+        assert(crawler.populateCrawlResult(iota(6).map!(i => i.to!string()).array()) == expected);
+    }
+
+    /***************************************************************************
+
+        Get the information about the network and its reachable nodes.
+
+        The information includes the network address and the geographical
+        location of the nodes, including the
+        continent/country/city/latitude/longitude.
+
+        API:
+            GET /network_info
+
+        Returns: information about the network and about its nodes
+
+    ***************************************************************************/
+
+    public CrawlResultHolder getNetworkInfo () @safe @nogc nothrow pure
+    {
+        return crawl_result_holder;
+    }
+
+    /***************************************************************************
+
+        Creates and returns a newly created NodeLocator object
+
+        Returns:
+            a newly created NodeLocator object
+
+    ***************************************************************************/
+
+    protected INodeLocator makeNodeLocator ()
+    {
+        return new NodeLocatorGeoIP(this.config.node.ipdb_path);
+    }
+
+    /// Start the Crawler
+    public void start ()
+    {
+        import std.array : array;
+
+        if (!this.config.node.collect_network_statistics)
+            return;
+
+        // At least 1 seed node needs to be provided
+        this.network_manager.seed_addresses.length || assert(0);
+
+        // The node locator object should start successfully
+        this.node_locator.start() || assert(0);
+
+        this.processNetworkAddresses("seed", this.network_manager.seed_addresses[].array());
+
+        // Start the crawling fibers
+        iota(config.node.num_of_crawlers).each!(_ =>
+            this.taskman.runTask(()
+            {
+                this.crawl();
+            })
+        );
+    }
+
+    /// Initiate stopping the Crawler
+    public void stop () @safe @nogc nothrow pure
+    {
+        this.is_shutting_down = true;
+    }
+
+    /// Stop the Crawler
+    private void do_stop ()
+    {
+        this.node_locator.stop();
+    }
+
+    /***************************************************************************
+
+        Processes network addresses by checking the addresses and updating the
+        internal data structures
+
+        Params:
+            recommender_address = the node address which recommended `addresses`
+            addresses = the recommended addresses
+
+        Returns:
+            network clients that were created while processing the addresses
+
+    ***************************************************************************/
+
+    protected NetworkClientHolder[] processNetworkAddresses (Address recommender_address, Address[] addresses)
+    {
+        NetworkClientHolder[] network_client_holders;
+        network_client_holders.reserve(addresses.length);
+        foreach (address; addresses)
+            if (auto transformed_address = this.transformAddress(address))
+            {
+                auto network_client = this.getNetworkClient(transformed_address);
+                // Getting the client of already shut down/never started nodes
+                // returns null in unittests
+                version (unittest)
+                    if (network_client is null)
+                        break;
+                auto network_client_holder = NetworkClientHolder(recommender_address, network_client);
+                this.discovered_node_list.insertBack(network_client_holder);
+                this.discovered_node_map[network_client.address] = network_client_holder;
+                network_client_holders ~= network_client_holder;
+            }
+        return network_client_holders;
+    }
+
+    /***************************************************************************
+
+        Checks and transforms an address to the canonical representation of
+        [schema]://[IP address]:[port]. If any of the checks fail, then the
+        returned value is null, otherwise the canonical representation is
+        returned.
+
+        Params:
+            address = the address to check and transform
+
+        Returns:
+            null if any of the checks fail, otherwise the canonical
+            representation of the address
+
+    ***************************************************************************/
+
+    protected Address transformAddress (Address address) @safe
+    {
+        // Breaking up the address into host and port, and determining the host type
+        auto host_port = InetUtils.extractHostAndPort(address);
+
+        if (host_port.type == HostType.Invalid || this.banman.isBanned(host_port.host))
+            return null;
+
+        // Domain names need to be resolved before proceeding, as subdomains
+        // can be created at no cost by an attacker, while obtaining multiple
+        // IP addresses comes at a cost
+        string resolved_host;
+        if (host_port.type == HostType.Domain)
+            try
+                foreach (ref addr_info; getAddressInfo(host_port.host))
+                {
+                    string resolved_address_candidate = addr_info.address.toAddrString();
+                    if (addr_info.address.addressFamily == AddressFamily.INET6)
+                        resolved_address_candidate = InetUtils.expandIPv6(resolved_address_candidate);
+
+                    if (addr_info.address.addressFamily == AddressFamily.INET ||
+                        addr_info.address.addressFamily == AddressFamily.INET6)
+                    {
+                        resolved_host = resolved_address_candidate;
+                        break;
+                    }
+                }
+            catch (Exception e)
+            {
+                log.trace("Error happened while trying to resolve DNS name {}", host_port.host);
+                return null;
+            }
+        else
+            resolved_host = host_port.host;
+
+        if (resolved_host in this.discovered_node_map || this.banman.isBanned(resolved_host))
+            return null;
+
+        return host_port.schema ~ "://" ~ resolved_host ~ ":" ~ host_port.port.to!string;
+    }
+
+    unittest
+    {
+        import std.algorithm : count;
+        import core.time;
+
+        auto mock_clock = new MockClock(0);
+        BanManager.Config banman_config = {ban_duration : 10.seconds};
+        auto banman = new BanManager(banman_config, mock_clock, "garbageDir");
+        auto crawler = new Crawler(null, mock_clock, Config(), null);
+        crawler.banman = banman;
+
+        // The returned string has to have a format of [schema]://[host]:[port]
+        assert(crawler.transformAddress("8.8.8.8") == "http://8.8.8.8:80");
+        assert(crawler.transformAddress("http://8.8.8.8") == "http://8.8.8.8:80");
+        assert(crawler.transformAddress("http://8.8.8.8:1234") == "http://8.8.8.8:1234");
+        assert(crawler.transformAddress("https://8.8.8.8") == "https://8.8.8.8:443");
+
+        // DNS domains must be resolved
+        assert(crawler.transformAddress("bosagora.io").count('.') == 3);
+
+        // Banned hosts should return null
+        banman.ban("8.8.8.8");
+        assert(crawler.transformAddress("http://8.8.8.8") == null);
+        // Unban the node by advancing time
+        mock_clock.setTime(11);
+        assert(crawler.transformAddress("http://8.8.8.8") == "http://8.8.8.8:80");
+
+        // Already discovered hosts should return null
+        crawler.discovered_node_map["8.8.8.8"] = NetworkClientHolder();
+        assert(crawler.transformAddress("8.8.8.8") == null);
+    }
+
+    protected NetworkClient getNetworkClient (Address address)
+    {
+        return this.network_manager.getNetworkClient(this.taskman, this.banman,
+            address, network_manager.getClient(address, this.config.node.timeout),
+            this.config.node.retry_delay, this.config.node.max_retries);
+    }
+}
+
+/// Test crawler used in network tests which uses the NodeLocatorMock
+public class TestCrawler : Crawler
+{
+    ///
+    this (ITaskManager taskman, Clock clock, Config config, NetworkManager network_manager)
+    {
+        super(taskman, clock, config, network_manager);
+    }
+
+    protected override INodeLocator makeNodeLocator () @safe pure nothrow
+    {
+        return new NodeLocatorMock();
+    }
+
+    protected override Address transformAddress (Address address) @safe @nogc pure nothrow
+    {
+        return address;
+    }
+}
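Reviewer note: the crawl queue above is the standard pairing of a FIFO of
candidates with a constant-time "already seen" lookup, plus the twist that a
crawled node is re-enqueued so the BFS never terminates. A self-contained
sketch of that pattern (names and the toy `push` helper are illustrative,
not part of this patch):

    import std.container.dlist : DList;

    void main ()
    {
        DList!string queue;    // mirrors discovered_node_list
        bool[string] seen;     // mirrors discovered_node_map

        void push (string addr)
        {
            if (addr !in seen)
            {
                seen[addr] = true;
                queue.insertBack(addr);
            }
        }

        push("seed");
        while (!queue.empty)
        {
            auto addr = queue.front;
            queue.removeFront();
            // A real crawler would contact `addr` here and push its
            // recommendations; it would also re-enqueue `addr` itself,
            // which is what makes the patch's BFS never-ending. This toy
            // version does not re-enqueue, so it terminates.
            if (addr == "seed") { push("a"); push("b"); }
        }
    }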
diff --git a/source/agora/network/Manager.d b/source/agora/network/Manager.d
index 19bf3366718..8dafeec4d59 100644
--- a/source/agora/network/Manager.d
+++ b/source/agora/network/Manager.d
@@ -52,11 +52,13 @@ import vibe.web.rest;
 import std.algorithm;
 import std.array;
 import std.container : DList;
+import std.conv : to;
 import std.datetime.stopwatch;
 import std.exception;
 import std.format;
 import std.random;
 import std.range : walkLength;
+import std.system : os;

 import core.stdc.time;
 import core.time;
@@ -407,6 +409,9 @@ public class NetworkManager
     /// Maximum connection tasks to run in parallel
     private enum MaxConnectionTasks = 10;

+    /// The string representation of the OS this node runs on
+    public static immutable Os = os.to!string();
+
     /// Ctor
     public this (in Config config, Metadata metadata, ITaskManager taskman, Clock clock)
     {
@@ -1080,7 +1085,7 @@ public class NetworkManager

         return NodeInfo(
             this.minPeersConnected() ? NetworkState.Complete : NetworkState.Incomplete,
-            this.known_addresses);
+            this.known_addresses, Os);
     }

     /***************************************************************************

diff --git a/source/agora/network/NodeLocator.d b/source/agora/network/NodeLocator.d
new file mode 100644
index 00000000000..b98c48e304a
--- /dev/null
+++ b/source/agora/network/NodeLocator.d
@@ -0,0 +1,277 @@
+/*******************************************************************************
+
+    Contains code to look up the geographical location of the nodes in the
+    network.
+
+    The current implementation retrieves the node's continent, country, city,
+    latitude and longitude information.
+
+    Copyright:
+        Copyright (c) 2019-2021 BOSAGORA Foundation
+        All rights reserved.
+
+    License:
+        MIT License. See LICENSE for details.
+
+*******************************************************************************/
+
+module agora.network.NodeLocator;
+
+import agora.common.Types : Address;
+import agora.utils.Log;
+
+import mmdb.MaxMindDB;
+
+import core.stdc.errno;
+import core.stdc.stdio;
+import core.stdc.string;
+
+/// Interface for the node locator
+public interface INodeLocator
+{
+    /// Start the node locator, returns true on success, false otherwise
+    public bool start ();
+
+    /// Stop the node locator
+    public void stop ();
+
+    /***************************************************************************
+
+        Extract certain properties of the geographical location of the node.
+        The properties can be continent, country, ...
+        The exact properties that can be retrieved are implementation-dependent.
+
+        Params:
+            address = the network address of the node
+            paths = query parts to identify which properties need to be retrieved
+
+        Returns:
+            an array of retrieved properties; if a particular property
+            cannot be retrieved, then `MissingValue` is returned
+
+    ***************************************************************************/
+
+    public string[] extractValues (Address address, string[] paths);
+
+    /// Value returned when a property cannot be retrieved by `extractValues`
+    public static immutable MissingValue = "Unknown";
+}
+
+/// Node locator implementation using MaxMindDB to retrieve the geographical
+/// location of the nodes
+public class NodeLocatorGeoIP : INodeLocator
+{
+    /// Path to the MaxMindDB database
+    private string locator_db_path;
+
+    /// MMDB_s struct returned by the `open` call
+    private MMDB_s mmdb;
+
+    /// Logger
+    private Logger log;
+
+    /// Path separator between the elements of the property query path, for
+    /// example: `continent->names->en`
+    public static immutable PathSeparator = "->";
+
+    ///
+    public this (string locator_db_path)
+    {
+        this.log = Logger(__MODULE__);
+        this.locator_db_path = locator_db_path;
+    }
+
+    /// Start the node locator, returns true on success, false otherwise
+    public bool start ()
+    {
+        import std.string : toStringz;
+
+        int status = MMDB_open(this.locator_db_path.toStringz(), MMDB_MODE_MMAP, &this.mmdb);
+
+        if (MMDB_SUCCESS != status)
+        {
+            log.error("Can't open {} - {}", this.locator_db_path, MMDB_strerror(status));
+
+            if (MMDB_IO_ERROR == status)
+                log.error("IO error: {}", strerror(errno));
+
+            return false;
+        }
+        return true;
+    }
+
+    /// Stop the node locator
+    public void stop ()
+    {
+        MMDB_close(&this.mmdb);
+        this.mmdb = MMDB_s.init;
+    }
+
+    ///
+    public ~this ()
+    {
+        if (this.mmdb != MMDB_s.init)
+        {
+            fprintf(stderr, "NodeLocator.stop() was not called manually");
+            this.stop();
+        }
+    }
+
+    /***************************************************************************
+
+        Extract certain properties of the geographical location of the node.
+        The properties can be continent, country, ...
+        The exact properties that can be retrieved are implementation-dependent.
+
+        Params:
+            address = the network address of the node
+            paths = query parts to identify which properties need to be retrieved
+
+        Returns:
+            an array of retrieved properties; if a particular property
+            cannot be retrieved, then `MissingValue` is returned
+
+    ***************************************************************************/
+
+    public string[] extractValues (Address address, string[] paths)
+    {
+        import std.algorithm : map;
+        import std.array : array, split;
+        import std.range : repeat;
+        import std.string : toStringz;
+
+        string[] extractedValues;
+        extractedValues.length = paths.length;
+
+        // Look up the value in the database
+        int gai_error;
+        int mmdb_error;
+        MMDB_lookup_result_s result = MMDB_lookup_string(&this.mmdb,
+            address.toStringz(), &gai_error, &mmdb_error);
+        if (0 != gai_error)
+        {
+            log.warn("Error from getaddrinfo for {}", address);
+            return MissingValue.repeat(paths.length).array();
+        }
+        if (MMDB_SUCCESS != mmdb_error)
+        {
+            log.warn("Got an error from libmaxminddb: {}", MMDB_strerror(mmdb_error));
+            return MissingValue.repeat(paths.length).array();
+        }
+
+        // Extract the data
+        foreach (ind, path; paths)
+        {
+            auto path_splitted = path.split(PathSeparator).map!(path_part => path_part.toStringz()).array();
+            path_splitted ~= null;
+            MMDB_entry_data_s entry_data;
+            int status = MMDB_aget_value(&result.entry, &entry_data, path_splitted.ptr);
+            extractedValues[ind] = (MMDB_SUCCESS == status && entry_data.has_data)
+                ? getStringFromEntry(entry_data)
+                : MissingValue;
+        }
+        return extractedValues;
+    }
+
+    /***************************************************************************
+
+        Convert the data in an MMDB_entry_data_s structure to string
+
+        Params:
+            entry_data = data to convert
+
+        Returns:
+            data converted to string
+
+    ***************************************************************************/
+
+    private string getStringFromEntry (const ref MMDB_entry_data_s entry_data) const
+    {
+        import std.conv : to;
+        import std.utf : toUTF8;
+
+        switch (entry_data.type)
+        {
+            case MMDB_DATA_TYPE_UTF8_STRING :
+                return entry_data.utf8_string[0 .. entry_data.data_size].toUTF8();
+            case MMDB_DATA_TYPE_FLOAT :
+                return entry_data.float_value.to!string();
+            case MMDB_DATA_TYPE_DOUBLE :
+                return entry_data.double_value.to!string();
+            case MMDB_DATA_TYPE_UINT16 :
+                return entry_data.uint16.to!string();
+            case MMDB_DATA_TYPE_UINT32 :
+                return entry_data.uint32.to!string();
+            case MMDB_DATA_TYPE_INT32 :
+                return entry_data.int32.to!string();
+            case MMDB_DATA_TYPE_UINT64 :
+                return entry_data.uint64.to!string();
+            case MMDB_DATA_TYPE_BOOLEAN :
+                return entry_data.boolean.to!string();
+            default :
+                assert(0, "Datatype not implemented");
+        }
+    }
+
+    unittest
+    {
+        auto node_locator = new NodeLocatorGeoIP("data/ip.mmdb");
+        node_locator.start();
+        assert(node_locator.extractValues(
+            "211.179.51.66",
+            [
+                "continent->names->en",
+                "country->names->en",
+                "city->names->en",
+                "location->latitude",
+                "location->longitude",
+            ]) ==
+            ["Asia", "South Korea", "Seoul (Namdaemunno 5(o)-ga)", "37.5562", "126.975"]);
+        node_locator.stop();
+    }
+}
+
+/// Node locator mock which is supposed to be used in network tests
+public class NodeLocatorMock : INodeLocator
+{
+    /// Start the node locator, returns true on success, false otherwise
+    public override bool start () @safe @nogc pure nothrow { return true; }
+
+    /// Stop the node locator
+    public override void stop () @safe @nogc pure nothrow {}
+
+    /***************************************************************************
+
+        Extract certain properties of the geographical location of the node.
+        The properties can be continent, country, ...
+        The exact properties that can be retrieved are implementation-dependent.
+
+        Params:
+            address = the network address of the node
+            paths = query parts to identify which properties need to be retrieved
+
+        Returns:
+            an array of retrieved properties; if a particular property
+            cannot be retrieved, then `MissingValue` is returned
+
+    ***************************************************************************/
+
+    public override string[] extractValues (Address address, string[] paths) @safe pure nothrow
+    {
+        // Returns the same information regardless of the address
+        string[string] path_map =
+        [
+            "continent->names->en" : "Asia",
+            "country->names->en" : "South Korea",
+            "city->names->en" : "Seoul (Namdaemunno 5(o)-ga)",
+            "location->latitude" : "1.111",
+            "location->longitude" : "2.222",
+        ];
+
+        string[] extractedValues;
+        extractedValues.length = paths.length;
+
+        foreach (ind, path; paths)
+            if (auto it = path in path_map)
+                extractedValues[ind] = *it;
+            else
+                extractedValues[ind] = MissingValue;
+
+        return extractedValues;
+    }
+}
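Reviewer note: a short usage sketch for the locator, mirroring the unittest
above — the database path, the address, and the surrounding function are
illustrative only, and a GeoLite2-City style .mmdb file is assumed:

    import agora.network.NodeLocator;

    void lookupExample ()
    {
        auto locator = new NodeLocatorGeoIP("data/ipdb.mmdb");
        if (locator.start())  // false if the .mmdb file cannot be opened
        {
            scope (exit) locator.stop();
            // Each path navigates the MMDB document, separated by `->`
            auto values = locator.extractValues("8.8.8.8",
                ["country->names->en", "location->latitude"]);
            // values[i] is INodeLocator.MissingValue ("Unknown") when the
            // database has no such field for this address
        }
    }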
diff --git a/source/agora/node/FullNode.d b/source/agora/node/FullNode.d
index b7a4c16f8d5..b1b1b3d037c 100644
--- a/source/agora/node/FullNode.d
+++ b/source/agora/node/FullNode.d
@@ -47,6 +47,7 @@ import agora.crypto.Hash;
 import agora.crypto.Key;
 import agora.network.Client;
 import agora.network.Clock;
+import agora.network.Crawler;
 import agora.network.Manager;
 import agora.node.BlockStorage;
 import agora.node.Ledger;
@@ -143,6 +144,9 @@ public class FullNode : API
     /// Transaction relayer
     protected TransactionRelayer transaction_relayer;

+    /// Crawler
+    protected Crawler crawler;
+
     /***************************************************************************

         Persistence-related fields
@@ -256,6 +260,7 @@ public class FullNode : API
         this.enroll_man = this.makeEnrollmentManager();
         this.fee_man = this.makeFeeManager();
         this.transaction_relayer = this.makeTransactionRelayer();
+        this.crawler = this.makeCrawler();
         const ulong StackMaxTotalSize = 16_384;
         const ulong StackMaxItemSize = 512;
         this.engine = new Engine(StackMaxTotalSize, StackMaxItemSize);
@@ -385,6 +390,7 @@ public class FullNode : API
         // Block externalized handler is set and push for Genesis block.
         if (this.block_handlers.length > 0 && this.getBlockHeight() == 0)
             this.pushBlock(this.params.Genesis);
+        this.crawler.start();
     }

     /// Returns an already instantiated version of the BanManager
@@ -568,6 +574,7 @@ public class FullNode : API
         this.utxo_set = null;
         this.enroll_man = null;
         this.timers = null;
+        this.crawler.stop();
     }

     /// Make a new instance of the consensus parameters based on the config
@@ -611,6 +618,12 @@ public class FullNode : API
         return new StatsServer(this.config.node.stats_listening_port);
     }

+    /// Returns a newly constructed Crawler
+    protected Crawler makeCrawler ()
+    {
+        return new Crawler(this.taskman, this.clock, this.config, this.network);
+    }
+
     /// Returns: The Logger to use for this class
     protected Logger makeLogger ()
     {
@@ -886,6 +899,14 @@ public class FullNode : API
         return this.ledger.getBlockHeight();
     }

+    /// GET: /network_info
+    public override CrawlResultHolder getNetworkInfo () @safe
+    {
+        this.endpoint_request_stats
+            .increaseMetricBy!"agora_endpoint_calls_total"(1, "network_info", "http");
+        return this.crawler.getNetworkInfo();
+    }
+
     /// GET: /blocks_from
     public override const(Block)[] getBlocksFrom (ulong height, uint max_blocks)
         @safe

diff --git a/source/agora/test/Base.d b/source/agora/test/Base.d
index 25089c0dc0e..28ad029f08e 100644
--- a/source/agora/test/Base.d
+++ b/source/agora/test/Base.d
@@ -51,6 +51,7 @@ import agora.crypto.Hash;
 import agora.crypto.Key;
 import agora.network.Client;
 import agora.network.Clock;
+import agora.network.Crawler;
 import agora.network.Manager;
 import agora.node.BlockStorage;
 import agora.node.FullNode;
@@ -1245,14 +1246,19 @@ public class TestNetworkManager : NetworkManager
     ///
     public Registry!NameRegistryAPI* nregistry;

+    ///
+    public TestConf test_conf;
+
     /// Constructor
     public this (Parameters!(NetworkManager.__ctor) args, string address,
-        Registry!TestAPI* reg, Registry!NameRegistryAPI* nreg)
+        Registry!TestAPI* reg, Registry!NameRegistryAPI* nreg,
+        TestConf test_conf = TestConf.init)
     {
         super(args);
         this.registry = reg;
         this.nregistry = nreg;
         this.address = address;
+        this.test_conf = test_conf;
     }

     ///
@@ -1262,7 +1268,11 @@ public class TestNetworkManager : NetworkManager
         auto tid = this.registry.locate(address);
         if (tid != typeof(tid).init)
             return new RemoteAPI!TestAPI(tid, timeout);
-        assert(0, "Trying to access node at address '" ~ address ~
+
+        if (this.test_conf.use_non_assert_get_client)
+            return null;
+        else
+            assert(0, "Trying to access node at address '" ~ address ~
                 "' without first creating it");
     }

@@ -1564,6 +1574,12 @@ private mixin template TestNodeMixin ()
         return new MemMetadata();
     }

+    /// Make a Crawler instance
+    protected override Crawler makeCrawler ()
+    {
+        return new TestCrawler(this.taskman, this.clock, this.config, this.network);
+    }
+
     /// Return a LocalRest-backed task manager
     protected override ITaskManager makeTaskManager ()
     {
@@ -1577,7 +1593,8 @@ private mixin template TestNodeMixin ()
         assert(taskman !is null);
         return new TestNetworkManager(
             this.config, metadata, taskman, clock,
-            this.config.interfaces[0].address, this.registry, this.nregistry);
+            this.config.interfaces[0].address, this.registry,
+            this.nregistry, this.test_conf);
     }

     /// Return an enrollment manager backed by an in-memory SQLite db
@@ -1700,6 +1717,8 @@ public class TestFullNode : FullNode, TestAPI
     ///
     mixin TestNodeMixin!();

+    ///
+    private TestConf test_conf;
+
     ///
     public this (Config config, Registry!TestAPI* reg, Registry!NameRegistryAPI* nreg,
@@ -1710,6 +1729,7 @@ public class TestFullNode : FullNode, TestAPI
         this.blocks = blocks;
         this.cur_time = cur_time;
         this.test_start_time = *cur_time;
+        this.test_conf = test_conf;
         super(config);
     }

@@ -1720,6 +1740,13 @@ public class TestFullNode : FullNode, TestAPI
             (out long time_offset) { return true; }, this.cur_time);
     }

+    /// Start periodic network discovery based on test config settings
+    protected override void startPeriodicDiscovery ()
+    {
+        if (this.test_conf.do_network_discovery)
+            super.startPeriodicDiscovery();
+    }
+
     /// ditto
     public override Enrollment setRecurringEnrollment (bool doIt)
     {
@@ -1768,6 +1795,9 @@ public class TestValidatorNode : Validator, TestAPI
     ///
     mixin TestNodeMixin!();

+    ///
+    private TestConf test_conf;
+
     ///
     public this (Config config, Registry!TestAPI* reg, Registry!NameRegistryAPI* nreg,
         immutable(Block)[] blocks, in TestConf test_conf, shared(TimePoint)* cur_time)
@@ -1778,6 +1808,7 @@ public class TestValidatorNode : Validator, TestAPI
         this.txs_to_nominate = test_conf.txs_to_nominate;
         this.cur_time = cur_time;
         this.test_start_time = *cur_time;
+        this.test_conf = test_conf;
         super(config);
     }

@@ -1807,6 +1838,13 @@ public class TestValidatorNode : Validator, TestAPI
             &this.acceptBlock, this.txs_to_nominate, this.test_start_time);
     }

+    /// Start periodic network discovery based on test config settings
+    protected override void startPeriodicDiscovery ()
+    {
+        if (this.test_conf.do_network_discovery)
+            super.startPeriodicDiscovery();
+    }
+
     /// Provides a unittest-adjusted clock source for the node
     protected override TestClock makeClock (ITaskManager taskman)
     {
@@ -1958,6 +1996,27 @@ public struct TestConf

     /// Transaction put into the relay queue will expire, and will be removed
     /// after `relay_tx_cache_exp`.
     Duration relay_tx_cache_exp = 60.minutes;
+
+    /// true, if this node should collect statistics about other
+    /// nodes in the network, including their geographical location and OS
+    public bool collect_network_statistics = false;
+
+    /// The number of network crawlers that will be instantiated to collect
+    /// statistics about other nodes in the network
+    public ubyte num_of_crawlers = 1;
+
+    /// The number of seconds one crawler should wait after successfully
+    /// contacting a node
+    public Duration crawling_interval = 2.seconds;
+
+    /// true, if NetworkManager.getClient should return null, as opposed to
+    /// asserting, in case the node passed to getClient has already been shut
+    /// down or was never created
+    public bool use_non_assert_get_client = false;
+
+    /// true, if the NetworkManager should automatically initiate network
+    /// discovery when the node is started up
+    public bool do_network_discovery = true;
 }

 /*******************************************************************************
@@ -2039,6 +2098,9 @@ public APIManager makeTestNetwork (APIManager : TestAPIManager = TestAPIManager)
             relay_tx_interval : test_conf.relay_tx_interval,
             relay_tx_min_fee : test_conf.relay_tx_min_fee,
             relay_tx_cache_exp : test_conf.relay_tx_cache_exp,
+            num_of_crawlers : test_conf.num_of_crawlers,
+            crawling_interval : test_conf.crawling_interval,
+            collect_network_statistics : test_conf.collect_network_statistics,
     };

     return conf;
diff --git a/source/agora/test/Crawler.d b/source/agora/test/Crawler.d
new file mode 100644
index 00000000000..f8d1f86fc9a
--- /dev/null
+++ b/source/agora/test/Crawler.d
@@ -0,0 +1,72 @@
+/*******************************************************************************
+
+    Tests for the network crawlers that try to determine node properties
+    like geographical location or node OS.
+
+    Copyright:
+        Copyright (c) 2019-2021 BOSAGORA Foundation
+        All rights reserved.
+
+    License:
+        MIT License. See LICENSE for details.
+
+*******************************************************************************/
+
+module agora.test.Crawler;
+
+version (unittest):
+
+import agora.consensus.data.genesis.Test;
+import agora.test.Base;
+import agora.utils.Utility : retryFor;
+
+import std.array : array;
+import std.conv : to;
+
+import core.time;
+import core.thread.osthread : Thread;
+
+///
+void test_crawling (NetworkTopology topology)
+{
+    TestConf test_conf =
+    {
+        topology : topology,
+        // Making sure the running nodes don't alter the original topology,
+        // by switching off network discovery
+        do_network_discovery : false,
+        use_non_assert_get_client : true,
+        collect_network_statistics : true,
+        num_of_crawlers : 1,
+        crawling_interval : 50.msecs,
+        full_nodes : 2,
+    };
+
+    auto network = makeTestNetwork!TestAPIManager(test_conf);
+    network.start();
+    scope(exit) network.shutdown();
+    scope(failure) network.printLogs();
+
+    immutable expected_discovered_node_cnt = test_conf.full_nodes + genesis_validator_keys.length;
+    // Check whether the crawling was successful, and all nodes in the network
+    // crawled all the other nodes (including themselves) successfully
+    auto check_for_success = ()
+    {
+        foreach (client; network.clients.array())
+            if (client.getNetworkInfo().length != expected_discovered_node_cnt)
+                return false;
+        return true;
+    };
+
+    check_for_success().retryFor(10.seconds);
+}
+
+unittest
+{
+    test_crawling(NetworkTopology.MinimallyConnected);
+
+    // Giving enough time for the proper shutdown of the nodes
+    Thread.sleep(1.seconds);
+
+    test_crawling(NetworkTopology.FullyConnected);
+}
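Reviewer note: for manually exercising the new GET /network_info endpoint once
a node is running, something along these lines should work. This is a minimal
sketch only — the listening address and the `printNetworkMap` helper are
illustrative, not part of this patch:

    import agora.api.FullNode : API;
    import vibe.web.rest : RestInterfaceClient;
    import std.stdio : writefln;

    /// Hypothetical helper: dump the crawled network map of one node
    void printNetworkMap (string url)
    {
        // `getNetworkInfo` is the API method added by this patch
        scope node = new RestInterfaceClient!API(url);
        auto map = node.getNetworkInfo();
        // `CrawlResultHolder` aliases itself to `CrawlResult[Address]`
        foreach (address, result; map.crawl_results)
            writefln("%s: %s / %s / %s (%s, %s) os=%s crawled_at=%s",
                address, result.continent, result.country, result.city,
                result.latitude, result.longitude, result.os, result.crawl_time);
    }

    void main ()
    {
        printNetworkMap("http://127.0.0.1:2826"); // address is an assumption
    }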