diff --git a/include/cloud/aws_cache.hpp b/include/cloud/aws_cache.hpp index 4ded03c..9345639 100644 --- a/include/cloud/aws_cache.hpp +++ b/include/cloud/aws_cache.hpp @@ -20,11 +20,11 @@ class AWSCache : public network::Cache { public: /// The constructor - explicit AWSCache(unsigned cacheEntries); + AWSCache(); /// The address resolving - virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override; + virtual std::unique_ptr resolve(std::string hostname, unsigned port, bool tls) override; /// The destructor - virtual ~AWSCache() noexcept = default; + virtual ~AWSCache() = default; }; //--------------------------------------------------------------------------- }; // namespace cloud diff --git a/include/network/cache.hpp b/include/network/cache.hpp index 92e2950..dec4741 100644 --- a/include/network/cache.hpp +++ b/include/network/cache.hpp @@ -1,4 +1,7 @@ #pragma once +#include "network/tls_connection.hpp" +#include +#include #include #include #include @@ -21,29 +24,55 @@ class IOUringSocket; //--------------------------------------------------------------------------- /// The addr resolver and cacher, which is not thread safe class Cache { + public: + /// The dns entry + struct DnsEntry { + /// The addr + std::unique_ptr addr; + /// The cache priority + int cachePriority = 0; + + /// The constructor + DnsEntry(std::unique_ptr address, int cachePriority = 0) : addr(move(address)), cachePriority(cachePriority) {} + }; + + /// The fd socket entry + struct SocketEntry { + /// The DnsEntry + std::unique_ptr dns; + /// The optional tls connection + std::unique_ptr tls; + /// The fd + int32_t fd; + /// The port + unsigned port; + /// The hostname + std::string hostname; + + /// The constructor + SocketEntry(std::string hostname, unsigned port); + }; + + protected: - /// The addr info - std::vector> _addr; - /// The current addr string - std::vector> _addrString; - /// The ctr - unsigned _addrCtr; + /// The cache, uses hostname as key, multimap traversals same keys in the insertion order + std::multimap> _cache; + /// The queue + std::deque _queue; + /// The default priority + int _defaultPriority = 8; public: - /// The constructor - explicit Cache(unsigned entries); /// The address resolving - virtual const addrinfo* resolve(std::string hostname, std::string port, bool& oldAddress); - /// Reset the current cache bucket - void resetBucket() { _addrString[_addrCtr % _addrString.size()].second = 0; } + virtual std::unique_ptr resolve(std::string hostname, unsigned port, bool tls); /// Start the timing and advance to the next cache bucket - virtual void startSocket(int /*fd*/) { _addrCtr++; } - /// Stop the timing - virtual void stopSocket(int /*fd*/, uint64_t /*bytes*/) {} - /// Shutdown of the socket should clear the same addresses - virtual void shutdownSocket(int /*fd*/) {} + virtual void startSocket(int /*fd*/) {} + /// Stops the socket and either closes the connection or cashes it + virtual void stopSocket(std::unique_ptr socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket); + /// Shutdown of the socket and clears the same addresses + virtual void shutdownSocket(std::unique_ptr socketEntry); /// The destructor - virtual ~Cache() noexcept = default; + virtual ~Cache(); /// Get the tld static std::string_view tld(std::string_view domain); diff --git a/include/network/connection_manager.hpp b/include/network/connection_manager.hpp index d65d7be..31aa9d8 100644 --- a/include/network/connection_manager.hpp +++ b/include/network/connection_manager.hpp @@ -1,7 +1,6 @@ #pragma once #include "network/cache.hpp" #include -#include #include #include #include @@ -20,6 +19,7 @@ namespace network { //--------------------------------------------------------------------------- class TLSConnection; class TLSContext; +struct DnsEntry; //--------------------------------------------------------------------------- // This class acts as the connection enabler, closer, and main // cache for sockets and their optional tls connection. @@ -63,30 +63,11 @@ class ConnectionManager { } }; - /// The fd socket entry - struct SocketEntry { - /// The optional tls connection - std::unique_ptr tls; - /// The fd - int32_t fd; - /// The port - unsigned port; - /// The hostname - std::string hostname; - - /// The constructor - SocketEntry(int32_t fd, std::string hostname, unsigned port, std::unique_ptr tls); - }; - private: /// The socket wrapper std::unique_ptr _socketWrapper; /// The active sockets - std::unordered_map> _fdSockets; - /// The fd socket cache, uses hostname as key - std::unordered_multimap> _fdCache; - /// The queue - std::deque _fdQueue; + std::unordered_map> _fdSockets; /// Cache std::unordered_map> _cache; /// The tls context @@ -99,14 +80,14 @@ class ConnectionManager { public: /// The constructor - explicit ConnectionManager(unsigned uringEntries, unsigned cacheEntries); + explicit ConnectionManager(unsigned uringEntries); /// The destructor ~ConnectionManager(); /// Creates a new socket connection - [[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, bool useCache = true, int retryLimit = 16); + [[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, int retryLimit = 16); /// Disconnects the socket - void disconnect(int32_t fd, std::string hostname = "", uint32_t port = 0, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false); + void disconnect(int32_t fd, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false); /// Add domain-specific cache void addCache(const std::string& hostname, std::unique_ptr cache); diff --git a/include/network/tasked_send_receiver.hpp b/include/network/tasked_send_receiver.hpp index 6759672..213d1e6 100644 --- a/include/network/tasked_send_receiver.hpp +++ b/include/network/tasked_send_receiver.hpp @@ -57,8 +57,6 @@ class TaskedSendReceiverGroup { uint64_t _chunkSize; /// The queue maximum for each TaskedSendReceiver unsigned _concurrentRequests; - /// The maximum dns cache size - unsigned _cacheEntries = 32; /// The TCP settings std::unique_ptr _tcpSettings; diff --git a/include/network/throughput_cache.hpp b/include/network/throughput_cache.hpp index 35e6493..0cee7ff 100644 --- a/include/network/throughput_cache.hpp +++ b/include/network/throughput_cache.hpp @@ -35,7 +35,7 @@ class ThroughputCache : public network::Cache { /// The seconds vector as stable ring buffer std::vector _throughput; /// The map between fd and start time - std::unordered_map> _fdMap; + std::unordered_map _fdMap; /// The seconds vector iterator uint64_t _throughputIterator; /// The maximum history @@ -43,17 +43,13 @@ class ThroughputCache : public network::Cache { public: /// The constructor - explicit ThroughputCache(unsigned cacheEntries); - /// The address resolving - virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override; - /// Start the timing + explicit ThroughputCache(); + /// Start the timing and advance to the next cache bucket virtual void startSocket(int fd) override; - /// Stop the timing - virtual void stopSocket(int fd, uint64_t bytes) override; - /// Clears the used server from the cache - virtual void shutdownSocket(int fd) override; + /// Stops the socket and either closes the connection or cashes it + virtual void stopSocket(std::unique_ptr socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket) override; /// The destructor - virtual ~ThroughputCache() noexcept = default; + virtual ~ThroughputCache() = default; }; //--------------------------------------------------------------------------- }; // namespace network diff --git a/include/utils/utils.hpp b/include/utils/utils.hpp index da60564..9126176 100644 --- a/include/utils/utils.hpp +++ b/include/utils/utils.hpp @@ -12,6 +12,12 @@ namespace anyblob { namespace utils { //--------------------------------------------------------------------------- +#ifndef NDEBUG +#define verify(expression) assert(expression) +#else +#define verify(expression) ((void) (expression)) +#endif +//--------------------------------------------------------------------------- /// Encode url special characters in %HEX std::string encodeUrlParameters(const std::string& encode); /// Encode everything from binary representation to hex diff --git a/src/cloud/aws.cpp b/src/cloud/aws.cpp index be2c167..079a274 100644 --- a/src/cloud/aws.cpp +++ b/src/cloud/aws.cpp @@ -5,6 +5,7 @@ #include "network/original_message.hpp" #include "network/tasked_send_receiver.hpp" #include "utils/data_vector.hpp" +#include "utils/utils.hpp" #include #include #include @@ -63,8 +64,8 @@ Provider::Instance AWS::getInstanceDetails(network::TaskedSendReceiverHandle& se info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -87,8 +88,8 @@ string AWS::getInstanceRegion(network::TaskedSendReceiverHandle& sendReceiverHan info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -243,16 +244,16 @@ void AWS::initSecret(network::TaskedSendReceiverHandle& sendReceiverHandle) info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); string iamUser; message = downloadSecret(s, iamUser); originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& secretContent = originalMsg->result.getDataVector(); infoPtr.reset(); s = network::HttpHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr); @@ -277,8 +278,8 @@ void AWS::initSecret(network::TaskedSendReceiverHandle& sendReceiverHandle) info.port = getPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& secretContent = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr); @@ -309,7 +310,7 @@ void AWS::initCache(network::TaskedSendReceiverHandle& sendReceiverHandle) { assert(sendReceiverHandle.get()); if (_type == Provider::CloudService::AWS) { - sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr(new cloud::AWSCache(sendReceiverHandle.getGroup()->getConcurrentRequests()))); + sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr(new cloud::AWSCache())); } } //--------------------------------------------------------------------------- diff --git a/src/cloud/aws_cache.cpp b/src/cloud/aws_cache.cpp index fac151c..ba3fe83 100644 --- a/src/cloud/aws_cache.cpp +++ b/src/cloud/aws_cache.cpp @@ -18,59 +18,60 @@ namespace cloud { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- -AWSCache::AWSCache(unsigned entries) : Cache(entries), _mtuCache() +AWSCache::AWSCache() : Cache(), _mtuCache() // Constructor { } //--------------------------------------------------------------------------- -const addrinfo* AWSCache::resolve(string hostname, string port, bool& oldAddress) +unique_ptr AWSCache::resolve(string hostname, unsigned port, bool tls) // Resolve the request { - auto addrPos = _addrCtr % static_cast(_addrString.size()); - auto curCtr = _addrString[addrPos].second--; - auto hostString = hostname + ":" + port; - // Reuses old address - oldAddress = true; - if (_addrString[addrPos].first.compare(hostString) || curCtr == 0) { - struct addrinfo hints = {}; - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - addrinfo* temp; - if (getaddrinfo(hostname.c_str(), port.c_str(), &hints, &temp)) { - throw runtime_error("hostname getaddrinfo error"); + for (auto it = _cache.find(hostname); it != _cache.end();) { + // loosely clean up the multimap cache + if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) { + auto socketEntry = move(it->second); + socketEntry->dns->cachePriority--; + _cache.erase(it); + return socketEntry; } - _addr[addrPos].reset(temp); - if (!Cache::tld(hostname).compare("amazonaws.com")) { - struct sockaddr_in* p = reinterpret_cast(_addr[addrPos]->ai_addr); - auto ipAsInt = p->sin_addr.s_addr; - auto it = _mtuCache.find(ipAsInt); - if (it != _mtuCache.end()) { - if (it->second) - _addrString[addrPos] = {move(hostString), numeric_limits::max()}; - else - _addrString[addrPos] = {move(hostString), 12}; + it++; + } + struct addrinfo hints = {}; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + addrinfo* temp; + char port_str[16] = {}; + sprintf(port_str, "%d", port); + if (getaddrinfo(hostname.c_str(), port_str, &hints, &temp) != 0) { + throw runtime_error("hostname getaddrinfo error"); + } + auto socketEntry = make_unique(hostname, port); + socketEntry->dns = make_unique(unique_ptr(temp, &freeaddrinfo), _defaultPriority); + + if (!Cache::tld(hostname).compare("amazonaws.com")) { + struct sockaddr_in* p = reinterpret_cast(socketEntry->dns->addr->ai_addr); + auto ipAsInt = p->sin_addr.s_addr; + auto it = _mtuCache.find(ipAsInt); + if (it != _mtuCache.end()) { + if (it->second) + socketEntry->dns->cachePriority = numeric_limits::max(); + } else { + char ipv4[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &p->sin_addr, ipv4, INET_ADDRSTRLEN); + string cmd = "timeout 0.01 ping -s 1473 -D " + string(ipv4) + " -c 1 >>/dev/null 2>>/dev/null"; + auto res = system(cmd.c_str()); + if (!res) { + _mtuCache.emplace(ipAsInt, true); + socketEntry->dns->cachePriority = numeric_limits::max(); } else { - char ipv4[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &p->sin_addr, ipv4, INET_ADDRSTRLEN); - string cmd = "timeout 0.01 ping -s 1473 -D " + string(ipv4) + " -c 1 >>/dev/null 2>>/dev/null"; - auto res = system(cmd.c_str()); - if (!res) { - _mtuCache.emplace(ipAsInt, true); - _addrString[addrPos] = {move(hostString), numeric_limits::max()}; - } else { - _mtuCache.emplace(ipAsInt, false); - _addrString[addrPos] = {move(hostString), 12}; - } + _mtuCache.emplace(ipAsInt, false); } - } else { - _addrString[addrPos] = {move(hostString), 12}; } - oldAddress = false; } - return _addr[addrPos].get(); + return socketEntry; } //--------------------------------------------------------------------------- }; // namespace cloud diff --git a/src/cloud/azure.cpp b/src/cloud/azure.cpp index 825f544..ae95146 100644 --- a/src/cloud/azure.cpp +++ b/src/cloud/azure.cpp @@ -6,6 +6,7 @@ #include "network/cache.hpp" #include "network/tasked_send_receiver.hpp" #include "utils/data_vector.hpp" +#include "utils/utils.hpp" #include #include #include @@ -57,8 +58,8 @@ Provider::Instance Azure::getInstanceDetails(network::TaskedSendReceiverHandle& info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -86,8 +87,8 @@ string Azure::getRegion(network::TaskedSendReceiverHandle& sendReceiverHandle) info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); diff --git a/src/cloud/gcp.cpp b/src/cloud/gcp.cpp index 1f7e48b..3c6cfa2 100644 --- a/src/cloud/gcp.cpp +++ b/src/cloud/gcp.cpp @@ -6,6 +6,7 @@ #include "network/cache.hpp" #include "network/tasked_send_receiver.hpp" #include "utils/data_vector.hpp" +#include "utils/utils.hpp" #include #include #include @@ -50,8 +51,8 @@ Provider::Instance GCP::getInstanceDetails(network::TaskedSendReceiverHandle& se info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -73,8 +74,8 @@ string GCP::getInstanceRegion(network::TaskedSendReceiverHandle& sendReceiverHan info.port = getIAMPort(); HTTP http(info); auto originalMsg = make_unique(move(message), http); - assert(sendReceiverHandle.sendSync(originalMsg.get())); - assert(sendReceiverHandle.processSync()); + verify(sendReceiverHandle.sendSync(originalMsg.get())); + verify(sendReceiverHandle.processSync()); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr); diff --git a/src/network/cache.cpp b/src/network/cache.cpp index e8f3ce8..a482b7f 100644 --- a/src/network/cache.cpp +++ b/src/network/cache.cpp @@ -4,6 +4,7 @@ #include #include #include +#include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library // Dominik Durner, 2021 @@ -31,40 +32,95 @@ string_view Cache::tld(string_view domain) return string_view(); } //--------------------------------------------------------------------------- -Cache::Cache(unsigned entries) -// Constructor +Cache::SocketEntry::SocketEntry(string hostname, unsigned port) : dns(nullptr), tls(nullptr), fd(-1), port(port), hostname(move(hostname)) +// The constructor +{} +//--------------------------------------------------------------------------- +void Cache::shutdownSocket(unique_ptr socketEntry) +// Shutdown the socket and dns cache { - _addr.reserve(entries); - for (auto i = 0u; i < entries; i++) { - _addr.emplace_back(nullptr, &freeaddrinfo); + // delete all occurences of the cached ip in queue + if (socketEntry->hostname.length() > 0) { + for (auto it = _cache.find(socketEntry->hostname); it != _cache.end();) { + if (!strncmp(socketEntry->dns->addr->ai_addr->sa_data, it->second->dns->addr->ai_addr->sa_data, 14)) { + it->second->dns->cachePriority = 0; + stopSocket(move(it->second), 0, 0, false); + it = _cache.erase(it); + } else { + it++; + } + } + } + stopSocket(move(socketEntry), 0, 0, false); +} +//--------------------------------------------------------------------------- +void Cache::stopSocket(unique_ptr socketEntry, uint64_t /*bytes*/, unsigned /*cacheEntries*/, bool reuseSocket) +// Stops the socket and either closes the connection or cashes it +{ + if (reuseSocket && socketEntry->hostname.length() > 0 && socketEntry->port) { + if (socketEntry->dns->cachePriority > 0) { + //_queue.push_back(socketEntry.get()); + //while (_queue.size() > cacheEntries) { + // auto* socketEntryPtr = _queue.front(); + // close(socketEntryPtr->fd); + // socketEntryPtr->fd = -1; + // _queue.pop_front(); + //} + _cache.emplace(socketEntry->hostname, move(socketEntry)); + } else { + shutdown(socketEntry->fd, SHUT_RDWR); + close(socketEntry->fd); + } + } else { + if (socketEntry->fd >= 0) { + shutdown(socketEntry->fd, SHUT_RDWR); + close(socketEntry->fd); + } + socketEntry->fd = -1; + if (socketEntry->dns->cachePriority > 0) + _cache.emplace(socketEntry->hostname, move(socketEntry)); } - _addrString.resize(entries, {"", 0}); - _addrCtr = 0; } //--------------------------------------------------------------------------- -const addrinfo* Cache::resolve(string hostname, string port, bool& oldAddress) +unique_ptr Cache::resolve(string hostname, unsigned port, bool tls) // Resolve the request { - auto addrPos = _addrCtr % static_cast(_addrString.size()); - auto curCtr = _addrString[addrPos].second--; - auto hostString = hostname + ":" + port; - oldAddress = true; - if (_addrString[addrPos].first.compare(hostString) || curCtr == 0) { - struct addrinfo hints = {}; - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; + for (auto it = _cache.find(hostname); it != _cache.end();) { + // loosely clean up the multimap cache + if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) { + auto socketEntry = move(it->second); + socketEntry->dns->cachePriority--; + _cache.erase(it); + return socketEntry; + } + it++; + } + struct addrinfo hints = {}; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; - addrinfo* temp; - if (getaddrinfo(hostname.c_str(), port.c_str(), &hints, &temp) != 0) { - throw runtime_error("hostname getaddrinfo error"); + addrinfo* temp; + char port_str[16] = {}; + sprintf(port_str, "%d", port); + if (getaddrinfo(hostname.c_str(), port_str, &hints, &temp) != 0) { + throw runtime_error("hostname getaddrinfo error"); + } + auto socketEntry = make_unique(hostname, port); + socketEntry->dns = make_unique(unique_ptr(temp, &freeaddrinfo), _defaultPriority); + return socketEntry; +} +//--------------------------------------------------------------------------- +Cache::~Cache() +// The destructor +{ + for (auto& f : _cache) { + if (f.second->fd >= 0) { + shutdown(f.second->fd, SHUT_RDWR); + close(f.second->fd); } - _addr[addrPos].reset(temp); - _addrString[addrPos] = {move(hostString), 12}; - oldAddress = false; } - return _addr[addrPos].get(); } //--------------------------------------------------------------------------- }; // namespace network diff --git a/src/network/connection_manager.cpp b/src/network/connection_manager.cpp index eb97d3d..18b396e 100644 --- a/src/network/connection_manager.cpp +++ b/src/network/connection_manager.cpp @@ -1,4 +1,5 @@ #include "network/connection_manager.hpp" +#include "network/cache.hpp" #include "network/io_uring_socket.hpp" #include "network/tls_connection.hpp" #include "network/tls_context.hpp" @@ -9,8 +10,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -31,48 +32,24 @@ namespace network { using namespace std; std::atomic ConnectionManager::_activeConnectionManagers{0}; //--------------------------------------------------------------------------- -ConnectionManager::SocketEntry::SocketEntry(int32_t fd, std::string hostname, unsigned port, std::unique_ptr tls) : tls(move(tls)), fd(fd), port(port), hostname(move(hostname)) -// The constructor -{} -//--------------------------------------------------------------------------- -ConnectionManager::ConnectionManager(unsigned uringEntries, unsigned cacheEntries) : _socketWrapper(make_unique(uringEntries)), _cache() +ConnectionManager::ConnectionManager(unsigned uringEntries) : _socketWrapper(make_unique(uringEntries)), _cache() // The constructor { _activeConnectionManagers++; _context = make_unique(); #ifndef ANYBLOB_LIBCXX_COMPAT // By default, use the ThroughputCache. - _cache.emplace("", make_unique(cacheEntries)); + _cache.emplace("", make_unique()); #else // If we build in libcxx compatability mode, we need to use the default cache. // The ThroughputCache currently does not build with libcxx. - _cache.emplace("", make_unique(cacheEntries)); + _cache.emplace("", make_unique()); #endif } //--------------------------------------------------------------------------- -int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, bool useCache, int retryLimit) +int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, int retryLimit) // Creates a new socket connection { - if (useCache) { - for (auto it = _fdCache.find(hostname); it != _fdCache.end();) { - // loosely clean up the multimap cache - if (it->second->fd < 0) { - it = _fdCache.erase(it); - continue; - } - if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) { - auto fd = it->second->fd; - _fdSockets.emplace(fd, move(it->second)); - _fdCache.erase(it); - return fd; - } - it++; - } - } - - char port_str[16] = {}; - sprintf(port_str, "%d", port); - Cache* resCache; auto tldName = string(Cache::tld(hostname)); auto it = _cache.find(tldName); @@ -83,77 +60,91 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con } // Did we use - bool oldAddress = false; - const auto* addr = resCache->resolve(hostname, port_str, oldAddress); + auto socketEntry = resCache->resolve(hostname, port, tls); + + auto fd = socketEntry->fd; + if (fd >= 0) { + _fdSockets.emplace(socketEntry->fd, move(socketEntry)); + return fd; + } // Build socket - auto fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (fd == -1) { + socketEntry->fd = socket(socketEntry->dns->addr->ai_family, socketEntry->dns->addr->ai_socktype, socketEntry->dns->addr->ai_protocol); + if (socketEntry->fd == -1) { throw runtime_error("Socket creation error!" + string(strerror(errno))); } // Settings for socket // No blocking mode if (tcpSettings.nonBlocking > 0) { - int flags = fcntl(fd, F_GETFL, 0); + int flags = fcntl(socketEntry->fd, F_GETFL, 0); flags |= O_NONBLOCK; - if (fcntl(fd, F_SETFL, flags) < 0) { + if (fcntl(socketEntry->fd, F_SETFL, flags) < 0) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - non blocking error"); } } // Keep Alive if (tcpSettings.keepAlive > 0) { - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &tcpSettings.keepAlive, sizeof(tcpSettings.keepAlive))) { + if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_KEEPALIVE, &tcpSettings.keepAlive, sizeof(tcpSettings.keepAlive))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - keep alive error"); } } // Keep Idle if (tcpSettings.keepIdle > 0) { - if (setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, &tcpSettings.keepIdle, sizeof(tcpSettings.keepIdle))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPIDLE, &tcpSettings.keepIdle, sizeof(tcpSettings.keepIdle))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - keep idle error"); } } // Keep intvl if (tcpSettings.keepIntvl > 0) { - if (setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, &tcpSettings.keepIntvl, sizeof(tcpSettings.keepIntvl))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPINTVL, &tcpSettings.keepIntvl, sizeof(tcpSettings.keepIntvl))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - keep intvl error"); } } // Keep cnt if (tcpSettings.keepCnt > 0) { - if (setsockopt(fd, SOL_TCP, TCP_KEEPCNT, &tcpSettings.keepCnt, sizeof(tcpSettings.keepCnt))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPCNT, &tcpSettings.keepCnt, sizeof(tcpSettings.keepCnt))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - keep cnt error"); } } // No Delay if (tcpSettings.noDelay > 0) { - if (setsockopt(fd, SOL_TCP, TCP_NODELAY, &tcpSettings.noDelay, sizeof(tcpSettings.noDelay))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_NODELAY, &tcpSettings.noDelay, sizeof(tcpSettings.noDelay))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - nodelay error"); } } // Reuse ports if (tcpSettings.reusePorts > 0) { - if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &tcpSettings.reusePorts, sizeof(tcpSettings.reusePorts))) { + if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_REUSEPORT, &tcpSettings.reusePorts, sizeof(tcpSettings.reusePorts))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - reuse port error"); } } // Recv buffer if (tcpSettings.recvBuffer > 0) { - if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &tcpSettings.recvBuffer, sizeof(tcpSettings.recvBuffer))) { + if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_RCVBUF, &tcpSettings.recvBuffer, sizeof(tcpSettings.recvBuffer))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! - recvbuf error"); } } // Lingering of sockets if (tcpSettings.linger > 0) { - if (setsockopt(fd, SOL_TCP, TCP_LINGER2, &tcpSettings.linger, sizeof(tcpSettings.linger))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_LINGER2, &tcpSettings.linger, sizeof(tcpSettings.linger))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - linger timeout error!" + string(strerror(errno))); } } @@ -166,7 +157,8 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con windowShift++; } - if (setsockopt(inSock, SOL_TCP, TCP_WINSHIFT, &windowShift, sizeof(windowShift))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_WINSHIFT, &windowShift, sizeof(windowShift))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - win shift error!"); } } @@ -176,7 +168,8 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con #ifdef TCP_RFC1323 if (tcpSettings.rfc1323 && tcpSettings.recvBuffer >= (64 << 10)) { int on = 1; - if (setsockopt(fd, SOL_TCP, TCP_RFC1323, &on, sizeof(on))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_RFC1323, &on, sizeof(on))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - rfc 1323 error!"); } } @@ -185,57 +178,60 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Max segment size #ifdef TCP_MAXSEG if (tcpSettings.mss > 0) { - if (setsockopt(fd, SOL_TCP, TCP_MAXSEG, &tcpSettings.mss, sizeof(tcpSettings.mss))) { + if (setsockopt(socketEntry->fd, SOL_TCP, TCP_MAXSEG, &tcpSettings.mss, sizeof(tcpSettings.mss))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - max segment error!"); } } #endif - auto setTimeOut = [](int fd, const TCPSettings& tcpSettings) { + auto setTimeOut = [&resCache, &socketEntry](int fd, const TCPSettings& tcpSettings) { // Set timeout if (tcpSettings.timeout > 0) { struct timeval tv; tv.tv_sec = tcpSettings.timeout / (1000 * 1000); tv.tv_usec = tcpSettings.timeout % (1000 * 1000); if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof tv)) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - recv timeout error!"); } if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof tv)) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - send timeout error!"); } int timeoutInMs = tcpSettings.timeout / 1000; if (setsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &timeoutInMs, sizeof(timeoutInMs))) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error - tcp timeout error!" + string(strerror(errno))); } } }; // Connect to remote - auto connectRes = ::connect(fd, addr->ai_addr, addr->ai_addrlen); + auto connectRes = ::connect(socketEntry->fd, socketEntry->dns->addr->ai_addr, socketEntry->dns->addr->ai_addrlen); if (connectRes < 0 && errno != EINPROGRESS) { - close(fd); - if (oldAddress && retryLimit > 0) { - // Retry with a new address - resCache->resetBucket(); - return connect(hostname, port, tls, tcpSettings, useCache, retryLimit - 1); + resCache->shutdownSocket(move(socketEntry)); + if (retryLimit > 0) { + return connect(hostname, port, tls, tcpSettings, retryLimit - 1); } else { throw runtime_error("Socket creation error! " + string(strerror(errno))); } } - resCache->startSocket(fd); + resCache->startSocket(socketEntry->fd); - auto emplaceSocket = [this, &hostname, port, tls](int32_t fd) { + auto emplaceSocket = [this, &socketEntry, tls]() { + auto fd = socketEntry->fd; if (tls) - _fdSockets.emplace(fd, make_unique(fd, hostname, port, make_unique(*_context))); - else - _fdSockets.emplace(fd, make_unique(fd, hostname, port, nullptr)); + socketEntry->tls = make_unique(*_context); + _fdSockets.emplace(fd, move(socketEntry)); + return fd; }; if (connectRes < 0 && errno == EINPROGRESS) { // connection check struct pollfd pollEvent; - pollEvent.fd = fd; + pollEvent.fd = socketEntry->fd; pollEvent.events = POLLIN | POLLOUT; // connection check @@ -243,67 +239,53 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con if (t == 1) { int socketError; socklen_t socketErrorLen = sizeof(socketError); - if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen)) { + if (getsockopt(socketEntry->fd, SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen)) { + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! Could not retrieve socket options!"); } if (!socketError) { // sucessful - setTimeOut(fd, tcpSettings); - emplaceSocket(fd); - return fd; + setTimeOut(socketEntry->fd, tcpSettings); + return emplaceSocket(); } else { - close(fd); + resCache->shutdownSocket(move(socketEntry)); throw runtime_error("Socket creation error! " + string(strerror(socketError))); } } else { // Reached timeout - close(fd); + resCache->shutdownSocket(move(socketEntry)); if (retryLimit > 0) { - return connect(hostname, port, tls, tcpSettings, useCache, retryLimit - 1); + return connect(hostname, port, tls, tcpSettings, retryLimit - 1); } else { throw runtime_error("Socket creation error! Timeout reached"); } } } - setTimeOut(fd, tcpSettings); - emplaceSocket(fd); - return fd; + setTimeOut(socketEntry->fd, tcpSettings); + return emplaceSocket(); } //--------------------------------------------------------------------------- -void ConnectionManager::disconnect(int32_t fd, string hostname, uint32_t port, const TCPSettings* tcpSettings, uint64_t bytes, bool forceShutdown) +void ConnectionManager::disconnect(int32_t fd, const TCPSettings* tcpSettings, uint64_t bytes, bool forceShutdown) // Disconnects the socket { + auto socketIt = _fdSockets.find(fd); + assert(socketIt != _fdSockets.end()); Cache* resCache; - auto tldName = string(Cache::tld(hostname)); + auto tldName = string(Cache::tld(socketIt->second->hostname)); auto it = _cache.find(tldName); if (it != _cache.end()) { resCache = it->second.get(); } else { resCache = _cache.find("")->second.get(); } - resCache->stopSocket(fd, bytes); if (forceShutdown) { - shutdown(fd, SHUT_RDWR); - resCache->shutdownSocket(fd); - _fdSockets.erase(fd); - } else if (tcpSettings && tcpSettings->reuse > 0 && hostname.length() > 0 && port) { - auto it = _fdSockets.find(fd); - assert(it != _fdSockets.end()); - auto* socketEntry = it->second.get(); - _fdQueue.push_back(socketEntry); - auto cacheEntries = (_maxCachedFds / _activeConnectionManagers) + 1; - while (_fdQueue.size() > cacheEntries) { - socketEntry = _fdQueue.front(); - close(socketEntry->fd); - socketEntry->fd = -1; - _fdQueue.pop_front(); - } - _fdCache.emplace(hostname, move(it->second)); - _fdSockets.erase(it); - } else { - _fdSockets.erase(fd); - close(fd); + resCache->shutdownSocket(move(socketIt->second)); + _fdSockets.erase(socketIt); + } + else { + resCache->stopSocket(move(socketIt->second), bytes, (_maxCachedFds / _activeConnectionManagers) + 1, tcpSettings ? tcpSettings->reuse : false); + _fdSockets.erase(socketIt); } } //--------------------------------------------------------------------------- @@ -336,11 +318,10 @@ TLSConnection* ConnectionManager::getTLSConnection(int32_t fd) ConnectionManager::~ConnectionManager() // The destructor { - for (auto& f : _fdSockets) + for (auto& f : _fdSockets) { + shutdown(f.first, SHUT_RDWR); close(f.first); - for (auto& f : _fdCache) - if (f.second->fd >= 0) - close(f.second->fd); + } _activeConnectionManagers--; } //--------------------------------------------------------------------------- diff --git a/src/network/http_message.cpp b/src/network/http_message.cpp index 1c52b82..d29646b 100644 --- a/src/network/http_message.cpp +++ b/src/network/http_message.cpp @@ -34,6 +34,8 @@ MessageState HTTPMessage::execute(ConnectionManager& connectionManager) try { fd = connectionManager.connect(originalMessage->provider.getAddress(), originalMessage->provider.getPort(), false, tcpSettings); } catch (exception& /*e*/) { + if (request) + request->fd = -1; originalMessage->result.failureCode |= static_cast(MessageFailureCode::Socket); reset(connectionManager, failures++ > failuresMax); return execute(connectionManager); @@ -97,7 +99,7 @@ MessageState HTTPMessage::execute(ConnectionManager& connectionManager) try { // check whether finished http if (HttpHelper::finished(receive.data(), static_cast(receiveBufferOffset), info)) { - connectionManager.disconnect(request->fd, originalMessage->provider.getAddress(), originalMessage->provider.getPort(), &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); + connectionManager.disconnect(request->fd, &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); originalMessage->result.response = move(info); if (HttpResponse::checkSuccess(originalMessage->result.response->response.code)) { state = MessageState::Finished; @@ -158,8 +160,8 @@ void HTTPMessage::reset(ConnectionManager& connectionManager, bool aborted) if ((originalMessage->result.failureCode & static_cast(MessageFailureCode::HTTP)) && originalMessage->provider.supportsResigning()) { originalMessage->message = originalMessage->provider.resignRequest(*originalMessage->message, originalMessage->putData, originalMessage->putLength); } - if (request) - connectionManager.disconnect(request->fd, originalMessage->provider.getAddress(), originalMessage->provider.getPort(), &tcpSettings, 0, true); + if (request && request->fd >= 0) + connectionManager.disconnect(request->fd, &tcpSettings, 0, true); } //--------------------------------------------------------------------------- } // namespace network diff --git a/src/network/https_message.cpp b/src/network/https_message.cpp index 3354253..d8186c4 100644 --- a/src/network/https_message.cpp +++ b/src/network/https_message.cpp @@ -39,6 +39,8 @@ MessageState HTTPSMessage::execute(ConnectionManager& connectionManager) try { fd = connectionManager.connect(originalMessage->provider.getAddress(), originalMessage->provider.getPort(), true, tcpSettings); } catch (exception& /*e*/) { + if (request) + request->fd = -1; originalMessage->result.failureCode |= static_cast(MessageFailureCode::Socket); reset(connectionManager, failures++ > failuresMax); return execute(connectionManager); @@ -114,7 +116,7 @@ MessageState HTTPSMessage::execute(ConnectionManager& connectionManager) // Decide if to cache the session if (tcpSettings.reuse) { state = MessageState::Finished; - connectionManager.disconnect(request->fd, originalMessage->provider.getAddress(), originalMessage->provider.getPort(), &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); + connectionManager.disconnect(request->fd, &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); return state; } else { state = MessageState::TLSShutdown; @@ -144,7 +146,7 @@ MessageState HTTPSMessage::execute(ConnectionManager& connectionManager) auto status = tlsLayer->shutdown(connectionManager); // The request was successful even if the shutdown fails if (status == TLSConnection::Progress::Finished || status == TLSConnection::Progress::Aborted) { - connectionManager.disconnect(request->fd, originalMessage->provider.getAddress(), originalMessage->provider.getPort(), &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); + connectionManager.disconnect(request->fd, &tcpSettings, static_cast(sendBufferOffset + receiveBufferOffset)); state = MessageState::Finished; return MessageState::Finished; } else { diff --git a/src/network/tasked_send_receiver.cpp b/src/network/tasked_send_receiver.cpp index 8e65007..b1210e1 100644 --- a/src/network/tasked_send_receiver.cpp +++ b/src/network/tasked_send_receiver.cpp @@ -6,6 +6,7 @@ #include "network/tls_context.hpp" #include "utils/data_vector.hpp" #include "utils/timer.hpp" +#include "utils/utils.hpp" #include #include #include @@ -18,12 +19,6 @@ // If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. // SPDX-License-Identifier: MPL-2.0 //--------------------------------------------------------------------------- -#ifndef NDEBUG -#define verify(expression) assert(expression) -#else -#define verify(expression) ((void) (expression)) -#endif -//--------------------------------------------------------------------------- namespace anyblob { namespace network { //--------------------------------------------------------------------------- @@ -138,7 +133,7 @@ bool TaskedSendReceiverHandle::sendReceive(bool local, bool oneQueueInvocation) return true; } //--------------------------------------------------------------------------- -TaskedSendReceiver::TaskedSendReceiver(TaskedSendReceiverGroup& group) : _group(group), _submissions(), _next(nullptr), _connectionManager(make_unique(group._concurrentRequests << 2, group._cacheEntries)), _messageTasks(), _timings(nullptr), _stopDeamon(false) +TaskedSendReceiver::TaskedSendReceiver(TaskedSendReceiverGroup& group) : _group(group), _submissions(), _next(nullptr), _connectionManager(make_unique(group._concurrentRequests << 2)), _messageTasks(), _timings(nullptr), _stopDeamon(false) // The constructor { } diff --git a/src/network/throughput_cache.cpp b/src/network/throughput_cache.cpp index d896895..7f247c5 100644 --- a/src/network/throughput_cache.cpp +++ b/src/network/throughput_cache.cpp @@ -1,4 +1,4 @@ -#ifndef ANYBLOB_LIBCXX_COMPAT +#ifndef ANYBLOB_LIBCXX_COMPAT #include "network/throughput_cache.hpp" #include #include @@ -19,65 +19,26 @@ namespace network { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- -ThroughputCache::ThroughputCache(unsigned entries) : Cache(entries), _throughputTree(), _throughput(), _throughputIterator() +ThroughputCache::ThroughputCache() : Cache(), _throughputTree(), _throughput(), _throughputIterator() // Constructor { + _defaultPriority = 2; _throughput.resize(_maxHistory); } //--------------------------------------------------------------------------- -const addrinfo* ThroughputCache::resolve(string hostname, string port, bool& reuse) -// Resolve the request -{ - auto addrPos = _addrCtr % _addrString.size(); - auto curCtr = _addrString[addrPos].second--; - auto hostString = hostname + ":" + port; - if (_addrString[addrPos].first.compare(hostString) || curCtr == 0) { - struct addrinfo hints = {}; - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - - addrinfo* temp; - if (getaddrinfo(hostname.c_str(), port.c_str(), &hints, &temp)) { - throw runtime_error("hostname getaddrinfo error"); - } - _addr[addrPos].reset(temp); - _addrString[addrPos] = {hostString, 2}; - reuse = false; - } - reuse = true; - return _addr[addrPos].get(); -} -//--------------------------------------------------------------------------- void ThroughputCache::startSocket(int fd) // Start a socket { - _fdMap.emplace(fd, make_pair(_addrCtr++ % _addrString.size(), chrono::steady_clock::now())); -} -//--------------------------------------------------------------------------- -void ThroughputCache::shutdownSocket(int fd) -// Shutdown a socket and clear the dns cache -{ - auto it = _fdMap.find(fd); - if (it != _fdMap.end()) { - auto pos = it->second.first; - auto& ip = _addr[pos]; - for (auto compPos = 0u; compPos < _addr.size(); compPos++) { - if (_addr[compPos] && !strncmp(ip->ai_addr->sa_data, _addr[compPos]->ai_addr->sa_data, 14)) { - _addrString[compPos].second = 0; - } - } - } + _fdMap.emplace(fd, chrono::steady_clock::now()); } //--------------------------------------------------------------------------- -void ThroughputCache::stopSocket(int fd, uint64_t bytes) +void ThroughputCache::stopSocket(std::unique_ptr socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket) // Stop a socket { auto now = chrono::steady_clock::now(); - auto it = _fdMap.find(fd); + auto it = _fdMap.find(socketEntry->fd); if (it != _fdMap.end()) { - auto timeNs = chrono::duration_cast(now - it->second.second).count(); + auto timeNs = chrono::duration_cast(now - it->second).count(); auto throughput = static_cast(bytes) / chrono::duration(timeNs).count(); auto curThroughput = _throughputIterator++; auto del = _throughput[curThroughput % _maxHistory]; @@ -89,16 +50,17 @@ void ThroughputCache::stopSocket(int fd, uint64_t bytes) if (maxElem > 3) { auto percentile = _throughputTree.find_by_order(maxElem / 3); if (percentile.m_p_nd->m_value <= throughput) - _addrString[it->second.first].second += 1; + socketEntry->dns->cachePriority += 1; else possible = false; } if (possible && maxElem > 6) { auto percentile = _throughputTree.find_by_order(maxElem / 6); if (percentile.m_p_nd->m_value <= throughput) - _addrString[it->second.first].second += 2; + socketEntry->dns->cachePriority += 2; } } + Cache::stopSocket(move(socketEntry), bytes, cachedEntries, reuseSocket); } //--------------------------------------------------------------------------- }; // namespace network diff --git a/test/unit/network/io_uring_socket_test.cpp b/test/unit/network/io_uring_socket_test.cpp index a875638..955de12 100644 --- a/test/unit/network/io_uring_socket_test.cpp +++ b/test/unit/network/io_uring_socket_test.cpp @@ -16,7 +16,7 @@ namespace test { //--------------------------------------------------------------------------- TEST_CASE("io_uring_socket") { PerfEventBlock e; - ConnectionManager io (10, 10); + ConnectionManager io (10); ConnectionManager::TCPSettings tcpSettings; REQUIRE(io.connect("db.in.tum.de", 80, false, tcpSettings) > 0); }