Skip to content

Commit

Permalink
Rewrote caching infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Mar 28, 2024
1 parent 2eac024 commit 7fbb36d
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 300 deletions.
6 changes: 3 additions & 3 deletions include/cloud/aws_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ class AWSCache : public network::Cache {

public:
/// The constructor
explicit AWSCache(unsigned cacheEntries);
AWSCache();
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
virtual std::unique_ptr<network::Cache::SocketEntry> resolve(std::string hostname, unsigned port, bool tls) override;
/// The destructor
virtual ~AWSCache() noexcept = default;
virtual ~AWSCache() = default;
};
//---------------------------------------------------------------------------
}; // namespace cloud
Expand Down
67 changes: 50 additions & 17 deletions include/network/cache.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#pragma once
#include "network/tls_connection.hpp"
#include <deque>
#include <map>
#include <memory>
#include <string>
#include <string_view>
Expand All @@ -21,29 +24,59 @@ class IOUringSocket;
//---------------------------------------------------------------------------
/// The addr resolver and cacher, which is not thread safe
class Cache {
public:
/// The dns entry
struct DnsEntry {
/// The addr
std::unique_ptr<addrinfo, decltype(&freeaddrinfo)> addr;
/// The cache priority
int cachePriority = 0;

/// The constructor
DnsEntry(std::unique_ptr<addrinfo, decltype(&freeaddrinfo)> address, int cachePriority = 0) : addr(move(address)), cachePriority(cachePriority) {}
};

/// The fd socket entry
struct SocketEntry {
/// The DnsEntry
std::unique_ptr<DnsEntry> dns;
/// The optional tls connection
std::unique_ptr<TLSConnection> tls;
/// The timestamp
size_t timestamp;
/// The fd
int32_t fd;
/// The port
unsigned port;
/// The hostname
std::string hostname;

/// The constructor
SocketEntry(std::string hostname, unsigned port);
};


protected:
/// The addr info
std::vector<std::unique_ptr<addrinfo, decltype(&freeaddrinfo)>> _addr;
/// The current addr string
std::vector<std::pair</*addrAndPort=*/std::string, /*cacheCtr=*/int>> _addrString;
/// The ctr
unsigned _addrCtr;
/// The cache, uses hostname as key, multimap traversals same keys in the insertion order
std::multimap<std::string, std::unique_ptr<SocketEntry>> _cache;
/// The fifo deletion of sockets to help reduce open fds (ulimit -n issue)
std::map<size_t, SocketEntry*> _fifo;
/// The timestamp counter for deletion
size_t _timestamp = 0;
/// The default priority
int _defaultPriority = 8;

public:
/// The constructor
explicit Cache(unsigned entries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& oldAddress);
/// Reset the current cache bucket
void resetBucket() { _addrString[_addrCtr % _addrString.size()].second = 0; }
virtual std::unique_ptr<SocketEntry> resolve(std::string hostname, unsigned port, bool tls);
/// Start the timing and advance to the next cache bucket
virtual void startSocket(int /*fd*/) { _addrCtr++; }
/// Stop the timing
virtual void stopSocket(int /*fd*/, uint64_t /*bytes*/) {}
/// Shutdown of the socket should clear the same addresses
virtual void shutdownSocket(int /*fd*/) {}
virtual void startSocket(int /*fd*/) {}
/// Stops the socket and either closes the connection or cashes it
virtual void stopSocket(std::unique_ptr<SocketEntry> socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket);
/// Shutdown of the socket and clears the same addresses
virtual void shutdownSocket(std::unique_ptr<SocketEntry> socketEntry);
/// The destructor
virtual ~Cache() noexcept = default;
virtual ~Cache();

/// Get the tld
static std::string_view tld(std::string_view domain);
Expand Down
29 changes: 5 additions & 24 deletions include/network/connection_manager.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once
#include "network/cache.hpp"
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
Expand All @@ -20,6 +19,7 @@ namespace network {
//---------------------------------------------------------------------------
class TLSConnection;
class TLSContext;
struct DnsEntry;
//---------------------------------------------------------------------------
// This class acts as the connection enabler, closer, and main
// cache for sockets and their optional tls connection.
Expand Down Expand Up @@ -63,30 +63,11 @@ class ConnectionManager {
}
};

/// The fd socket entry
struct SocketEntry {
/// The optional tls connection
std::unique_ptr<TLSConnection> tls;
/// The fd
int32_t fd;
/// The port
unsigned port;
/// The hostname
std::string hostname;

/// The constructor
SocketEntry(int32_t fd, std::string hostname, unsigned port, std::unique_ptr<TLSConnection> tls);
};

private:
/// The socket wrapper
std::unique_ptr<IOUringSocket> _socketWrapper;
/// The active sockets
std::unordered_map<int32_t, std::unique_ptr<SocketEntry>> _fdSockets;
/// The fd socket cache, uses hostname as key
std::unordered_multimap<std::string, std::unique_ptr<SocketEntry>> _fdCache;
/// The queue
std::deque<SocketEntry*> _fdQueue;
std::unordered_map<int32_t, std::unique_ptr<Cache::SocketEntry>> _fdSockets;
/// Cache
std::unordered_map<std::string, std::unique_ptr<Cache>> _cache;
/// The tls context
Expand All @@ -99,14 +80,14 @@ class ConnectionManager {

public:
/// The constructor
explicit ConnectionManager(unsigned uringEntries, unsigned cacheEntries);
explicit ConnectionManager(unsigned uringEntries);
/// The destructor
~ConnectionManager();

/// Creates a new socket connection
[[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, bool useCache = true, int retryLimit = 16);
[[nodiscard]] int32_t connect(std::string hostname, uint32_t port, bool tls, const TCPSettings& tcpSettings, int retryLimit = 16);
/// Disconnects the socket
void disconnect(int32_t fd, std::string hostname = "", uint32_t port = 0, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false);
void disconnect(int32_t fd, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false);

/// Add domain-specific cache
void addCache(const std::string& hostname, std::unique_ptr<Cache> cache);
Expand Down
2 changes: 0 additions & 2 deletions include/network/tasked_send_receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ class TaskedSendReceiverGroup {
uint64_t _chunkSize;
/// The queue maximum for each TaskedSendReceiver
unsigned _concurrentRequests;
/// The maximum dns cache size
unsigned _cacheEntries = 32;
/// The TCP settings
std::unique_ptr<ConnectionManager::TCPSettings> _tcpSettings;

Expand Down
16 changes: 6 additions & 10 deletions include/network/throughput_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,21 @@ class ThroughputCache : public network::Cache {
/// The seconds vector as stable ring buffer
std::vector<double> _throughput;
/// The map between fd and start time
std::unordered_map<int, std::pair<unsigned, std::chrono::steady_clock::time_point>> _fdMap;
std::unordered_map<int, std::chrono::steady_clock::time_point> _fdMap;
/// The seconds vector iterator
uint64_t _throughputIterator;
/// The maximum history
const unsigned _maxHistory = 128;

public:
/// The constructor
explicit ThroughputCache(unsigned cacheEntries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
/// Start the timing
explicit ThroughputCache();
/// Start the timing and advance to the next cache bucket
virtual void startSocket(int fd) override;
/// Stop the timing
virtual void stopSocket(int fd, uint64_t bytes) override;
/// Clears the used server from the cache
virtual void shutdownSocket(int fd) override;
/// Stops the socket and either closes the connection or cashes it
virtual void stopSocket(std::unique_ptr<SocketEntry> socketEntry, uint64_t bytes, unsigned cachedEntries, bool reuseSocket) override;
/// The destructor
virtual ~ThroughputCache() noexcept = default;
virtual ~ThroughputCache() = default;
};
//---------------------------------------------------------------------------
}; // namespace network
Expand Down
3 changes: 1 addition & 2 deletions include/utils/unordered_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ class UnorderedMap {

/// The advance function of the forward iterator
constexpr Iterator& operator++() {
/// TODO: durner
return *this;
throw std::runtime_error("advancing an iterator is not implemented");
}
};

Expand Down
6 changes: 6 additions & 0 deletions include/utils/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
namespace anyblob {
namespace utils {
//---------------------------------------------------------------------------
#ifndef NDEBUG
#define verify(expression) assert(expression)
#else
#define verify(expression) ((void) (expression))
#endif
//---------------------------------------------------------------------------
/// Encode url special characters in %HEX
std::string encodeUrlParameters(const std::string& encode);
/// Encode everything from binary representation to hex
Expand Down
23 changes: 12 additions & 11 deletions src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "network/original_message.hpp"
#include "network/tasked_send_receiver.hpp"
#include "utils/data_vector.hpp"
#include "utils/utils.hpp"
#include <chrono>
#include <iomanip>
#include <sstream>
Expand Down Expand Up @@ -63,8 +64,8 @@ Provider::Instance AWS::getInstanceDetails(network::TaskedSendReceiverHandle& se
info.port = getIAMPort();
HTTP http(info);
auto originalMsg = make_unique<network::OriginalMessage>(move(message), http);
assert(sendReceiverHandle.sendSync(originalMsg.get()));
assert(sendReceiverHandle.processSync());
verify(sendReceiverHandle.sendSync(originalMsg.get()));
verify(sendReceiverHandle.processSync());
auto& content = originalMsg->result.getDataVector();
unique_ptr<network::HttpHelper::Info> infoPtr;
auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr);
Expand All @@ -87,8 +88,8 @@ string AWS::getInstanceRegion(network::TaskedSendReceiverHandle& sendReceiverHan
info.port = getIAMPort();
HTTP http(info);
auto originalMsg = make_unique<network::OriginalMessage>(move(message), http);
assert(sendReceiverHandle.sendSync(originalMsg.get()));
assert(sendReceiverHandle.processSync());
verify(sendReceiverHandle.sendSync(originalMsg.get()));
verify(sendReceiverHandle.processSync());
auto& content = originalMsg->result.getDataVector();
unique_ptr<network::HttpHelper::Info> infoPtr;
auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr);
Expand Down Expand Up @@ -243,16 +244,16 @@ void AWS::initSecret(network::TaskedSendReceiverHandle& sendReceiverHandle)
info.port = getIAMPort();
HTTP http(info);
auto originalMsg = make_unique<network::OriginalMessage>(move(message), http);
assert(sendReceiverHandle.sendSync(originalMsg.get()));
assert(sendReceiverHandle.processSync());
verify(sendReceiverHandle.sendSync(originalMsg.get()));
verify(sendReceiverHandle.processSync());
auto& content = originalMsg->result.getDataVector();
unique_ptr<network::HttpHelper::Info> infoPtr;
auto s = network::HttpHelper::retrieveContent(content.cdata(), content.size(), infoPtr);
string iamUser;
message = downloadSecret(s, iamUser);
originalMsg = make_unique<network::OriginalMessage>(move(message), http);
assert(sendReceiverHandle.sendSync(originalMsg.get()));
assert(sendReceiverHandle.processSync());
verify(sendReceiverHandle.sendSync(originalMsg.get()));
verify(sendReceiverHandle.processSync());
auto& secretContent = originalMsg->result.getDataVector();
infoPtr.reset();
s = network::HttpHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr);
Expand All @@ -277,8 +278,8 @@ void AWS::initSecret(network::TaskedSendReceiverHandle& sendReceiverHandle)
info.port = getPort();
HTTP http(info);
auto originalMsg = make_unique<network::OriginalMessage>(move(message), http);
assert(sendReceiverHandle.sendSync(originalMsg.get()));
assert(sendReceiverHandle.processSync());
verify(sendReceiverHandle.sendSync(originalMsg.get()));
verify(sendReceiverHandle.processSync());
auto& secretContent = originalMsg->result.getDataVector();
unique_ptr<network::HttpHelper::Info> infoPtr;
auto s = network::HttpHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr);
Expand Down Expand Up @@ -309,7 +310,7 @@ void AWS::initCache(network::TaskedSendReceiverHandle& sendReceiverHandle)
{
assert(sendReceiverHandle.get());
if (_type == Provider::CloudService::AWS) {
sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr<network::Cache>(new cloud::AWSCache(sendReceiverHandle.getGroup()->getConcurrentRequests())));
sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr<network::Cache>(new cloud::AWSCache()));
}
}
//---------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 7fbb36d

Please sign in to comment.