Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,24 @@ RequestHandler::RequestHandler(const Request::ConstPtr& request, const ResponseF
, manager_(NULL)
, metrics_(metrics) {}

RequestHandler::~RequestHandler() {
if (Logger::log_level() >= CASS_LOG_TRACE) {
OStringStream ss;
for (RequestTryVec::const_iterator it = request_tries_.begin(), end = request_tries_.end();
it != end; ++it) {
if (it != request_tries_.begin()) ss << ", ";
ss << "(" << it->address << ", ";
if (it->error != CASS_OK) {
ss << cass_error_desc(it->error);
} else {
ss << it->latency;
}
ss << ")";
}
LOG_TRACE("Speculative execution attempts: [%s]", ss.str().c_str());
}
}

void RequestHandler::set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry) {
wrapper_.set_prepared_metadata(entry);
}
Expand Down Expand Up @@ -269,6 +287,10 @@ void RequestHandler::set_response(const Host::Ptr& host, const Response::Ptr& re
metrics_->record_speculative_request(uv_hrtime() - start_time_ns_);
}
}

if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), uv_hrtime() - start_time_ns_));
}
}

void RequestHandler::set_error(CassError code, const String& message) {
Expand All @@ -289,6 +311,9 @@ void RequestHandler::set_error(const Host::Ptr& host, CassError code, const Stri
set_error(code, message);
}
}
if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), code));
}
}

void RequestHandler::set_error_with_error_response(const Host::Ptr& host,
Expand All @@ -297,6 +322,9 @@ void RequestHandler::set_error_with_error_response(const Host::Ptr& host,
stop_request();
running_executions_--;
future_->set_error_with_response(host->address(), error, code, message);
if (Logger::log_level() >= CASS_LOG_TRACE) {
request_tries_.push_back(RequestTry(host->address(), code));
}
}

void RequestHandler::stop_timer() { timer_.stop(); }
Expand Down
25 changes: 25 additions & 0 deletions src/request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,28 @@ class ExecutionProfile;
class Timer;
class TokenMap;

struct RequestTry {
RequestTry()
: error(CASS_OK)
, latency(0) {}

RequestTry(const Address& address, uint64_t latency)
: address(address)
, error(CASS_OK)
, latency(latency / (1000 * 1000)) {} // To milliseconds

RequestTry(const Address& address, CassError error)
: address(address)
, error(error)
, latency(0) {}

Address address;
CassError error;
uint64_t latency;
};

typedef SmallVector<RequestTry, 2> RequestTryVec;

class ResponseFuture : public Future {
public:
typedef SharedRefPtr<ResponseFuture> Ptr;
Expand Down Expand Up @@ -138,6 +160,7 @@ class RequestHandler : public RefCounted<RequestHandler> {

RequestHandler(const Request::ConstPtr& request, const ResponseFuture::Ptr& future,
Metrics* metrics = NULL);
~RequestHandler();

void set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry);

Expand Down Expand Up @@ -210,6 +233,8 @@ class RequestHandler : public RefCounted<RequestHandler> {
ConnectionPoolManager* manager_;

Metrics* const metrics_;

RequestTryVec request_tries_;
};

class KeyspaceChangedResponse {
Expand Down
2 changes: 2 additions & 0 deletions src/token_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class TokenMap : public RefCounted<TokenMap> {

virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const = 0;

virtual String dump(const String& keyspace_name) const = 0;
};

}}} // namespace datastax::internal::core
Expand Down
92 changes: 83 additions & 9 deletions src/token_map_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include <algorithm>
#include <assert.h>
#include <iomanip>
#include <ios>
#include <uv.h>

#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
Expand Down Expand Up @@ -143,12 +145,32 @@ class ByteOrderedPartitioner {
static StringRef name() { return "ByteOrderedPartitioner"; }
};

inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0')
<< std::setw(16) << std::hex << token.lo;
return os;
}

inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
it != end; ++it) {
os << std::hex << *it;
}
return os;
}

class HostSet : public DenseHashSet<Host::Ptr> {
public:
HostSet() {
set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY)));
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}

template <class InputIterator>
HostSet(InputIterator first, InputIterator last)
: DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}
};

class RackSet : public DenseHashSet<uint32_t> {
Expand Down Expand Up @@ -355,6 +377,17 @@ void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens
}
}

// Adds unique replica. It returns true if the replica was added.
inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
if ((*it)->address() == host->address()) {
return false; // Already in the replica set
}
}
hosts->push_back(host);
return true;
}

template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
Expand Down Expand Up @@ -443,24 +476,27 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
// datacenter only then consider hosts in the same rack

if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(host));
racks_observed_this_dc.insert(rack);
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}

// Once we visited every rack in the current datacenter then starting considering
// hosts we've already skipped.
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
++replica_count_this_dc;
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
}
}
Expand All @@ -484,9 +520,10 @@ void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec&
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);
typename TokenHostVec::const_iterator token_it = i;
do {
replicas->push_back(Host::Ptr(token_it->second));
add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
++token_it;
if (token_it == tokens.end()) {
token_it = tokens.begin();
Expand Down Expand Up @@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap {
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const;

// Test only
virtual String dump(const String& keyspace_name) const;

public:
// Testing only

bool contains(const Token& token) const {
for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
++i) {
Expand All @@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap {
return false;
}

const TokenReplicasVec& token_replicas(const String& keyspace_name) const;

private:
void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
bool should_build_replicas);
Expand Down Expand Up @@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String&
return no_replicas_dummy_;
}

template <class Partitioner>
String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
String result;
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
const TokenReplicasVec& replicas = ks_it->second;

for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
it != end; ++it) {
OStringStream ss;
ss << std::setw(20) << it->first << " [ ";
const CopyOnWriteHostVec& hosts = it->second;
for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
++host_it) {
ss << (*host_it)->address_string() << " ";
}
ss << "]\n";
result.append(ss.str());
}
return result;
}

template <class Partitioner>
const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
static TokenReplicasVec not_found;
return ks_it != replicas_.end() ? ks_it->second : not_found;
}

template <class Partitioner>
void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
const ResultResponse* result,
Expand Down Expand Up @@ -773,6 +845,8 @@ void TokenMapImpl<Partitioner>::build_replicas() {
const String& keyspace_name = i->first;
const ReplicationStrategy<Partitioner>& strategy = i->second;
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
dump(keyspace_name).c_str());
}
}

Expand Down
Loading