Skip to content

Commit

Permalink
Split up the thread main functions in fles_zeromq.
Browse files Browse the repository at this point in the history
  • Loading branch information
cuveland committed Dec 3, 2016
1 parent b6f78bf commit a58a16e
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 87 deletions.
53 changes: 35 additions & 18 deletions lib/fles_zeromq/ComponentSenderZeromq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,47 @@ ComponentSenderZeromq::~ComponentSenderZeromq()
}

void ComponentSenderZeromq::operator()()
{
run_begin();
while (acked_ts2_ / 2 < max_timeslice_number_ && *signal_status_ == 0) {
run_cycle();
scheduler_.timer();
}
run_end();
}

void ComponentSenderZeromq::run_begin()
{
data_source_.proceed();
time_begin_ = std::chrono::high_resolution_clock::now();

report_status();
while (acked_ts2_ / 2 < max_timeslice_number_ && *signal_status_ == 0) {
zmq_msg_t request;
int rc = zmq_msg_init(&request);
assert(rc == 0);

int len = zmq_msg_recv(&request, socket_, 0);
if (len == -1 && errno == EAGAIN) {
continue;
}
assert(len != -1);
}

assert(len == sizeof(uint64_t));
uint64_t timeslice = *static_cast<uint64_t*>(zmq_msg_data(&request));
zmq_msg_close(&request);
bool ComponentSenderZeromq::run_cycle()
{
zmq_msg_t request;
int rc = zmq_msg_init(&request);
assert(rc == 0);

try_send_timeslice(timeslice);
data_source_.proceed();
scheduler_.timer();
int len = zmq_msg_recv(&request, socket_, 0);
if (len == -1 && errno == EAGAIN) {
// timeout reached
return true;
}
assert(len != -1);

assert(len == sizeof(uint64_t));
uint64_t timeslice = *static_cast<uint64_t*>(zmq_msg_data(&request));
zmq_msg_close(&request);

try_send_timeslice(timeslice);
data_source_.proceed();

return true;
}

void ComponentSenderZeromq::run_end()
{
sync_data_source();
time_end_ = std::chrono::high_resolution_clock::now();
}
Expand Down Expand Up @@ -260,7 +277,7 @@ void ComponentSenderZeromq::report_status()
<< human_readable_count(status_data.acked, true) << " ("
<< human_readable_count(rate_data, true, "B/s") << ")";

L_(info) << "[i" << input_index_ << "] |"
L_(info) << "[i" << input_index_ << "] |"
<< bar_graph(status_data.vector(), "#x._", 20) << "|"
<< bar_graph(status_desc.vector(), "#x._", 10) << "| "
<< human_readable_count(rate_data, true, "B/s") << " ("
Expand Down
9 changes: 9 additions & 0 deletions lib/fles_zeromq/ComponentSenderZeromq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ class ComponentSenderZeromq
/// Scheduler for periodic events.
Scheduler scheduler_;

/// Setup at begin of run.
void run_begin();

/// A single cycle in the main run loop.
bool run_cycle();

/// Cleanup at end of run.
void run_end();

/// The central function for distributing timeslice data.
bool try_send_timeslice(uint64_t timeslice);

Expand Down
159 changes: 90 additions & 69 deletions lib/fles_zeromq/TimesliceBuilderZeromq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,82 +61,98 @@ TimesliceBuilderZeromq::~TimesliceBuilderZeromq()

void TimesliceBuilderZeromq::operator()()
{
assert(connections_.size() > 0);
run_begin();
while (ts_index_ < max_timeslice_number_ && *signal_status_ == 0) {
run_cycle();
scheduler_.timer();
}
run_end();
}

void TimesliceBuilderZeromq::run_begin()
{
assert(connections_.size() > 0);
time_begin_ = std::chrono::high_resolution_clock::now();
report_status();
}

while (ts_index_ < max_timeslice_number_ && *signal_status_ == 0) {
for (auto& c : connections_) {

std::size_t msg_size;
do {
// send request for timeslice data
int rc;
do {
rc = zmq_send(c->socket, &ts_index_, sizeof(ts_index_), 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
break;
}

// receive desc answer (part 1), do not release
rc = zmq_msg_init(&c->desc_msg);
assert(rc == 0);
do {
rc = zmq_msg_recv(&c->desc_msg, c->socket, 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
break;
}
assert(rc != -1);
msg_size = zmq_msg_size(&c->desc_msg);
if (msg_size == 0) {
zmq_msg_close(&c->desc_msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
} while (msg_size == 0);
if (*signal_status_ != 0) {
break;
}
bool TimesliceBuilderZeromq::run_cycle()
{
auto& c = connections_.at(conn_);

// receive data answer (part 2), do not release
assert(zmq_msg_more(&c->desc_msg));
int rc = zmq_msg_init(&c->data_msg);
assert(rc == 0);
do {
rc = zmq_msg_recv(&c->data_msg, c->socket, 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
break;
}
assert(rc != -1);
std::size_t msg_size;

uint64_t size_required =
zmq_msg_size(&c->desc_msg) + zmq_msg_size(&c->data_msg);
// send request for timeslice data
int rc;
do {
rc = zmq_send(c->socket, &ts_index_, sizeof(ts_index_), 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
return true;
}

while (c->data.size_available_contiguous() < size_required ||
c->desc.size_available() < 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handle_timeslice_completions();
}
// receive desc answer (part 1), do not release
rc = zmq_msg_init(&c->desc_msg);
assert(rc == 0);
do {
rc = zmq_msg_recv(&c->desc_msg, c->socket, 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
return true;
}
assert(rc != -1);
msg_size = zmq_msg_size(&c->desc_msg);
if (msg_size == 0) {
zmq_msg_close(&c->desc_msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

if (msg_size == 0 || *signal_status_ != 0) {
return true;
}

// receive data answer (part 2), do not release
assert(zmq_msg_more(&c->desc_msg));
rc = zmq_msg_init(&c->data_msg);
assert(rc == 0);
do {
rc = zmq_msg_recv(&c->data_msg, c->socket, 0);
} while (rc == -1 && errno == EAGAIN && *signal_status_ == 0);
if (*signal_status_ != 0) {
return true;
}
assert(rc != -1);

uint64_t size_required =
zmq_msg_size(&c->desc_msg) + zmq_msg_size(&c->data_msg);

while (c->data.size_available_contiguous() < size_required ||
c->desc.size_available() < 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handle_timeslice_completions();
}

// skip remaining bytes in data buffer to avoid fractured entry
c->data.skip_buffer_wrap(size_required);

// generate timeslice component descriptor
assert(tpos_ == c->desc.write_index());
c->desc.append(
{ts_index_, c->data.write_index(), size_required,
zmq_msg_size(&c->desc_msg) / sizeof(fles::MicrosliceDescriptor)});

// copy into shared memory and release messages
c->data.append(static_cast<uint8_t*>(zmq_msg_data(&c->desc_msg)),
zmq_msg_size(&c->desc_msg));
c->data.append(static_cast<uint8_t*>(zmq_msg_data(&c->data_msg)),
zmq_msg_size(&c->data_msg));
zmq_msg_close(&c->desc_msg);
zmq_msg_close(&c->data_msg);

++conn_;
if (conn_ == connections_.size()) {
conn_ = 0;

// skip remaining bytes in data buffer to avoid fractured entry
c->data.skip_buffer_wrap(size_required);

// generate timeslice component descriptor
assert(tpos_ == c->desc.write_index());
c->desc.append({ts_index_, c->data.write_index(), size_required,
zmq_msg_size(&c->desc_msg) /
sizeof(fles::MicrosliceDescriptor)});

// copy into shared memory and release messages
c->data.append(static_cast<uint8_t*>(zmq_msg_data(&c->desc_msg)),
zmq_msg_size(&c->desc_msg));
c->data.append(static_cast<uint8_t*>(zmq_msg_data(&c->data_msg)),
zmq_msg_size(&c->data_msg));
zmq_msg_close(&c->desc_msg);
zmq_msg_close(&c->data_msg);
}
handle_timeslice_completions();

timeslice_buffer_.send_work_item(
Expand All @@ -149,6 +165,11 @@ void TimesliceBuilderZeromq::operator()()
ts_index_ += num_compute_nodes_;
}

return true;
}

void TimesliceBuilderZeromq::run_end()
{
time_end_ = std::chrono::high_resolution_clock::now();

// wait until all pending timeslices have been acknowledged
Expand Down
12 changes: 12 additions & 0 deletions lib/fles_zeromq/TimesliceBuilderZeromq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class TimesliceBuilderZeromq
/// The local buffer position of the timeslice currently being received.
uint64_t tpos_ = 0;

/// The index of the connection to receive from next.
uint64_t conn_ = 0;

/// Buffer to store acknowledged status of timeslices.
RingBuffer<uint64_t, true> ack_;

Expand Down Expand Up @@ -146,6 +149,15 @@ class TimesliceBuilderZeromq
/// Scheduler for periodic events.
Scheduler scheduler_;

/// Setup at begin of run.
void run_begin();

/// A single cycle in the main run loop.
bool run_cycle();

/// Cleanup at end of run.
void run_end();

/// Handle pending timeslice completions and advance read indexes.
void handle_timeslice_completions();

Expand Down

0 comments on commit a58a16e

Please sign in to comment.