Skip to content

Commit

Permalink
When the network connection wants to know if we're currently blocking…
Browse files Browse the repository at this point in the history
… on a read/write, it should check with the linux_event_watcher_t directly. Closes #331.
  • Loading branch information
timmaxw committed May 14, 2011
1 parent a783f4f commit f8a37ef
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 23 deletions.
29 changes: 24 additions & 5 deletions src/arch/linux/event_queue.cc
Expand Up @@ -101,7 +101,7 @@ struct linux_event_watcher_guts_t :
signal_t *aborter;

void watch(const boost::function<void()> &cb, signal_t *ab) {
rassert(!callback && !aborter);
rassert(!is_watching());
rassert(cb && ab);
if (!ab->is_pulsed()) {
callback = cb;
Expand All @@ -111,8 +111,14 @@ struct linux_event_watcher_guts_t :
}
}

bool is_watching() {
/* We should have neither or both a callback and an aborter */
rassert(callback.empty() == (aborter == NULL));
return !callback.empty();
}

void pulse() {
rassert(callback && aborter, "%p got a pulse() when event mask is %d", parent, parent->old_mask);
rassert(is_watching(), "%p got a pulse() when event mask is %d", parent, parent->old_mask);
aborter->remove_waiter(this);
boost::function<void()> temp = callback;
callback = 0;
Expand All @@ -122,7 +128,7 @@ struct linux_event_watcher_guts_t :
}

void on_signal_pulsed() {
rassert(callback && aborter);
rassert(is_watching());
callback = 0;
aborter = NULL;
parent->remask();
Expand Down Expand Up @@ -168,13 +174,22 @@ struct linux_event_watcher_guts_t :
handler->watch(cb, ab);
}

bool is_watching(int event) {
assert_thread();
switch (event) {
case poll_event_in: return read_handler.is_watching();
case poll_event_out: return write_handler.is_watching();
default: crash("expected poll_event_in or poll_event_out");
}
}

void remask() {
/* Change our registration with the event queue depending on what events
we are actually waiting for. */

int new_mask = 0;
if (read_handler.callback) new_mask |= poll_event_in;
if (write_handler.callback) new_mask |= poll_event_out;
if (read_handler.is_watching()) new_mask |= poll_event_in;
if (write_handler.is_watching()) new_mask |= poll_event_out;

if (old_mask != new_mask) {
linux_thread_pool_t::thread->queue.adjust_resource(fd, new_mask, this);
Expand All @@ -194,3 +209,7 @@ void linux_event_watcher_t::watch(int event, const boost::function<void()> &cb,
guts->watch(event, cb, ab);
}

bool linux_event_watcher_t::is_watching(int event) {
return guts->is_watching(event);
}

5 changes: 5 additions & 0 deletions src/arch/linux/event_queue.hpp
Expand Up @@ -79,6 +79,11 @@ struct linux_event_watcher_t {
cancelled. */
void watch(int event, const boost::function<void()> &callback, signal_t *aborter);

/* Returns `true` if `watch()` was called for events of type `event` but has
not completed or been aborted yet. `event` should be `poll_event_in` or
`poll_event_out`. */
bool is_watching(int event);

private:
/* The guts are a separate object so that if one of the callbacks we call destroys us,
we don't have to destroy the guts immediately. */
Expand Down
53 changes: 35 additions & 18 deletions src/arch/linux/network.cc
Expand Up @@ -511,30 +511,47 @@ void linux_tcp_conn_t::on_event(int events) {
/* This is called by linux_event_watcher_t when error events occur. Ordinary
poll_event_in/poll_event_out events are not sent through this function. */

if (events == (poll_event_err | poll_event_hup) && write_in_progress) {
/* We get this when the socket is closed but there is still data we are trying to send.
For example, it can sometimes be reproduced by sending "nonsense\r\n" and then sending
"set [key] 0 0 [length] noreply\r\n[value]\r\n" a hundred times then immediately closing the
socket.
bool reading = event_watcher->is_watching(poll_event_in);
bool writing = event_watcher->is_watching(poll_event_out);

I speculate that the "error" part comes from the fact that there is undelivered data
in the socket send buffer, and the "hup" part comes from the fact that the remote end
has hung up.
if (events == (poll_event_err | poll_event_hup)) {

The same can happen for reads, see next case. */
/* HEY: What's the significance of these 'if' statements? Do they actually make
any sense? Why don't we just close both halves of the socket? */

on_shutdown_write();
if (writing) {
/* We get this when the socket is closed but there is still data we are trying to send.
For example, it can sometimes be reproduced by sending "nonsense\r\n" and then sending
"set [key] 0 0 [length] noreply\r\n[value]\r\n" a hundred times then immediately closing
the socket.
} else if (events == (poll_event_err | poll_event_hup) && read_in_progress) {
/* See description for write case above */
on_shutdown_read();
I speculate that the "error" part comes from the fact that there is undelivered data
in the socket send buffer, and the "hup" part comes from the fact that the remote end
has hung up.
} else if (events & poll_event_err) {
/* We don't know why we got this, so shut the hell down. */
logERR("Unexpected poll_event_err. events=%s, read=%s, write=%s\n",
The same can happen for reads, see next case. */

on_shutdown_write();
}

if (reading) {
/* See description for write case above */
on_shutdown_read();
}

if (!reading && !writing) {
/* We often get a combination of poll_event_err and poll_event_hup when a socket
suddenly disconnects. It seems safe to assume it just indicates a hang-up. */
if (!read_closed.is_pulsed()) shutdown_read();
if (!write_closed.is_pulsed()) shutdown_write();
}

} else {
/* We don't know why we got this, so log it and then shut down */
logERR("Unexpected epoll err/hup/rdhup. events=%s, reading=%s, writing=%s\n",
format_poll_event(events).c_str(),
read_in_progress ? "yes" : "no",
write_in_progress ? "yes" : "no");
reading ? "yes" : "no",
writing ? "yes" : "no");
if (!read_closed.is_pulsed()) shutdown_read();
if (!write_closed.is_pulsed()) shutdown_write();
}
Expand Down

0 comments on commit f8a37ef

Please sign in to comment.