Asio on Linux stalls in epoll() #180

Closed
jeremyd-gl opened this issue Jan 23, 2017 · 7 comments

@jeremyd-gl

[ As posted on StackOverflow - http://stackoverflow.com/questions/41804866/asio-on-linux-stalls-in-epoll ]

We're experiencing a problem with asynchronous operation of standalone (non-Boost) Asio 1.10.6 on Linux, which is demonstrated using the following test app:

#define ASIO_STANDALONE
#define ASIO_HEADER_ONLY
#define ASIO_NO_EXCEPTIONS
#define ASIO_NO_TYPEID
#include "asio.hpp"

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

static bool s_freeInboundSocket = false;
static bool s_freeOutboundSocket = false;

class Tester
{
public:
	Tester(asio::io_service& i_ioService, unsigned i_n)
		: m_inboundStrand(i_ioService)
		, m_listener(i_ioService)
		, m_outboundStrand(i_ioService)
		, m_resolver(i_ioService)
		, m_n(i_n)
		, m_traceStart(std::chrono::high_resolution_clock::now())
	{}

	~Tester()
	{}

	void TraceIn(unsigned i_line)
	{
		m_inboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
	}

	void AbortIn(unsigned i_line)
	{
		TraceIn(i_line);
		abort();
	}

	void TraceOut(unsigned i_line)
	{
		m_outboundTrace.emplace_back(i_line, std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - m_traceStart));
	}

	void AbortOut(unsigned i_line)
	{
		TraceOut(i_line);
		abort();
	}

	void DumpTrace(std::map<unsigned, unsigned>& o_counts)
	{
		std::cout << "## " << m_n << " ##\n";
		std::cout << "-- " << m_traceStart.time_since_epoch().count() << "\n";
		std::cout << "- in -             - out -\n";

		auto in = m_inboundTrace.begin();
		auto out = m_outboundTrace.begin();

		while ((in != m_inboundTrace.end()) || (out != m_outboundTrace.end()))
		{
			if (in == m_inboundTrace.end())	
			{
				++o_counts[out->first];

				std::cout << "                  " << out->first << " : " << out->second.count() << "\n";
				++out;
			}
			else if (out == m_outboundTrace.end())	
			{
				++o_counts[in->first];

				std::cout << in->first << " : " << in->second.count() << "\n";
				++in;
			}
			else if (out->second < in->second)
			{
				++o_counts[out->first];

				std::cout << "                  " << out->first << " : " << out->second.count() << "\n";
				++out;
			}
			else
			{
				++o_counts[in->first];

				std::cout << in->first << " : " << in->second.count() << "\n";
				++in;
			}
		}
		std::cout << std::endl;
	}

	//////////////
	// Inbound

	void Listen(uint16_t i_portBase)
	{
		m_inboundSocket.reset(new asio::ip::tcp::socket(m_inboundStrand.get_io_service()));
 
		asio::error_code ec;
		if (m_listener.open(asio::ip::tcp::v4(), ec)
		 || m_listener.bind(asio::ip::tcp::endpoint(asio::ip::tcp::v4(), i_portBase+m_n), ec)
		 || m_listener.listen(-1, ec))
		{
			AbortIn(__LINE__); return;
		}

		TraceIn(__LINE__);

		m_listener.async_accept(*m_inboundSocket,
			m_inboundStrand.wrap([this](const asio::error_code& i_error)
		{
			OnInboundAccepted(i_error);
		}));
	}

	void OnInboundAccepted(const asio::error_code& i_error)
	{
		TraceIn(__LINE__);

		if (i_error) { AbortIn(__LINE__); return; }

		asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
			m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
		{
			OnInboundReadCompleted(i_err, i_nRd);
		}));
	}


	void OnInboundReadCompleted(const asio::error_code& i_error, size_t i_nRead)
	{
		TraceIn(__LINE__);

		if (i_error.value() != 0) { AbortIn(__LINE__); return; }
		if (bool(i_error)) { AbortIn(__LINE__); return; }
		if (i_nRead != 4) { AbortIn(__LINE__); return; }  // "msg\n"

		std::istream is(&m_inboundRxBuf);
		std::string s;
		if (!std::getline(is, s)) { AbortIn(__LINE__); return; }
		if (s != "msg") { AbortIn(__LINE__); return; }
		if (m_inboundRxBuf.in_avail() != 0) { AbortIn(__LINE__); return; }
 
		asio::async_read_until(*m_inboundSocket, m_inboundRxBuf, '\n',
			m_inboundStrand.wrap([this](const asio::error_code& i_err, size_t i_nRd)
		{
			OnInboundWaitCompleted(i_err, i_nRd);
		}));

	}

	void OnInboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
	{
		TraceIn(__LINE__);

		if (i_error != asio::error::eof) { AbortIn(__LINE__); return; }
		if (i_nRead != 0) { AbortIn(__LINE__); return; }
 
		if (s_freeInboundSocket)
		{
			m_inboundSocket.reset();
		}
	}

	//////////////
	// Outbound

	void Connect(std::string i_host, uint16_t i_portBase)
	{
		asio::error_code ec;
		auto endpoint = m_resolver.resolve(asio::ip::tcp::resolver::query(i_host, std::to_string(i_portBase+m_n)), ec);
		if (ec) { AbortOut(__LINE__); return; }

		m_outboundSocket.reset(new asio::ip::tcp::socket(m_outboundStrand.get_io_service()));

		TraceOut(__LINE__);

		asio::async_connect(*m_outboundSocket, endpoint,
			m_outboundStrand.wrap([this](const std::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_ep)
		{
			OnOutboundConnected(i_error, i_ep);
		}));
	}

	void OnOutboundConnected(const asio::error_code& i_error, const asio::ip::tcp::resolver::iterator& i_endpoint)
	{
		TraceOut(__LINE__);

		if (i_error) { AbortOut(__LINE__); return; }

		std::ostream(&m_outboundTxBuf) << "msg" << '\n';

		asio::async_write(*m_outboundSocket, m_outboundTxBuf.data(),
			m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nWritten)
		{
			OnOutboundWriteCompleted(i_error, i_nWritten);
		}));
	}

	void OnOutboundWriteCompleted(const asio::error_code& i_error, size_t i_nWritten)
	{
		TraceOut(__LINE__);

		if (i_error) { AbortOut(__LINE__); return; }
		if (i_nWritten != 4) { AbortOut(__LINE__); return; } // "msg\n"

		TraceOut(__LINE__);
		m_outboundSocket->shutdown(asio::socket_base::shutdown_both);

		asio::async_read_until(*m_outboundSocket, m_outboundRxBuf, '\n',
			m_outboundStrand.wrap([this](const asio::error_code& i_error, size_t i_nRead)
		{
			OnOutboundWaitCompleted(i_error, i_nRead);
		}));
	}

	void OnOutboundWaitCompleted(const asio::error_code& i_error, size_t i_nRead)
	{
		TraceOut(__LINE__);

		if (i_error != asio::error::eof) { AbortOut(__LINE__); return; }
		if (i_nRead != 0) { AbortOut(__LINE__); return; }

		if (s_freeOutboundSocket)
		{
			m_outboundSocket.reset();
		}
	}

private:
	//////////////
	// Inbound

	asio::io_service::strand m_inboundStrand;

	asio::ip::tcp::acceptor m_listener;
	std::unique_ptr<asio::ip::tcp::socket> m_inboundSocket;

	asio::streambuf m_inboundRxBuf;
	asio::streambuf m_inboundTxBuf;

	//////////////
	// Outbound

	asio::io_service::strand m_outboundStrand;

	asio::ip::tcp::resolver m_resolver;
	std::unique_ptr<asio::ip::tcp::socket> m_outboundSocket;

	asio::streambuf m_outboundRxBuf;
	asio::streambuf m_outboundTxBuf;

	//////////////
	// Common

	unsigned m_n;

	const std::chrono::high_resolution_clock::time_point m_traceStart;
	std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_inboundTrace;
	std::vector<std::pair<unsigned, std::chrono::nanoseconds>> m_outboundTrace;
};

static int Usage(int i_ret)
{
	std::cout << "[" << i_ret << "]" << "Usage: example <nThreads> <nConnections> <inboundFree> <outboundFree>" << std::endl;
	return i_ret;
}

int main(int argc, char* argv[])
{
	if (argc < 5)
		return Usage(__LINE__);

	const unsigned nThreads = unsigned(std::stoul(argv[1]));
	if (nThreads == 0)
		return Usage(__LINE__);
	const unsigned nConnections = unsigned(std::stoul(argv[2]));
	if (nConnections == 0)
		return Usage(__LINE__);

	s_freeInboundSocket = (*argv[3] == 'y');
	s_freeOutboundSocket = (*argv[4] == 'y');
	
	const uint16_t listenPortBase = 25000;
	const uint16_t connectPortBase = 25000;
	const std::string connectHost = "127.0.0.1";

	asio::io_service ioService;

	std::cout << "Creating." << std::endl;
	
	std::list<Tester> testers;

	for (unsigned i = 0; i < nConnections; ++i)
	{
		testers.emplace_back(ioService, i);
		testers.back().Listen(listenPortBase);
		testers.back().Connect(connectHost, connectPortBase);
	}
	
	std::cout << "Starting." << std::endl;

	std::vector<std::thread> threads;
	
	for (unsigned i = 0; i < nThreads; ++i)
	{	
		threads.emplace_back([&]()
		{
			ioService.run();
		});
	}

	std::cout << "Waiting." << std::endl;

	for (auto& thread : threads)
	{
		thread.join();
	}

	std::cout << "Stopped." << std::endl;

	return 0;
}

void DumpAllTraces(std::list<Tester>& i_testers)
{
	std::map<unsigned, unsigned> counts;

	for (auto& tester : i_testers)
	{
		tester.DumpTrace(counts);
	}	

	std::cout << "##############################\n";
	for (const auto& count : counts)
	{
		std::cout << count.first << " : " << count.second << "\n";
	}
	std::cout << std::endl;
}

#if defined(ASIO_NO_EXCEPTIONS)
namespace asio
{
	namespace detail
	{
		template <typename Exception>
		void throw_exception(const Exception& e)
		{
			abort();
		}
	} // namespace detail
} // namespace asio
#endif

We compile as follows (the problem only occurs in optimised builds):

g++ -o example -m64 -g -O3 --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include -lpthread example.cpp

We're running on Debian Jessie. uname -a reports: Linux <hostname> 3.16.0-4-amd64 #1 SMP Debian 3.16.36-1+deb8u2 (2016-10-19) x86_64 GNU/Linux.
The problem appears under both GCC (g++ (Debian 4.9.2-10) 4.9.2) and Clang (Debian clang version 3.5.0-10 (tags/RELEASE_350/final) (based on LLVM 3.5.0)).

In summary, the test app does the following:

  • We create N connections, each consisting of an inbound (listening)
    end and an outbound (connecting) end. Each inbound listener is bound
    to a unique port (starting at 25000), and each outbound connector
    uses a system-selected originating port.

  • The inbound end performs an async_accept, and on
    completion issues an async_read. When the read completes it issues
    another async_read that we expect to return eof. When that
    completes, we either free the socket immediately, or leave it as-is
    (with no pending async operations) to be cleaned up by the relevant
    destructors at program exit. (Note that the listener socket is
    always left as-is, with no pending accept, until exit.)

  • The outbound end performs an async_connect, and on completion issues
    an async_write. When the write completes it issues a shutdown
    (specifically, shutdown(both)) followed by an async_read that we
    expect to return eof. On completion, we once again either leave the
    socket as-is, with no pending operations, or we free it immediately.

  • Any error or unexpected receive data results in an immediate abort()
    call.

  • The test app lets us specify the number of worker threads for the
    io_service, the total number of connections to create, and flags
    controlling whether inbound and outbound sockets respectively are
    freed or left as-is.

  • We run the test app repeatedly, specifying 50 threads and 1000
    connections.

    i.e. while ./example 50 1000 n y >out.txt ; do echo -n . ; done

If we specify that all sockets are left as-is, the test loop runs indefinitely. To avoid muddying the waters with SO_REUSEADDR considerations, we take care that no sockets are in TIME_WAIT state from a previous test run before we start the test, otherwise the listens can fail. But with this caveat satisfied, the test app runs literally hundreds, even thousands of times with no error. Similarly, if we specify that inbound sockets (but NOT outbound sockets) should be explicitly freed, all runs fine.
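
For reference, a rough way we confirm there are no lingering TIME_WAIT sockets before starting a run (it also flags TIME_WAIT sockets unrelated to the test) is simply:

netstat -tan | grep TIME_WAIT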

However, if we specify that outbound sockets should be freed, the app stalls after a variable number of executions - sometimes ten or fewer, sometimes a hundred or more, usually somewhere in between.

Connecting to the stalled process with GDB, we see that the main thread is waiting to join the worker threads, all but one of the worker threads are idle (waiting on an Asio internal condition variable), and that one worker thread is waiting in Asio's call to epoll(). The internal trace instrumentation verifies that some of the sockets are waiting on async operations to complete - sometimes the initial (inbound) accept, sometimes the (inbound) data read, and sometimes the final inbound or outbound reads that normally complete with eof.
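
(For completeness, the thread states above were inspected with standard gdb commands along the lines of the following, where <pid> is the stalled example process:

gdb -p <pid>
(gdb) thread apply all bt

)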

In all cases, the other end of the connection has successfully done its bit: if the inbound accept is still pending, we see that the corresponding outbound connect has successfully completed, along with the outbound write; likewise if the inbound data read is pending, the corresponding outbound connect and write have completed; if the inbound EOF read is pending, the outbound shutdown has been performed, and likewise if the outbound EOF read is pending, the inbound EOF read has completed due to the outbound shutdown.

Examining the process's /proc/N/fdinfo shows that the epoll file descriptor is indeed waiting on the file descriptors indicated by the instrumentation.
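
(The epoll registrations can be read directly from procfs with something like the following, where N is the stalled process's pid and <epoll_fd> is the descriptor that /proc/N/fd shows as anon_inode:[eventpoll]; each registered descriptor then appears as a "tfd:" line in the fdinfo output.)

ls -l /proc/N/fd | grep eventpoll
cat /proc/N/fdinfo/<epoll_fd>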

Most puzzlingly, netstat shows nonzero RecvQ sizes for the waiting sockets - that is, sockets for which there is a read operation pending are shown to have receive data or close events ready to read. This is consistent with our instrumentation, in that it shows that write data has been delivered to the inbound socket, but has not yet been read (or alternatively that the outbound shutdown has issued a FIN to the inbound side, but that the EOF has not yet been 'read').
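
A quick way to list such sockets is to filter on netstat's Recv-Q column, e.g.:

netstat -tan | awk '$2+0 > 0'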

This leads me to suspect that Asio's epoll bookkeeping - in particular its edge-triggered event management - is getting out of sync somewhere due to a race condition. This may well be due to incorrect usage on my part, but I can't see where the problem would be.

All insights, suggestions, known issues, and pointing-out-glaring-screwups would be greatly appreciated.

@DemiMarie

What does ThreadSanitizer say?

@jeremyd-gl
Author

Good question. I'll give it a shot when I have time to return to this - although I wouldn't be at all surprised if the 5x-10x overhead introduced by ThreadSanitizer prevents the problem from occurring.
Thanks for responding!
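
(If and when I do, the plan would be to rebuild the example with ThreadSanitizer enabled, roughly by adding -fsanitize=thread and dropping the optimisation level on the compile line above, e.g.:

g++ -o example -m64 -g -O1 -fsanitize=thread --no-exceptions --no-rtti --std=c++11 -I asio-1.10.6/include example.cpp -lpthread

and then running the same test loop.)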

@pascal-simon

pascal-simon commented Jun 15, 2017

I suspect we bumped into a similar issue in a totally different context, and not as well instrumented as this:

  • using boost::asio (boost v1.63)
  • built with gcc
  • with a server running on 64-bit Ubuntu 16 (details to follow) (the bonefish WAMP router, actually)
  • with clients running on an embedded ARM Linux system (clients based on boost::asio & autobahn-cpp)

We end up with a system totally blocked on I/O, with non-empty socket buffers. Attaching gdb to the router, we find that it is "blocked" sending (polling for write completion, as I would summarize it). On the wire (as seen in a Wireshark capture), the data has been sent and TCP-ACKed by the peer. Note that all peers use blocking sockets. Note also that the bonefish router is single-threaded.

@chriskohlhoff
Owner

Thanks @jeremyd-gl for the program to reproduce the issue. This was one tough little nut to crack. I have committed the fix to the asio-1-10-branch (669e6b8) and master branch (47b9319).

@pascal-simon, your issue is probably unrelated as you have a single thread. If you can, please try to reduce it to a simple program to reproduce the issue, and then raise a new ticket for it. You might also find asio's handler tracking helpful.
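
(For reference: handler tracking is enabled at compile time by defining ASIO_ENABLE_HANDLER_TRACKING, or BOOST_ASIO_ENABLE_HANDLER_TRACKING for Boost.Asio, e.g. by adding

-DBOOST_ASIO_ENABLE_HANDLER_TRACKING

to the compile flags. The tracking output is written to stderr, so a run can be captured by redirecting the test program's stderr to a file, e.g. 2> handler_trace.txt.)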

@jeremyd-gl
Author

jeremyd-gl commented Aug 1, 2017 via email

@gscuderi

gscuderi commented Sep 7, 2017

Does this also affect non-standalone Asio? Older versions of Boost, maybe?

I'm facing a similar issue with Boost 1.55.0 and I wonder if I've run into the same problem.

@gscuderi

gscuderi commented Sep 8, 2017

It affects Boost 1.55.0 and 1.60.0 (and likely any other version except 1.65.0, in which the fix has been committed).
The same fix, with some adaptation, applied to 1.55.0 seems to resolve it.
