Skip to content

Commit

Permalink
Consider the data source's initial read_index in InputChannelSender.
Browse files Browse the repository at this point in the history
  • Loading branch information
cuveland committed Nov 1, 2016
1 parent 85861eb commit 7f08fa7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
13 changes: 9 additions & 4 deletions lib/fles_rdma/InputChannelSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ InputChannelSender::InputChannelSender(
min_acked_desc_(data_source.desc_buffer().size() / 4),
min_acked_data_(data_source.data_buffer().size() / 4)
{
start_index_desc_ = sent_desc_ = acked_desc_ = cached_acked_desc_ =
data_source.get_read_index().desc;
start_index_data_ = sent_data_ = acked_data_ = cached_acked_data_ =
data_source.get_read_index().data;

size_t min_ack_buffer_size =
data_source_.desc_buffer().size() / timeslice_size_ + 1;
ack_.alloc_with_size(min_ack_buffer_size);
Expand Down Expand Up @@ -166,7 +171,7 @@ void InputChannelSender::operator()()
}

// wait for pending send completions
while (acked_desc_ < timeslice_size_ * timeslice) {
while (acked_desc_ < timeslice_size_ * timeslice + start_index_desc_) {
poll_completion();
scheduler_.timer();
}
Expand Down Expand Up @@ -200,7 +205,7 @@ void InputChannelSender::operator()()
bool InputChannelSender::try_send_timeslice(uint64_t timeslice)
{
// wait until a complete timeslice is available in the input buffer
uint64_t desc_offset = timeslice * timeslice_size_;
uint64_t desc_offset = timeslice * timeslice_size_ + start_index_desc_;
uint64_t desc_length = timeslice_size_ + overlap_size_;

if (write_index_desc_ < desc_offset + desc_length) {
Expand Down Expand Up @@ -455,14 +460,14 @@ void InputChannelSender::on_completion(const struct ibv_wc& wc)
int cn = (wc.wr_id >> 8) & 0xFFFF;
conn_[cn]->on_complete_write();

uint64_t acked_ts = acked_desc_ / timeslice_size_;
uint64_t acked_ts = (acked_desc_ - start_index_desc_) / timeslice_size_;
if (ts == acked_ts)
do
++acked_ts;
while (ack_.at(acked_ts) > ts);
else
ack_.at(ts) = ts;
acked_desc_ = acked_ts * timeslice_size_;
acked_desc_ = acked_ts * timeslice_size_ + start_index_desc_;
acked_data_ = data_source_.desc_buffer().at(acked_desc_ - 1).offset +
data_source_.desc_buffer().at(acked_desc_ - 1).size;
if (acked_data_ >= cached_acked_data_ + min_acked_data_ ||
Expand Down
15 changes: 9 additions & 6 deletions lib/fles_rdma/InputChannelSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ class InputChannelSender : public IBConnectionGroup<InputChannelConnection>
RingBuffer<uint64_t, true> ack_;

/// Number of acknowledged microslices. Written to FLIB.
uint64_t acked_desc_ = 0;
uint64_t acked_desc_;

/// Number of acknowledged data bytes. Written to FLIB.
uint64_t acked_data_ = 0;
uint64_t acked_data_;

/// Data source (e.g., FLIB).
InputBufferReadInterface& data_source_;

/// Number of sent microslices, for statistics.
uint64_t sent_desc_ = 0;
uint64_t sent_desc_;

/// Number of sent data bytes, for statistics.
uint64_t sent_data_ = 0;
uint64_t sent_data_;

const std::vector<std::string> compute_hostnames_;
const std::vector<std::string> compute_services_;
Expand All @@ -104,8 +104,11 @@ class InputChannelSender : public IBConnectionGroup<InputChannelConnection>
const uint64_t min_acked_desc_;
const uint64_t min_acked_data_;

uint64_t cached_acked_data_ = 0;
uint64_t cached_acked_desc_ = 0;
uint64_t cached_acked_desc_;
uint64_t cached_acked_data_;

uint64_t start_index_desc_;
uint64_t start_index_data_;

uint64_t write_index_desc_ = 0;

Expand Down

0 comments on commit 7f08fa7

Please sign in to comment.