Skip to content

Commit

Permalink
asyncio: Add some debugging logic for asyncio.
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Greear <greearb@candelatech.com>
  • Loading branch information
greearb committed Sep 6, 2011
1 parent 8a5068b commit bcd90c8
Showing 1 changed file with 68 additions and 18 deletions.
86 changes: 68 additions & 18 deletions xorp/libxorp/asyncio.cc
Expand Up @@ -44,6 +44,17 @@
# include "win_io.h"
#endif

static class TraceAIO {
public:
TraceAIO() {
_do_trace = !(getenv("AIOTRACE") == 0);
}
bool on() const { return _do_trace; }

protected:
bool _do_trace;
} aio_trace;


// ----------------------------------------------------------------------------
// Utility
Expand Down Expand Up @@ -118,6 +129,10 @@ AsyncFileReader::add_buffer(uint8_t* b, size_t b_bytes, const Callback& cb)
{
assert(b_bytes != 0);
_buffers.push_back(new BufferInfo(b, b_bytes, cb));
if (aio_trace.on()) {
XLOG_INFO("afr: %p add_buffer sz: %i buffers: %i\n",
this, b_bytes, _buffers.size());
}
}

void
Expand All @@ -128,6 +143,10 @@ AsyncFileReader::add_buffer_with_offset(uint8_t* b,
{
assert(off < b_bytes);
_buffers.push_back(new BufferInfo(b, b_bytes, off, cb));
if (aio_trace.on()) {
XLOG_INFO("afr: %p add_buffer_w/offset sz: %i buffers: %i\n",
this, b_bytes, _buffers.size());
}
}

#ifdef HOST_OS_WINDOWS
Expand Down Expand Up @@ -222,7 +241,10 @@ AsyncFileReader::read(XorpFd fd, IoEventType type)
errno = 0;
#endif // ! HOST_OS_WINDOWS

debug_msg("Read %d bytes\n", XORP_INT_CAST(done));
if (aio_trace.on()) {
XLOG_INFO("afr: %p Read %d bytes, last-err: %i\n",
this, XORP_INT_CAST(done), _last_error);
}

if (done < 0 && is_pseudo_error("AsyncFileReader", _fd, _last_error)) {
return;
Expand All @@ -237,11 +259,11 @@ AsyncFileReader::read(XorpFd fd, IoEventType type)
// or be delayed due to latency between the primary thread
// and the Winsock thread.
//
if (_fd.is_socket()) {
if (_fd.is_socket() && !_deferred_io_task.scheduled()) {
u_long remaining = 0;
int result = ioctlsocket(_fd.getSocket(), FIONREAD, &remaining);
if (result != SOCKET_ERROR && remaining > 0) {
_deferred_io_tasks = _eventloop.new_oneoff_task(
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileReader::read, _fd, IOT_READ));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
Expand Down Expand Up @@ -404,12 +426,16 @@ AsyncFileWriter::add_buffer(const uint8_t* b,
assert(b_bytes != 0);
_buffers.push_back(new BufferInfo(b, b_bytes, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
if (_running && !_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
if (aio_trace.on()) {
XLOG_INFO("afw: %p add_buffer sz: %i buffers: %i\n",
this, b_bytes, _buffers.size());
}
}

void
Expand All @@ -422,12 +448,16 @@ AsyncFileWriter::add_buffer_sendto(const uint8_t* b,
assert(b_bytes != 0);
_buffers.push_back(new BufferInfo(b, b_bytes, dst_addr, dst_port, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
if (_running && !_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
if (aio_trace.on()) {
XLOG_INFO("afw: %p add_buffer-sendto sz: %i buffers: %i\n",
this, b_bytes, _buffers.size());
}
}

void
Expand All @@ -439,12 +469,16 @@ AsyncFileWriter::add_buffer_with_offset(const uint8_t* b,
assert(off < b_bytes);
_buffers.push_back(new BufferInfo(b, b_bytes, off, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
if (_running && !_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
if (aio_trace.on()) {
XLOG_INFO("afw: %p add_buffer-w/offset sz: %i buffers: %i\n",
this, b_bytes, _buffers.size());
}
}

void
Expand All @@ -454,12 +488,16 @@ AsyncFileWriter::add_data(const vector<uint8_t>& data,
assert(data.size() != 0);
_buffers.push_back(new BufferInfo(data, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
if (_running && !_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
if (aio_trace.on()) {
XLOG_INFO("afw: %p add_data sz: %i buffers: %i\n",
this, data.size(), _buffers.size());
}
}

void
Expand All @@ -471,12 +509,16 @@ AsyncFileWriter::add_data_sendto(const vector<uint8_t>& data,
assert(data.size() != 0);
_buffers.push_back(new BufferInfo(data, dst_addr, dst_port, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
if (_running && !_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
if (aio_trace.on()) {
XLOG_INFO("afw: %p add_data-sendto sz: %i buffers: %i\n",
this, data.size(), _buffers.size());
}
}


Expand Down Expand Up @@ -533,14 +575,17 @@ AsyncFileWriter::write(XorpFd fd, IoEventType type)
#endif

#ifdef EDGE_TRIGGERED_WRITES
if (_running == false)
if (!_running)
return;
#endif

assert(type == IOT_WRITE);
assert(fd == _fd);
assert(_buffers.empty() == false);

_last_error = 0;
errno = 0;

//
// Group together a number of buffers.
// If the buffer is sendto()-type, then send that buffer on its own.
Expand Down Expand Up @@ -634,14 +679,17 @@ AsyncFileWriter::write(XorpFd fd, IoEventType type)
int result = WSASend(_fd.getSocket(), (LPWSABUF)_iov, iov_cnt,
(LPDWORD)&done, 0, NULL, NULL);
_last_error = (result == SOCKET_ERROR) ? WSAGetLastError() : 0;
if (_last_error != 0)
if (_last_error != 0) {
done = -1;
debug_msg("writer: winsock error %d\n", _last_error);
}
} else {
// Non-socket handles take blocking writes.
// There is no writev() equivalent, so emulate it.
BOOL result = TRUE;
DWORD done2;
for (uint32_t j = 0; j < iov_cnt; j++) {
done2 = 0;
result = WriteFile(_fd, (LPVOID)_iov[j].iov_base,
(DWORD)_iov[j].iov_len, (LPDWORD)&done2,
NULL);
Expand All @@ -653,9 +701,6 @@ AsyncFileWriter::write(XorpFd fd, IoEventType type)
}
#else // ! HOST_OS_WINDOWS

errno = 0;
_last_error = 0;

if ((iov_cnt == 1) && (! mod_signals)) {
//
// No need for coalesce, so use send(2). This saves us
Expand All @@ -674,8 +719,11 @@ AsyncFileWriter::write(XorpFd fd, IoEventType type)
#endif // ! HOST_OS_WINDOWS
}

debug_msg("Wrote %d of %u bytes\n",
XORP_INT_CAST(done), XORP_UINT_CAST(total_bytes));
if (aio_trace.on()) {
XLOG_INFO("afw: %p Wrote %d of %u bytes, last-err: %i\n",
this, XORP_INT_CAST(done), XORP_UINT_CAST(total_bytes),
_last_error);
}

if (done < 0 && is_pseudo_error("AsyncFileWriter", _fd, _last_error)) {
XLOG_WARNING("Write error %d\n", _last_error);
Expand All @@ -685,7 +733,7 @@ AsyncFileWriter::write(XorpFd fd, IoEventType type)
complete_transfer(done);

#ifdef EDGE_TRIGGERED_WRITES
if (_buffers.empty() == false) {
if (!(_buffers.empty() || _deferred_io_task.scheduled())) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
Expand Down Expand Up @@ -795,8 +843,10 @@ AsyncFileWriter::start()
}

#ifdef EDGE_TRIGGERED_WRITES
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
if (!_deferred_io_task.scheduled()) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
}
#endif // EDGE_TRIGGERED_WRITES

_running = true;
Expand Down

0 comments on commit bcd90c8

Please sign in to comment.