Skip to content

Commit

Permalink
Rewrote caching infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Mar 27, 2024
1 parent 2eac024 commit 5ccd297
Show file tree
Hide file tree
Showing 14 changed files with 234 additions and 266 deletions.
4 changes: 2 additions & 2 deletions include/cloud/aws_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class AWSCache : public network::Cache {

public:
/// The constructor
explicit AWSCache(unsigned cacheEntries);
AWSCache();
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
virtual std::unique_ptr<network::Cache::SocketEntry> resolve(std::string hostname, unsigned port, bool tls) override;
/// The destructor
virtual ~AWSCache() noexcept = default;
};
Expand Down
57 changes: 43 additions & 14 deletions include/network/cache.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#pragma once
#include "network/tls_connection.hpp"
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <string_view>
Expand All @@ -21,27 +24,53 @@ class IOUringSocket;
//---------------------------------------------------------------------------
/// The addr resolver and cacher, which is not thread safe
class Cache {
public:
/// The dns entry
struct DnsEntry {
/// The addr
std::unique_ptr<addrinfo, decltype(&freeaddrinfo)> addr;
/// The cache priority
int cachePriority = 0;

/// The constructor
DnsEntry(std::unique_ptr<addrinfo, decltype(&freeaddrinfo)> address, int cachePriority = 0) : addr(move(address)), cachePriority(cachePriority) {}
};

/// The fd socket entry
struct SocketEntry {
/// The DnsEntry
std::unique_ptr<DnsEntry> dns;
/// The optional tls connection
std::unique_ptr<TLSConnection> tls;
/// The fd
int32_t fd;
/// The port
unsigned port;
/// The hostname
std::string hostname;

/// The constructor
SocketEntry(std::string hostname, unsigned port);
};


protected:
/// The addr info
std::vector<std::unique_ptr<addrinfo, decltype(&freeaddrinfo)>> _addr;
/// The current addr string
std::vector<std::pair</*addrAndPort=*/std::string, /*cacheCtr=*/int>> _addrString;
/// The ctr
unsigned _addrCtr;
/// The cache, uses hostname as key, multimap traversals same keys in the insertion order
std::multimap<std::string, std::unique_ptr<SocketEntry>> _cache;
/// The queue
std::deque<SocketEntry*> _queue;
/// The default priority
int _defaultPriority = 8;

public:
/// The constructor
explicit Cache(unsigned entries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& oldAddress);
/// Reset the current cache bucket
void resetBucket() { _addrString[_addrCtr % _addrString.size()].second = 0; }
virtual std::unique_ptr<SocketEntry> resolve(std::string hostname, unsigned port, bool tls);
/// Start the timing and advance to the next cache bucket
virtual void startSocket(int /*fd*/) { _addrCtr++; }
virtual void startSocket(int /*fd*/) {}
/// Stop the timing
virtual void stopSocket(int /*fd*/, uint64_t /*bytes*/) {}
virtual void stopSocket(std::unique_ptr<SocketEntry> socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket);
/// Shutdown of the socket should clear the same addresses
virtual void shutdownSocket(int /*fd*/) {}
virtual void shutdownSocket(std::unique_ptr<SocketEntry> socketEntry);
/// The destructor
virtual ~Cache() noexcept = default;

Expand Down
29 changes: 5 additions & 24 deletions include/network/connection_manager.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once
#include "network/cache.hpp"
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
Expand All @@ -20,6 +19,7 @@ namespace network {
//---------------------------------------------------------------------------
class TLSConnection;
class TLSContext;
struct DnsEntry;
//---------------------------------------------------------------------------
// This class acts as the connection enabler, closer, and main
// cache for sockets and their optional tls connection.
Expand Down Expand Up @@ -63,30 +63,11 @@ class ConnectionManager {
}
};

/// The fd socket entry
struct SocketEntry {
/// The optional tls connection
std::unique_ptr<TLSConnection> tls;
/// The fd
int32_t fd;
/// The port
unsigned port;
/// The hostname
std::string hostname;

/// The constructor
SocketEntry(int32_t fd, std::string hostname, unsigned port, std::unique_ptr<TLSConnection> tls);
};

private:
/// The socket wrapper
std::unique_ptr<IOUringSocket> _socketWrapper;
/// The active sockets
std::unordered_map<int32_t, std::unique_ptr<SocketEntry>> _fdSockets;
/// The fd socket cache, uses hostname as key
std::unordered_multimap<std::string, std::unique_ptr<SocketEntry>> _fdCache;
/// The queue
std::deque<SocketEntry*> _fdQueue;
std::unordered_map<int32_t, std::unique_ptr<Cache::SocketEntry>> _fdSockets;
/// Cache
std::unordered_map<std::string, std::unique_ptr<Cache>> _cache;
/// The tls context
Expand All @@ -99,14 +80,14 @@ class ConnectionManager {

public:
/// The constructor
explicit ConnectionManager(unsigned uringEntries, unsigned cacheEntries);
explicit ConnectionManager(unsigned uringEntries);
/// The destructor
~ConnectionManager();

/// Creates a new socket connection
[[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, bool useCache = true, int retryLimit = 16);
[[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, int retryLimit = 16);
/// Disconnects the socket
void disconnect(int32_t fd, std::string hostname = "", uint32_t port = 0, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false);
void disconnect(int32_t fd, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false);

/// Add domain-specific cache
void addCache(const std::string& hostname, std::unique_ptr<Cache> cache);
Expand Down
2 changes: 0 additions & 2 deletions include/network/tasked_send_receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class TaskedSendReceiverGroup {
uint64_t _chunkSize;
/// The queue maximum for each TaskedSendReceiver
unsigned _concurrentRequests;
/// The maximum dns cache size
unsigned _cacheEntries = 32;
/// The TCP settings
std::unique_ptr<ConnectionManager::TCPSettings> _tcpSettings;

Expand Down
12 changes: 4 additions & 8 deletions include/network/throughput_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,19 @@ class ThroughputCache : public network::Cache {
/// The seconds vector as stable ring buffer
std::vector<double> _throughput;
/// The map between fd and start time
std::unordered_map<int, std::pair<unsigned, std::chrono::steady_clock::time_point>> _fdMap;
std::unordered_map<int, std::chrono::steady_clock::time_point> _fdMap;
/// The seconds vector iterator
uint64_t _throughputIterator;
/// The maximum history
const unsigned _maxHistory = 128;

public:
/// The constructor
explicit ThroughputCache(unsigned cacheEntries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
/// Start the timing
explicit ThroughputCache();
/// Start the timing and advance to the next cache bucket
virtual void startSocket(int fd) override;
/// Stop the timing
virtual void stopSocket(int fd, uint64_t bytes) override;
/// Clears the used server from the cache
virtual void shutdownSocket(int fd) override;
virtual void stopSocket(std::unique_ptr<SocketEntry> socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket) override;
/// The destructor
virtual ~ThroughputCache() noexcept = default;
};
Expand Down
2 changes: 1 addition & 1 deletion src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void AWS::initCache(network::TaskedSendReceiverHandle& sendReceiverHandle)
{
assert(sendReceiverHandle.get());
if (_type == Provider::CloudService::AWS) {
sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr<network::Cache>(new cloud::AWSCache(sendReceiverHandle.getGroup()->getConcurrentRequests())));
sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr<network::Cache>(new cloud::AWSCache()));
}
}
//---------------------------------------------------------------------------
Expand Down
85 changes: 43 additions & 42 deletions src/cloud/aws_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,60 @@ namespace cloud {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
AWSCache::AWSCache(unsigned entries) : Cache(entries), _mtuCache()
AWSCache::AWSCache() : Cache(), _mtuCache()
// Constructor
{
}
//---------------------------------------------------------------------------
const addrinfo* AWSCache::resolve(string hostname, string port, bool& oldAddress)
unique_ptr<network::Cache::SocketEntry> AWSCache::resolve(string hostname, unsigned port, bool tls)
// Resolve the request
{
auto addrPos = _addrCtr % static_cast<unsigned>(_addrString.size());
auto curCtr = _addrString[addrPos].second--;
auto hostString = hostname + ":" + port;
// Reuses old address
oldAddress = true;
if (_addrString[addrPos].first.compare(hostString) || curCtr == 0) {
struct addrinfo hints = {};
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;

addrinfo* temp;
if (getaddrinfo(hostname.c_str(), port.c_str(), &hints, &temp)) {
throw runtime_error("hostname getaddrinfo error");
for (auto it = _cache.find(hostname); it != _cache.end();) {
// loosely clean up the multimap cache
if (it->second->port == port && ((tls && it->second->tls.get()) || (!tls && !it->second->tls.get()))) {
auto socketEntry = move(it->second);
socketEntry->dns->cachePriority--;
_cache.erase(it);
return socketEntry;
}
_addr[addrPos].reset(temp);
if (!Cache::tld(hostname).compare("amazonaws.com")) {
struct sockaddr_in* p = reinterpret_cast<sockaddr_in*>(_addr[addrPos]->ai_addr);
auto ipAsInt = p->sin_addr.s_addr;
auto it = _mtuCache.find(ipAsInt);
if (it != _mtuCache.end()) {
if (it->second)
_addrString[addrPos] = {move(hostString), numeric_limits<int>::max()};
else
_addrString[addrPos] = {move(hostString), 12};
it++;
}
struct addrinfo hints = {};
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;

addrinfo* temp;
char port_str[16] = {};
sprintf(port_str, "%d", port);
if (getaddrinfo(hostname.c_str(), port_str, &hints, &temp) != 0) {
throw runtime_error("hostname getaddrinfo error");
}
auto socketEntry = make_unique<Cache::SocketEntry>(hostname, port);
socketEntry->dns = make_unique<DnsEntry>(unique_ptr<addrinfo, decltype(&freeaddrinfo)>(temp, &freeaddrinfo), _defaultPriority);

if (!Cache::tld(hostname).compare("amazonaws.com")) {
struct sockaddr_in* p = reinterpret_cast<sockaddr_in*>(socketEntry->dns->addr->ai_addr);
auto ipAsInt = p->sin_addr.s_addr;
auto it = _mtuCache.find(ipAsInt);
if (it != _mtuCache.end()) {
if (it->second)
socketEntry->dns->cachePriority = numeric_limits<int>::max();
} else {
char ipv4[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &p->sin_addr, ipv4, INET_ADDRSTRLEN);
string cmd = "timeout 0.01 ping -s 1473 -D " + string(ipv4) + " -c 1 >>/dev/null 2>>/dev/null";
auto res = system(cmd.c_str());
if (!res) {
_mtuCache.emplace(ipAsInt, true);
socketEntry->dns->cachePriority = numeric_limits<int>::max();
} else {
char ipv4[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &p->sin_addr, ipv4, INET_ADDRSTRLEN);
string cmd = "timeout 0.01 ping -s 1473 -D " + string(ipv4) + " -c 1 >>/dev/null 2>>/dev/null";
auto res = system(cmd.c_str());
if (!res) {
_mtuCache.emplace(ipAsInt, true);
_addrString[addrPos] = {move(hostString), numeric_limits<int>::max()};
} else {
_mtuCache.emplace(ipAsInt, false);
_addrString[addrPos] = {move(hostString), 12};
}
_mtuCache.emplace(ipAsInt, false);
}
} else {
_addrString[addrPos] = {move(hostString), 12};
}
oldAddress = false;
}
return _addr[addrPos].get();
return socketEntry;
}
//---------------------------------------------------------------------------
}; // namespace cloud
Expand Down
Loading

0 comments on commit 5ccd297

Please sign in to comment.