Skip to content

Commit

Permalink
Updated naming of resolver to cache -> prepare to merge fd and dns cache
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Mar 27, 2024
1 parent 9b2be80 commit 2eac024
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 80 deletions.
4 changes: 2 additions & 2 deletions example/benchmark/src/benchmark/bandwidth.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "benchmark/bandwidth.hpp"
#include "cloud/aws.hpp"
#include "cloud/aws_resolver.hpp"
#include "cloud/aws_cache.hpp"
#include "cloud/azure.hpp"
#include "cloud/gcp.hpp"
#include "network/original_message.hpp"
Expand Down Expand Up @@ -172,7 +172,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
auto awsProvider = static_cast<cloud::AWS*>(cloudProvider.get());
if (!benchmarkSettings.resolver.compare("aws")) {
for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++) {
awsProvider->initResolver(*sendReceiverHandles[i].get());
awsProvider->initCache(*sendReceiverHandles[i].get());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions example/simple/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ int main(int /*argc*/, char** /*argv*/) {
bool https = false;
auto provider = anyblob::cloud::Provider::makeProvider(bucketName, https, "", "", &sendReceiverHandle);

// Optionally init the specialized aws resolver
// provider->initResolver(sendReceiverHandle);
// Optionally init the specialized aws cache
// provider->initCache(sendReceiverHandle);

// Update the concurrency according to instance settings
auto config = provider->getConfig(sendReceiverHandle);
Expand Down
4 changes: 2 additions & 2 deletions include/cloud/aws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ class AWS : public Provider {
[[nodiscard]] Provider::Instance getInstanceDetails(network::TaskedSendReceiverHandle& sendReceiver) override;
/// Get the region of the instance
[[nodiscard]] static std::string getInstanceRegion(network::TaskedSendReceiverHandle& sendReceiver);
/// Init the resolver
void initResolver(network::TaskedSendReceiverHandle& sendReceiverHandle) override;
/// Init the Cache
void initCache(network::TaskedSendReceiverHandle& sendReceiverHandle) override;

/// The constructor
explicit AWS(const RemoteInfo& info) : _settings({info.bucket, info.region, info.endpoint, info.port, info.zonal}), _mutex() {
Expand Down
10 changes: 5 additions & 5 deletions include/cloud/aws_resolver.hpp → include/cloud/aws_cache.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#pragma once
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include <unordered_map>
//---------------------------------------------------------------------------
// AnyBlob - Universal Cloud Object Storage Library
Expand All @@ -13,18 +13,18 @@ namespace anyblob {
//---------------------------------------------------------------------------
namespace cloud {
//---------------------------------------------------------------------------
/// Implements the AWS resolver logic
class AWSResolver : public network::Resolver {
/// Implements the AWS cache logic
class AWSCache : public network::Cache {
/// The good mtu cache
std::unordered_map<unsigned, bool> _mtuCache;

public:
/// The constructor
explicit AWSResolver(unsigned cacheEntries);
explicit AWSCache(unsigned cacheEntries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
/// The destructor
virtual ~AWSResolver() noexcept = default;
virtual ~AWSCache() noexcept = default;
};
//---------------------------------------------------------------------------
}; // namespace cloud
Expand Down
4 changes: 2 additions & 2 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ class Provider {
/// Create a provider (keyId is access email for GCP/Azure)
[[nodiscard]] static std::unique_ptr<Provider> makeProvider(const std::string& filepath, bool https = false, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiverHandle* sendReceiverHandle = nullptr);

/// Init the resolver for specific provider
virtual void initResolver(network::TaskedSendReceiverHandle& /*sendReceiverHandle*/) {}
/// Init the cache for specific provider
virtual void initCache(network::TaskedSendReceiverHandle& /*sendReceiverHandle*/) {}
/// Get the instance infos
[[nodiscard]] virtual Instance getInstanceDetails(network::TaskedSendReceiverHandle& sendReceiverHandle) = 0;
/// Get the config
Expand Down
6 changes: 3 additions & 3 deletions include/network/resolver.hpp → include/network/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace network {
class IOUringSocket;
//---------------------------------------------------------------------------
/// The addr resolver and cacher, which is not thread safe
class Resolver {
class Cache {
protected:
/// The addr info
std::vector<std::unique_ptr<addrinfo, decltype(&freeaddrinfo)>> _addr;
Expand All @@ -31,7 +31,7 @@ class Resolver {

public:
/// The constructor
explicit Resolver(unsigned entries);
explicit Cache(unsigned entries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& oldAddress);
/// Reset the current cache bucket
Expand All @@ -43,7 +43,7 @@ class Resolver {
/// Shutdown of the socket should clear the same addresses
virtual void shutdownSocket(int /*fd*/) {}
/// The destructor
virtual ~Resolver() noexcept = default;
virtual ~Cache() noexcept = default;

/// Get the tld
static std::string_view tld(std::string_view domain);
Expand Down
21 changes: 15 additions & 6 deletions include/network/connection_manager.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -73,6 +74,7 @@ class ConnectionManager {
/// The hostname
std::string hostname;

/// The constructor
SocketEntry(int32_t fd, std::string hostname, unsigned port, std::unique_ptr<TLSConnection> tls);
};

Expand All @@ -83,14 +85,21 @@ class ConnectionManager {
std::unordered_map<int32_t, std::unique_ptr<SocketEntry>> _fdSockets;
/// The fd socket cache, uses hostname as key
std::unordered_multimap<std::string, std::unique_ptr<SocketEntry>> _fdCache;
/// Resolver
std::unordered_map<std::string, std::unique_ptr<Resolver>> _resolverCache;
/// The queue
std::deque<SocketEntry*> _fdQueue;
/// Cache
std::unordered_map<std::string, std::unique_ptr<Cache>> _cache;
/// The tls context
std::unique_ptr<network::TLSContext> _context;

/// The counter of current connection managers
static std::atomic<unsigned> _activeConnectionManagers;
/// The maximum number of cached fds, TODO: access ulimit for this
constexpr static unsigned _maxCachedFds = 512;

public:
/// The constructor
explicit ConnectionManager(unsigned uringEntries, unsigned resolverCacheEntries);
explicit ConnectionManager(unsigned uringEntries, unsigned cacheEntries);
/// The destructor
~ConnectionManager();

Expand All @@ -99,8 +108,8 @@ class ConnectionManager {
/// Disconnects the socket
void disconnect(int32_t fd, std::string hostname = "", uint32_t port = 0, const TCPSettings* tcpSettings = nullptr, uint64_t bytes = 0, bool forceShutdown = false);

/// Add resolver
void addResolver(const std::string& hostname, std::unique_ptr<Resolver> resolver);
/// Add domain-specific cache
void addCache(const std::string& hostname, std::unique_ptr<Cache> cache);
/// Checks for a timeout
bool checkTimeout(int fd, const TCPSettings& settings);

Expand Down
6 changes: 3 additions & 3 deletions include/network/tasked_send_receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace network {
//---------------------------------------------------------------------------
class TaskedSendReceiver;
class TaskedSendReceiverHandle;
class Resolver;
class Cache;
struct OriginalMessage;
//---------------------------------------------------------------------------
/// Shared submission and completions queue with multiple TaskedSendReceivers
Expand Down Expand Up @@ -125,8 +125,8 @@ class TaskedSendReceiver {
public:
/// Get the group
[[nodiscard]] const TaskedSendReceiverGroup* getGroup() const { return &_group; }
/// Adds a resolver
void addResolver(const std::string& hostname, std::unique_ptr<Resolver> resolver);
/// Adds a domain-specific cache
void addCache(const std::string& hostname, std::unique_ptr<Cache> cache);
/// Process local submissions (should be used for sync requests)
inline void processSync(bool oneQueueInvocation = true) { sendReceive(true, oneQueueInvocation); }
/// Set the timings
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#pragma once
// The `ThroughputResolver` depends on gnu stdlib associative containers that are
// not supported by libcxx. We remove the throughput resolver in its entirety when
// The `ThroughputCache` depends on gnu stdlib associative containers that are
// not supported by libcxx. We remove the throughput cache in its entirety when
// building with libcxx.
#ifndef ANYBLOB_LIBCXX_COMPAT
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include <chrono>
#include <ext/pb_ds/assoc_container.hpp>
#include <ext/pb_ds/tree_policy.hpp>
Expand All @@ -20,8 +20,8 @@ namespace anyblob {
//---------------------------------------------------------------------------
namespace network {
//---------------------------------------------------------------------------
/// Implements the AWS Resolver logic
class ThroughputResolver : public network::Resolver {
/// Implements the throughput-based cache logic
class ThroughputCache : public network::Cache {
/// Order statistic tree
typedef __gnu_pbds::tree<
double,
Expand All @@ -43,7 +43,7 @@ class ThroughputResolver : public network::Resolver {

public:
/// The constructor
explicit ThroughputResolver(unsigned cacheEntries);
explicit ThroughputCache(unsigned cacheEntries);
/// The address resolving
virtual const addrinfo* resolve(std::string hostname, std::string port, bool& reuse) override;
/// Start the timing
Expand All @@ -53,7 +53,7 @@ class ThroughputResolver : public network::Resolver {
/// Clears the used server from the cache
virtual void shutdownSocket(int fd) override;
/// The destructor
virtual ~ThroughputResolver() noexcept = default;
virtual ~ThroughputCache() noexcept = default;
};
//---------------------------------------------------------------------------
}; // namespace network
Expand Down
8 changes: 4 additions & 4 deletions src/cloud/aws.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "cloud/aws.hpp"
#include "cloud/aws_resolver.hpp"
#include "cloud/aws_cache.hpp"
#include "cloud/http.hpp"
#include "network/http_helper.hpp"
#include "network/original_message.hpp"
Expand Down Expand Up @@ -304,12 +304,12 @@ void AWS::getSecret()
}
}
//---------------------------------------------------------------------------
void AWS::initResolver(network::TaskedSendReceiverHandle& sendReceiverHandle)
// Inits the resolver
void AWS::initCache(network::TaskedSendReceiverHandle& sendReceiverHandle)
// Inits the cache
{
assert(sendReceiverHandle.get());
if (_type == Provider::CloudService::AWS) {
sendReceiverHandle.get()->addResolver("amazonaws.com", unique_ptr<network::Resolver>(new cloud::AWSResolver(sendReceiverHandle.getGroup()->getConcurrentRequests())));
sendReceiverHandle.get()->addCache("amazonaws.com", unique_ptr<network::Cache>(new cloud::AWSCache(sendReceiverHandle.getGroup()->getConcurrentRequests())));
}
}
//---------------------------------------------------------------------------
Expand Down
8 changes: 4 additions & 4 deletions src/cloud/aws_resolver.cpp → src/cloud/aws_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "cloud/aws_resolver.hpp"
#include "cloud/aws_cache.hpp"
#include <cstring>
#include <limits>
#include <stdexcept>
Expand All @@ -18,12 +18,12 @@ namespace cloud {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
AWSResolver::AWSResolver(unsigned entries) : Resolver(entries), _mtuCache()
AWSCache::AWSCache(unsigned entries) : Cache(entries), _mtuCache()
// Constructor
{
}
//---------------------------------------------------------------------------
const addrinfo* AWSResolver::resolve(string hostname, string port, bool& oldAddress)
const addrinfo* AWSCache::resolve(string hostname, string port, bool& oldAddress)
// Resolve the request
{
auto addrPos = _addrCtr % static_cast<unsigned>(_addrString.size());
Expand All @@ -43,7 +43,7 @@ const addrinfo* AWSResolver::resolve(string hostname, string port, bool& oldAddr
throw runtime_error("hostname getaddrinfo error");
}
_addr[addrPos].reset(temp);
if (!Resolver::tld(hostname).compare("amazonaws.com")) {
if (!Cache::tld(hostname).compare("amazonaws.com")) {
struct sockaddr_in* p = reinterpret_cast<sockaddr_in*>(_addr[addrPos]->ai_addr);
auto ipAsInt = p->sin_addr.s_addr;
auto it = _mtuCache.find(ipAsInt);
Expand Down
2 changes: 1 addition & 1 deletion src/cloud/azure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "cloud/http.hpp"
#include "network/http_helper.hpp"
#include "network/original_message.hpp"
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include "network/tasked_send_receiver.hpp"
#include "utils/data_vector.hpp"
#include <algorithm>
Expand Down
2 changes: 1 addition & 1 deletion src/cloud/gcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "cloud/gcp_signer.hpp"
#include "network/http_helper.hpp"
#include "network/original_message.hpp"
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include "network/tasked_send_receiver.hpp"
#include "utils/data_vector.hpp"
#include <chrono>
Expand Down
8 changes: 4 additions & 4 deletions src/network/resolver.cpp → src/network/cache.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "network/resolver.hpp"
#include "network/cache.hpp"
#include <cstring>
#include <limits>
#include <stdexcept>
Expand All @@ -17,7 +17,7 @@ namespace network {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
string_view Resolver::tld(string_view domain)
string_view Cache::tld(string_view domain)
// String prefix
{
auto pos = domain.find_last_of('.');
Expand All @@ -31,7 +31,7 @@ string_view Resolver::tld(string_view domain)
return string_view();
}
//---------------------------------------------------------------------------
Resolver::Resolver(unsigned entries)
Cache::Cache(unsigned entries)
// Constructor
{
_addr.reserve(entries);
Expand All @@ -42,7 +42,7 @@ Resolver::Resolver(unsigned entries)
_addrCtr = 0;
}
//---------------------------------------------------------------------------
const addrinfo* Resolver::resolve(string hostname, string port, bool& oldAddress)
const addrinfo* Cache::resolve(string hostname, string port, bool& oldAddress)
// Resolve the request
{
auto addrPos = _addrCtr % static_cast<unsigned>(_addrString.size());
Expand Down
Loading

0 comments on commit 2eac024

Please sign in to comment.