SEGV while using remote actor? #71

Closed
ArtemGr opened this Issue Sep 12, 2012 · 11 comments


@ArtemGr

I see this periodically:

#3  <signal handler called>
#4  operator-> (this=0x2) at /root/work/libcppa/./cppa/intrusive_ptr.hpp:147
#5  cppa::remote_actor (peer=<error reading variable: Cannot access memory at address 0x2>) at /root/work/libcppa/src/unicast_network.cpp:78
#6  0x00007fab58663c0e in cppa::remote_actor (host=<optimized out>, port=<optimized out>) at /root/work/libcppa/src/unicast_network.cpp:104
#7  0x00007fab5b34a474 in img2::Service::scaleRpc (this=this@entry=0x7cf5a0, scaled=..., id=..., original=..., watermark=..., maxWidth=800, maxHeight=600,
    version=version@entry=1) at src/service.cpp:193

scaleRpc is:

    using namespace cppa; using std::chrono::seconds;
    actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
    sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
      on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
      },
      after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
    );
@Neverlord
CAF: C++ Actor Framework member

I guess I need some more information about your use case. "Cannot access 0x2" looks like a dereferenced nullptr. However, line 78 of unicast_network.cpp accesses an io_stream_ptr that cannot be nullptr, since cppa throws an exception on errors beforehand. I cannot reproduce a bug from the snippet above.

Are you calling remote_actor from several threads/actors in parallel? Or is there anything else in your setup that might affect the call to remote_actor?

@ArtemGr

It seems to me that this could only happen if `new` returned NULL. Maybe an Electric Fence artefact. Sorry, I should have looked at the trace site first.

@ArtemGr ArtemGr closed this Sep 17, 2012
@ArtemGr

On further investigation it is not an Electric Fence fault, but I can't make much sense of it yet.

@ArtemGr

Here it is from a different vantage point (Valgrind):

==32344== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==32344==    at 0x8DDA54C: recv (recv.c:34)
==32344==    by 0x801BE30: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:68)
==32344==    by 0x8065845: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:77)
==32344==    by 0x8065BC0: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:104)
==32344==    by 0x5149703: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==32344==    by 0x514A870: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==32344==    by 0x514B62C: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==32344==    by 0x5406203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==32344==    by 0x5406D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==32344==    by 0x5407886: tnt::Worker::run() (worker.cpp:139)
==32344==    by 0x51540F1: cxxtools::DetachedThread::exec() (thread.h:315)
==32344==    by 0x5703AFE: thread_entry (callable.tpp:314)
==32344==  Address 0x2 is not stack'd, malloc'd or (recently) free'd
@ArtemGr

So far, I've instrumented your code:

void ipv4_io_stream::read(void* vbuf, size_t len) {
  if ((intptr_t) vbuf < 10) VALGRIND_PRINTF ("trace11, vbuf < 10\n");
    auto buf = reinterpret_cast<char*>(vbuf);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace12, buf < 10\n");
    size_t rd = 0;
    while (rd < len) {
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace13, buf < 10\n");
  if (rd < 0) VALGRIND_PRINTF ("trace14, rd < 0: %i", (int) rd);
  if (rd > 99111222) VALGRIND_PRINTF ("trace14, rd > 99111222: %i", (int) rd);
        auto recv_result = ::recv(m_fd, buf + rd, len - rd, 0);
        handle_read_result(recv_result, true);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace15, buf < 10\n");
        if (recv_result > 0) {
            rd += static_cast<size_t>(recv_result);
        }
        if (rd < len) {
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace21, buf < 10\n");
            fd_set rdset;
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace22, buf < 10\n");
            FD_ZERO(&rdset);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace23, buf < 10\n");
            FD_SET(m_fd, &rdset);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace24, buf < 10\n");
            if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
                throw network_error("select() failed");
            }
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace25, buf < 10\n");
        }
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace16, buf < 10\n");
    }
}

and found that "buf" is first corrupted at "trace25":

**19461** trace25, buf < 10
**19461** trace16, buf < 10
**19461** trace13, buf < 10
==19461== Thread 17:
==19461== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==19461==    at 0x8DDC54C: recv (recv.c:34)
==19461==    by 0x801D0D7: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:73)
==19461==    by 0x8066EAF: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:81)
==19461==    by 0x8067236: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:108)
==19461==    by 0x514A093: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==19461==    by 0x514B210: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==19461==    by 0x514BFBC: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==19461==    by 0x5407203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==19461==    by 0x5407D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==19461==    by 0x5408886: tnt::Worker::run() (worker.cpp:139)
==19461==    by 0x5154E71: cxxtools::DetachedThread::exec() (thread.h:315)
==19461==    by 0x5704AFE: thread_entry (callable.tpp:314)
==19461==  Address 0x2 is not stack'd, malloc'd or (recently) free'd
@ArtemGr ArtemGr reopened this Sep 24, 2012
@ArtemGr

