From 9ad54148d368914a219dbe27115d72687d402b62 Mon Sep 17 00:00:00 2001 From: Dominik Durner Date: Fri, 29 Mar 2024 14:50:23 +0100 Subject: [PATCH] Restrict number of cached elements --- example/benchmark/scripts/create.sh | 5 ++-- include/network/cache.hpp | 8 ++++- src/cloud/aws_cache.cpp | 6 ++++ src/network/cache.cpp | 26 ++++++++++++---- src/network/connection_manager.cpp | 46 +++++++++++++++-------------- 5 files changed, 61 insertions(+), 30 deletions(-) diff --git a/example/benchmark/scripts/create.sh b/example/benchmark/scripts/create.sh index 5b44b24..8f91a3d 100755 --- a/example/benchmark/scripts/create.sh +++ b/example/benchmark/scripts/create.sh @@ -14,19 +14,20 @@ fi aws s3api create-bucket --bucket $CONTAINER --region $REGION --create-bucket-configuration LocationConstraint=$REGION #------------------------------------------------------ mkdir data +mkdir data/enc cd data for size in ${SIZES[@]} do for n in {1..8192} do openssl rand -out ${n}.bin $size - openssl enc -aes-256-cbc -K "3031323334353637383930313233343536373839303132333435363738393031" -iv "30313233343536373839303132333435" -in ${n}.bin -out ${n}.enc.bin + openssl enc -aes-256-cbc -K "3031323334353637383930313233343536373839303132333435363738393031" -iv "30313233343536373839303132333435" -in ${n}.bin -out enc/${n}.bin done aws s3 sync ./ s3://${CONTAINER}/${size}/ for n in {1..8192} do rm ${n}.bin - rm ${n}.enc.bin + rm enc/${n}.bin done done #------------------------------------------------------ diff --git a/include/network/cache.hpp b/include/network/cache.hpp index b9eafe7..05e33d5 100644 --- a/include/network/cache.hpp +++ b/include/network/cache.hpp @@ -42,6 +42,8 @@ class Cache { std::unique_ptr dns; /// The optional tls connection std::unique_ptr tls; + /// The timestamp + size_t timestamp; /// The fd int32_t fd; /// The port @@ -57,6 +59,10 @@ class Cache { protected: /// The cache, uses hostname as key, multimap traversals same keys in the insertion order std::multimap> _cache; + /// The fifo deletion of sockets to help reduce open fds (ulimit -n issue) + std::map _fifo; + /// The timestamp counter for deletion + size_t _timestamp = 0; /// The default priority int _defaultPriority = 8; @@ -68,7 +74,7 @@ class Cache { /// Stops the socket and either closes the connection or cashes it virtual void stopSocket(std::unique_ptr socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket); /// Shutdown of the socket and clears the same addresses - virtual void shutdownSocket(std::unique_ptr socketEntry); + virtual void shutdownSocket(std::unique_ptr socketEntry, unsigned cachedEntries); /// The destructor virtual ~Cache(); diff --git a/src/cloud/aws_cache.cpp b/src/cloud/aws_cache.cpp index ba3fe83..4049915 100644 --- a/src/cloud/aws_cache.cpp +++ b/src/cloud/aws_cache.cpp @@ -28,9 +28,15 @@ unique_ptr AWSCache::resolve(string hostname, unsig { for (auto it = _cache.find(hostname); it != _cache.end();) { // loosely clean up the multimap cache + if (!it->second->dns) { + _fifo.erase(it->second->timestamp); + it = _cache.erase(it); + continue; + } if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) { auto socketEntry = move(it->second); socketEntry->dns->cachePriority--; + _fifo.erase(socketEntry->timestamp); _cache.erase(it); return socketEntry; } diff --git a/src/network/cache.cpp b/src/network/cache.cpp index 638f19b..2063fea 100644 --- a/src/network/cache.cpp +++ b/src/network/cache.cpp @@ -36,29 +36,39 @@ Cache::SocketEntry::SocketEntry(string hostname, unsigned port) : dns(nullptr), // The constructor {} //--------------------------------------------------------------------------- -void Cache::shutdownSocket(unique_ptr socketEntry) +void Cache::shutdownSocket(unique_ptr socketEntry, unsigned cacheEntries) // Shutdown the socket and dns cache { - // delete all occurences of the cached ip in map + // delete all occurences of the cached ips in the cache map if (socketEntry->hostname.length() > 0) { for (auto it = _cache.find(socketEntry->hostname); it != _cache.end();) { if (!strncmp(socketEntry->dns->addr->ai_addr->sa_data, it->second->dns->addr->ai_addr->sa_data, 14)) { it->second->dns->cachePriority = 0; - stopSocket(move(it->second), 0, 0, false); + _fifo.erase(it->second->timestamp); + stopSocket(move(it->second), 0, cacheEntries, false); it = _cache.erase(it); } else { it++; } } } - stopSocket(move(socketEntry), 0, 0, false); + stopSocket(move(socketEntry), 0, cacheEntries, false); } //--------------------------------------------------------------------------- -void Cache::stopSocket(unique_ptr socketEntry, uint64_t /*bytes*/, unsigned /*cacheEntries*/, bool reuseSocket) +void Cache::stopSocket(unique_ptr socketEntry, uint64_t /*bytes*/, unsigned cacheEntries, bool reuseSocket) // Stops the socket and either closes the connection or cashes it { if (reuseSocket && socketEntry->hostname.length() > 0 && socketEntry->port) { if (socketEntry->dns->cachePriority > 0) { + socketEntry->timestamp = _timestamp++; + for (auto it = _fifo.begin(); _fifo.size() >= cacheEntries;) { + if (it->second->fd >= 0) + close(it->second->fd); + it->second->fd = -1; + it->second->dns = nullptr; + it = _fifo.erase(it); + } + _fifo.emplace(socketEntry->timestamp, socketEntry.get()); _cache.emplace(socketEntry->hostname, move(socketEntry)); } else { close(socketEntry->fd); @@ -78,9 +88,15 @@ unique_ptr Cache::resolve(string hostname, unsigned port, bo { for (auto it = _cache.find(hostname); it != _cache.end();) { // loosely clean up the multimap cache + if (!it->second->dns) { + _fifo.erase(it->second->timestamp); + it = _cache.erase(it); + continue; + } if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) { auto socketEntry = move(it->second); socketEntry->dns->cachePriority--; + _fifo.erase(socketEntry->timestamp); _cache.erase(it); return socketEntry; } diff --git a/src/network/connection_manager.cpp b/src/network/connection_manager.cpp index e0513bc..e1ed02b 100644 --- a/src/network/connection_manager.cpp +++ b/src/network/connection_manager.cpp @@ -74,13 +74,14 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con throw runtime_error("Socket creation error!" + string(strerror(errno))); } + auto maxCacheEntries = (_maxCachedFds / _activeConnectionManagers) + 1; // Settings for socket // No blocking mode if (tcpSettings.nonBlocking > 0) { int flags = fcntl(socketEntry->fd, F_GETFL, 0); flags |= O_NONBLOCK; if (fcntl(socketEntry->fd, F_SETFL, flags) < 0) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - non blocking error"); } } @@ -88,7 +89,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Keep Alive if (tcpSettings.keepAlive > 0) { if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_KEEPALIVE, &tcpSettings.keepAlive, sizeof(tcpSettings.keepAlive))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - keep alive error"); } } @@ -96,7 +97,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Keep Idle if (tcpSettings.keepIdle > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPIDLE, &tcpSettings.keepIdle, sizeof(tcpSettings.keepIdle))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - keep idle error"); } } @@ -104,7 +105,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Keep intvl if (tcpSettings.keepIntvl > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPINTVL, &tcpSettings.keepIntvl, sizeof(tcpSettings.keepIntvl))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - keep intvl error"); } } @@ -112,7 +113,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Keep cnt if (tcpSettings.keepCnt > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_KEEPCNT, &tcpSettings.keepCnt, sizeof(tcpSettings.keepCnt))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - keep cnt error"); } } @@ -120,7 +121,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // No Delay if (tcpSettings.noDelay > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_NODELAY, &tcpSettings.noDelay, sizeof(tcpSettings.noDelay))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - nodelay error"); } } @@ -128,7 +129,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Reuse ports if (tcpSettings.reusePorts > 0) { if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_REUSEPORT, &tcpSettings.reusePorts, sizeof(tcpSettings.reusePorts))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - reuse port error"); } } @@ -136,7 +137,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Recv buffer if (tcpSettings.recvBuffer > 0) { if (setsockopt(socketEntry->fd, SOL_SOCKET, SO_RCVBUF, &tcpSettings.recvBuffer, sizeof(tcpSettings.recvBuffer))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! - recvbuf error"); } } @@ -144,7 +145,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Lingering of sockets if (tcpSettings.linger > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_LINGER2, &tcpSettings.linger, sizeof(tcpSettings.linger))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - linger timeout error!" + string(strerror(errno))); } } @@ -158,7 +159,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con } if (setsockopt(socketEntry->fd, SOL_TCP, TCP_WINSHIFT, &windowShift, sizeof(windowShift))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - win shift error!"); } } @@ -169,7 +170,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con if (tcpSettings.rfc1323 && tcpSettings.recvBuffer >= (64 << 10)) { int on = 1; if (setsockopt(socketEntry->fd, SOL_TCP, TCP_RFC1323, &on, sizeof(on))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - rfc 1323 error!"); } } @@ -179,29 +180,29 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con #ifdef TCP_MAXSEG if (tcpSettings.mss > 0) { if (setsockopt(socketEntry->fd, SOL_TCP, TCP_MAXSEG, &tcpSettings.mss, sizeof(tcpSettings.mss))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - max segment error!"); } } #endif - auto setTimeOut = [&resCache, &socketEntry](int fd, const TCPSettings& tcpSettings) { + auto setTimeOut = [&resCache, &socketEntry, maxCacheEntries](int fd, const TCPSettings& tcpSettings) { // Set timeout if (tcpSettings.timeout > 0) { struct timeval tv; tv.tv_sec = tcpSettings.timeout / (1000 * 1000); tv.tv_usec = tcpSettings.timeout % (1000 * 1000); if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast(&tv), sizeof tv)) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - recv timeout error!"); } if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast(&tv), sizeof tv)) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - send timeout error!"); } int timeoutInMs = tcpSettings.timeout / 1000; if (setsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &timeoutInMs, sizeof(timeoutInMs))) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error - tcp timeout error!" + string(strerror(errno))); } } @@ -210,7 +211,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con // Connect to remote auto connectRes = ::connect(socketEntry->fd, socketEntry->dns->addr->ai_addr, socketEntry->dns->addr->ai_addrlen); if (connectRes < 0 && errno != EINPROGRESS) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); if (retryLimit > 0) { return connect(hostname, port, tls, tcpSettings, retryLimit - 1); } else { @@ -240,7 +241,7 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con int socketError; socklen_t socketErrorLen = sizeof(socketError); if (getsockopt(socketEntry->fd, SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen)) { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! Could not retrieve socket options!"); } @@ -249,12 +250,12 @@ int32_t ConnectionManager::connect(string hostname, uint32_t port, bool tls, con setTimeOut(socketEntry->fd, tcpSettings); return emplaceSocket(); } else { - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); throw runtime_error("Socket creation error! " + string(strerror(socketError))); } } else { // Reached timeout - resCache->shutdownSocket(move(socketEntry)); + resCache->shutdownSocket(move(socketEntry), maxCacheEntries); if (retryLimit > 0) { return connect(hostname, port, tls, tcpSettings, retryLimit - 1); } else { @@ -279,11 +280,12 @@ void ConnectionManager::disconnect(int32_t fd, const TCPSettings* tcpSettings, u } else { resCache = _cache.find("")->second.get(); } + auto maxCacheEntries = (_maxCachedFds / _activeConnectionManagers) + 1; if (forceShutdown) { - resCache->shutdownSocket(move(socketIt->second)); + resCache->shutdownSocket(move(socketIt->second), maxCacheEntries); _fdSockets.erase(socketIt); } else { - resCache->stopSocket(move(socketIt->second), bytes, (_maxCachedFds / _activeConnectionManagers) + 1, tcpSettings ? tcpSettings->reuse : false); + resCache->stopSocket(move(socketIt->second), bytes, maxCacheEntries, tcpSettings ? tcpSettings->reuse : false); _fdSockets.erase(socketIt); } }