Skip to content

Commit 69ccd35

Browse files
committed
WIP Speculative Execution #2
1 parent 28d5deb commit 69ccd35

File tree

4 files changed

+20
-13
lines changed

4 files changed

+20
-13
lines changed

src/io_worker.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ void IOWorker::retry(const SpeculativeExecution::Ptr& speculative_execution) {
147147
return;
148148
}
149149

150+
printf("Executing %s...\n", speculative_execution->current_host()->address_string().c_str());
151+
150152
PoolMap::const_iterator it = pools_.find(speculative_execution->current_host()->address());
151153
if (it != pools_.end() && it->second->is_ready()) {
152154
const SharedRefPtr<Pool>& pool = it->second;

src/request_handler.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ RequestHandler::~RequestHandler() {
111111
}
112112

113113
void RequestHandler::add_execution(SpeculativeExecution* speculative_execution) {
114+
running_executions_++;
114115
speculative_execution->inc_ref();
115116
speculative_executions_.push_back(speculative_execution);
116117
}
@@ -127,26 +128,27 @@ void RequestHandler::schedule_next_execution(const Host::Ptr& current_host) {
127128
void RequestHandler::set_response(const Host::Ptr& host,
128129
const SharedRefPtr<Response>& response) {
129130
if (future_->set_response(host->address(), response)) {
131+
printf("Response from %s\n", host->address_string().c_str());
130132
finish();
131133
}
132134
}
133135

134136
void RequestHandler::set_error(CassError code,
135137
const std::string& message) {
136-
if (future_->set_error(code, message)) {
137-
finish();
138-
}
139138
}
140139

141140
void RequestHandler::set_error(const Host::Ptr& host,
142141
CassError code, const std::string& message) {
143-
if (host) {
144-
if (future_->set_error_with_address(host->address(), code, message)) {
145-
finish();
146-
}
147-
} else {
148-
if (future_->set_error(code, message)) {
149-
finish();
142+
bool skip = (code == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE && --running_executions_ > 0);
143+
if (!skip) {
144+
if (host) {
145+
if (future_->set_error_with_address(host->address(), code, message)) {
146+
finish();
147+
}
148+
} else {
149+
if (future_->set_error(code, message)) {
150+
finish();
151+
}
150152
}
151153
}
152154
}
@@ -236,6 +238,7 @@ void SpeculativeExecution::execute(int64_t timeout) {
236238
if (timeout > 0) {
237239
timer_.start(request_handler_->io_worker()->loop(), timeout, this, on_execute);
238240
} else {
241+
// TODO: Need to move to next node when executing immediately (but not the first time :()
239242
if (request()->is_idempotent()) {
240243
request_handler_->schedule_next_execution(current_host_);
241244
}

src/request_handler.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ class RequestHandler : public RefCounted<RequestHandler> {
114114
, timestamp_(request->timestamp())
115115
, future_(future)
116116
, retry_policy_(retry_policy)
117-
, io_worker_(NULL) { }
117+
, io_worker_(NULL)
118+
, running_executions_(0) { }
118119

119120
~RequestHandler();
120121

@@ -170,6 +171,7 @@ class RequestHandler : public RefCounted<RequestHandler> {
170171
ScopedPtr<SpeculativeExecutionPlan> execution_plan_;
171172
Host::Ptr first_host_;
172173
IOWorker* io_worker_;
174+
int running_executions_;
173175
SpeculativeExecutionVec speculative_executions_;
174176
Request::EncodingCache encoding_cache_;
175177
};

src/speculative_execution.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ class NoSpeculativeExecutionPolicy : public SpeculativeExecutionPolicy {
6666
class TestSpeculativeExecutionPlan : public SpeculativeExecutionPlan {
6767
public:
6868
TestSpeculativeExecutionPlan()
69-
: count_(3) { }
69+
: count_(2) { }
7070

7171
virtual int64_t next_execution(const Host::Ptr& current_host) {
72-
return --count_ > 0 ? 100 : -1;
72+
return --count_ >= 0 ? 1 : -1;
7373
}
7474

7575
private:

0 commit comments

Comments
 (0)