Looks like a bug in select or select-related macros (or GCC optimisations of them), because when I replace select with poll, cppa::detail::ipv4_io_stream::read no longer crashes and the application becomes more stable.

diff --git a/src/ipv4_io_stream.cpp b/src/ipv4_io_stream.cpp
index 8271026..36f06f8 100644
--- a/src/ipv4_io_stream.cpp
+++ b/src/ipv4_io_stream.cpp
@@ -45,6 +45,7 @@
 #   include <sys/socket.h>
 #   include <netinet/in.h>
 #   include <netinet/tcp.h>
+#   include <poll.h>
 #endif

 namespace cppa { namespace detail {
@@ -71,12 +72,9 @@ void ipv4_io_stream::read(void* vbuf, size_t len) {
             rd += static_cast<size_t>(recv_result);
         }
         if (rd < len) {
-            fd_set rdset;
-            FD_ZERO(&rdset);
-            FD_SET(m_fd, &rdset);
-            if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
-                throw network_error("select() failed");
-            }
+            struct pollfd fds;
+            fds.fd = m_fd; fds.events = POLLIN; fds.revents = 0;
+            if (poll (&fds, 1, -1) < 0) throw network_error("poll() failed");
         }
     }
 }

The app now crashes less often; the remaining crash is in middleman.cpp:787. I'll try changing that to poll as well.

@ArtemGr

Here is my attempt patching middleman to use poll:

diff --git a/src/middleman.cpp b/src/middleman.cpp
index 825fbf6..537a8a6 100644
--- a/src/middleman.cpp
+++ b/src/middleman.cpp
@@ -35,6 +35,11 @@
 #include <sstream>
 #include <iostream>

+#ifdef CPPA_WINDOWS
+#else
+#   include <poll.h>
+#endif
+
 #include "cppa/on.hpp"
 #include "cppa/actor.hpp"
 #include "cppa/match.hpp"
@@ -58,7 +63,7 @@

 using namespace std;

-//#define VERBOSE_MIDDLEMAN
+#define VERBOSE_MIDDLEMAN

 #ifdef VERBOSE_MIDDLEMAN
 #define DEBUG(arg) {                                                           \
@@ -441,7 +446,7 @@ bool peer_connection::continue_reading() {
                 binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size());
                 m_meta_msg->deserialize(&msg, &bd);
                 auto& content = msg.content();
-                DEBUG("<-- " << to_string(msg));
+                //DEBUG("<-- " << to_string(msg));
                 match(content) (
                     // monitor messages are sent automatically whenever
                     // actor_proxy_cache creates a new proxy
@@ -670,7 +675,7 @@ class middleman_overseer : public network_channel {
                         DEBUG("message to an unknown peer: " << to_string(out_msg));
                         break;
                     }
-                    DEBUG("--> " << to_string(out_msg));
+                    //DEBUG("--> " << to_string(out_msg));
                     auto had_unwritten_data = peer->has_unwritten_data();
                     try {
                         peer->write(out_msg);
@@ -702,19 +707,17 @@ class middleman_overseer : public network_channel {

 void middleman::operator()(int pipe_fd, middleman_queue& queue) {
     DEBUG("pself: " << to_string(*m_pself));
-    int maxfd = 0;
-    fd_set rdset;
-    fd_set wrset;
-    fd_set* wrset_ptr = nullptr;
+    std::vector<struct pollfd> fdset;
+    bool wrset = false;
     m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue));
     auto update_fd_sets = [&] {
-        FD_ZERO(&rdset);
-        maxfd = 0;
+        fdset.clear();
         CPPA_REQUIRE(m_channels.size() > 0);
         for (auto& channel : m_channels) {
-            auto fd = channel->read_handle();
-            maxfd = max(maxfd, fd);
-            FD_SET(fd, &rdset);
+            struct pollfd pfd;
+            pfd.fd = channel->read_handle();
+            pfd.events = POLLIN; pfd.revents = 0;
+            fdset.push_back (pfd);
         }
         // check consistency of m_peers_with_unwritten_data
         if (!m_peers_with_unwritten_data.empty()) {
@@ -728,17 +731,17 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
             }
         }
         if (m_peers_with_unwritten_data.empty()) {
-            if (wrset_ptr) wrset_ptr = nullptr;
+            wrset = false;
         }
         else {
             for (auto& peer : m_peers_with_unwritten_data) {
-                auto fd = peer->write_handle();
-                maxfd = max(maxfd, fd);
-                FD_SET(fd, &wrset);
+                struct pollfd pfd;
+                pfd.fd = peer->write_handle();
+                pfd.events = POLLOUT; pfd.revents = 0;
+                fdset.push_back (pfd);
             }
-            wrset_ptr = &wrset;
+            wrset = true;
         }
-        CPPA_REQUIRE(maxfd > 0);
     };
     auto continue_reading = [&](const network_channel_ptr& ch) {
         bool erase_channel = false;
@@ -799,14 +802,13 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
     };
     do {
         update_fd_sets();
-        //DEBUG("select()");
         int sresult;
         do {
-            DEBUG("select() on "
+            DEBUG("poll() on "
                   << (m_peers_with_unwritten_data.size() + m_channels.size())
                   << " sockets");
-            sresult = select(maxfd + 1, &rdset, wrset_ptr, nullptr, nullptr);
-            DEBUG("select() returned " << sresult);
+            sresult = poll (fdset.data(), fdset.size(), -1);
+            DEBUG("poll() returned " << sresult);
             if (sresult < 0) {
                 // try again or die hard
                 sresult = 0;
@@ -819,9 +821,7 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
                     // nfds is negative or the value
                     // contained within timeout is invalid
                     case EINVAL: {
-                        if ((maxfd + 1) < 0) {
-                            CPPA_CRITICAL("overflow: maxfd + 1 > 0");
-                        }
+                        CPPA_CRITICAL("poll EINVAL");
                         break;
                     }
                     case ENOMEM: {
@@ -855,17 +855,27 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
         //DEBUG("continue reading ...");
         { // iterate over all channels and remove channels as needed
             for (auto& ch : m_channels) {
-                if (FD_ISSET(ch->read_handle(), &rdset)) {
-                    continue_reading(ch);
-                }
+                for (auto& pfd: fdset)
+                    if (pfd.fd == ch->read_handle()) {
+                        if (pfd.revents) DEBUG("fd " << pfd.fd << "; read revents: " << pfd.revents);
+                        if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+                            m_erased_channels.insert (ch);
+                        else if (pfd.revents != 0)
+                            continue_reading(ch);
+                    }
             }
         }
-        if (wrset_ptr) { // iterate over peers with unwritten data
+        if (wrset) { // iterate over peers with unwritten data
             DEBUG("continue writing ...");
             for (auto& peer : m_peers_with_unwritten_data) {
-                if (FD_ISSET(peer->write_handle(), &wrset)) {
-                    continue_writing(peer);
-                }
+                for (auto& pfd: fdset)
+                    if (pfd.fd == peer->write_handle()) {
+                        if (pfd.revents) DEBUG("fd " << pfd.fd << "; write revents: " << pfd.revents);
+                        if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+                            m_peers_with_unwritten_data.erase (peer);
+                        else if (pfd.revents != 0)
+                            continue_writing(peer);
+                    }
             }
         }
         insert_new_handlers();

In debug output I see poll returning POLLIN and POLLOUT events only.
The number of opened sockets being polled grows indefinitely.

The client code which leads to that growth of open sockets is, as I mentioned earlier:

    using namespace cppa; using std::chrono::seconds;
    actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
    sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
      on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
      },
      after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
    );

If I change it to

    using namespace cppa; using std::chrono::seconds;
    static actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
    sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
      on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
      },
      after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
    );

then the number of sockets being polled doesn't grow (remains = 2).

I THINK that the select memory corruption mentioned earlier might be related to that growing number of sockets (descriptors reaching select's internal FD_SETSIZE limit).

IMO, libcppa fails to automatically close the remote_actor sockets where it should.
libcppa should also guard against reaching select's limits.

P.S. In retrospect, I should have used epoll...

@Neverlord
CAF: C++ Actor Framework member

Thanks a lot for your effort! I think I'll keep your changes (I'm going to review your patch next week). Using poll() is just fine for me, because epoll() isn't available on all platforms (e.g., MacOS).

You're right about automatic disposal of unused remote actors. Right now, the middleman holds a reference to the proxy itself, since it needs to be able to deserialize it. However, this obviously keeps the reference count above zero, even if no other actor holds a reference to the proxy any longer. I'll see how to fix that. Again, thanks a lot for pointing this out.

Btw, sorry for the late answer, but I was ill the whole week.

@ArtemGr

NP!
If the semantics of my patch are okay, then I'd like to do a Linux version with epoll later (with #ifdef __linux__).

@Neverlord
CAF: C++ Actor Framework member

Based on your patch, I have updated the middleman to use poll rather than select. However, I haven't fixed the reference-counting problem yet.

@Neverlord
CAF: C++ Actor Framework member

I have opened a new issue for the reference-counting problem: #75. I'll close this one, since it starts getting too big.

@Neverlord Neverlord closed this Oct 1, 2012