Skip to content

Commit

Permalink
ipc: improve detection of removed ipc devices (files)
Browse files Browse the repository at this point in the history
When a process exits (possibly due to a core dump or a signal), DM tries to remove the
associated ipc socket file. This enables other processes to detect that an
ipc device is gone, and to handle the potential consequences.

Before: DM did not remove the ipc socket file properly (or at all?)

Now: DM removes the ipc socket file, and others can detect this.

resolves #194
  • Loading branch information
lazan committed May 19, 2023
1 parent 42cbff8 commit e120b02
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 13 deletions.
4 changes: 4 additions & 0 deletions middleware/common/include/common/communication/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/memory.h"
#include "common/flag.h"

#include <filesystem>

#include <sys/un.h>

Expand Down Expand Up @@ -50,6 +51,9 @@ namespace casual

static_assert( sizeof( Handle) == sizeof( Socket) + sizeof( strong::ipc::id), "padding problem");

//! @returns the path to the ipc "device"
std::filesystem::path path( const strong::ipc::id& id);

struct Address
{
Address() = default;
Expand Down
2 changes: 1 addition & 1 deletion middleware/common/include/common/flag.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace casual

namespace has
{
template< std::uint64_t flags, typename T>
template< auto flags, typename T>
constexpr inline bool flag( T value)
{
return ( value & flags) == flags;
Expand Down
47 changes: 37 additions & 10 deletions middleware/common/source/communication/ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,26 @@ namespace casual

Handle::~Handle() = default;


Address::Address( strong::ipc::id ipc)
std::filesystem::path path( const strong::ipc::id& id)
{
const auto path = ( environment::directory::ipc() / uuid::string( ipc.value())).string();
constexpr auto max_size = sizeof( ::sockaddr_un::sun_path) -1;

auto result = environment::directory::ipc() / uuid::string( id.value());

if( path.size() > ( sizeof( m_native.sun_path) - 1))
if( result.native().size() > max_size)
code::raise::error( code::casual::invalid_path, "transient directory path too long");

return result;
}


Address::Address( strong::ipc::id ipc)
{
const auto path = ipc::path( ipc);

auto target = std::begin( m_native.sun_path);

algorithm::copy( path, target);
algorithm::copy( path.native(), target);

m_native.sun_family = AF_UNIX;
}
Expand Down Expand Up @@ -488,17 +497,35 @@ namespace casual

} // partial

namespace local
{
   namespace
   {
      //! Checks whether the file at `path` is present on disk.
      //!
      //! The function is `noexcept`, hence the non-throwing `std::error_code`
      //! overload of `std::filesystem::exists` is used. With the throwing overload
      //! any filesystem error (other than "not found") would escape a `noexcept`
      //! function and terminate the process.
      //!
      //! @param path the path to probe
      //! @returns true if `path` exists, false if it does not exist or if the
      //!    file status could not be determined
      bool exists( const std::filesystem::path& path) noexcept
      {
         log::line( verbose::log, "path: ", path);

         // named `ec` to avoid shadowing the `code` namespace used elsewhere in this file
         std::error_code ec;
         return std::filesystem::exists( path, ec);
      }

   } // <unnamed>
} // local

bool exists( strong::ipc::id id)
{
const Address address{ id};
return ::access( address.native().sun_path, F_OK) != -1;
Trace trace{ "common::communication::ipc::exists"};
return local::exists( ipc::path( id));
}


bool remove( strong::ipc::id id)
{
Address address{ id};
return ::unlink( address.native().sun_path) != -1;
Trace trace{ "common::communication::ipc::remove"};

auto path = ipc::path( id);

if( ! local::exists( path))
return false;

return std::filesystem::remove( path);
}

bool remove( const process::Handle& owner)
Expand Down
35 changes: 35 additions & 0 deletions middleware/common/unittest/source/communication/test_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,41 @@ namespace casual
);
}

//! Verifies that `send::Coordinator` invokes the error callback for a pending
//! message when the destination inbound device has been destroyed.
TEST( common_communication_ipc, send_multiplex__remove_inbound__expect_detection)
{
common::unittest::Trace trace;

communication::ipc::inbound::Device inbound;
// keep a copy of the ipc id, it is used after `inbound` has been destroyed
const auto ipc = inbound.connector().handle().ipc();

communication::select::Directive directive;
communication::ipc::send::Coordinator coordinator{ directive};

const unittest::Message message{ 64};

// set to true by the error callback passed to `coordinator.send`
bool error_detected = false;

// "fill" the inbound: keep sending until the coordinator is no longer empty
// (presumably it then holds at least one message that could not be sent directly)
while( coordinator.empty())
coordinator.send( ipc, message, [ &error_detected]( auto& id, auto& complete){ error_detected = true;});

// no error is expected so far
EXPECT_TRUE( ! error_detected);

// The coordinator now has a socket "bound" to the inbound.
// Let's destroy the inbound.
common::sink( std::move( inbound));

// the ipc "device" is expected to be gone
EXPECT_TRUE( ! communication::ipc::exists( ipc));

// The coordinator tries to send the pending message to the
// "bound" socket; this should be detected, and the error callback
// should be invoked.
EXPECT_TRUE( coordinator.send());

EXPECT_TRUE( error_detected);

}

namespace local
{
namespace
Expand Down
3 changes: 1 addition & 2 deletions middleware/domain/source/manager/state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace casual
{
void remove( strong::ipc::id ipc)
{
if( communication::ipc::exists( ipc))
communication::ipc::remove( ipc);
communication::ipc::remove( ipc);
}
} // ipc

Expand Down
83 changes: 83 additions & 0 deletions test/unittest/source/test_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include "common/communication/instance.h"
#include "common/message/event.h"
#include "common/event/listen.h"
#include "common/service/lookup.h"
#include "common/communication/ipc/send.h"

#include "casual/xatmi.h"

Expand Down Expand Up @@ -328,7 +330,88 @@ namespace casual
}
}

//! Verifies that `send::Coordinator` invokes the error callback for its cached
//! messages once the destination server has been killed and the exit event for
//! that server has been received.
TEST( test_service, multiplex_call_to_service__kill_server__expect_multiplex_detect_ipc_device_is_gone)
{
common::unittest::Trace trace;

auto domain = casual::domain::unittest::manager( local::configuration::base, R"(
domain:
servers:
- path: ${CASUAL_MAKE_SOURCE_ROOT}/middleware/example/server/bin/casual-example-server
memberships: [ user]
instances: 1
restrictions:
- casual/example/echo
)");

// lookup / reserve the instance
auto lookup = common::service::Lookup{ "casual/example/echo"}();
ASSERT_TRUE( ! lookup.busy());

// a local inbound device, separate from the process-wide
// `common::communication::ipc::inbound::device()` used further down
communication::ipc::inbound::Device inbound;


communication::select::Directive directive;
communication::ipc::send::Coordinator coordinator{ directive};

// incremented by the error callback passed to `coordinator.send`
long error_count = 0;


// We send as many requests as we can until the send::Coordinator fails to send, due to
// the example server not being able to send the replies (our inbound gets "full").
// We want the send::Coordinator to create a socket to the example server ipc device.
{
// our pid paired with the ipc id of the local `inbound`
// (presumably used as the reply address for the requests - TODO confirm)
auto handle = process::Handle{ process::handle().pid, inbound.connector().handle().ipc()};

message::service::call::callee::Request request{ handle};
request.buffer.type = buffer::type::binary;
request.buffer.data = unittest::random::binary( 64);
request.service = lookup.service;

auto error_callback = [ &error_count]( auto& ipc, auto& complete){ ++error_count;};

// This will cause unexpected unreserves for casual/example/echo to SM -> error logs.
// We just ignore this, to keep the unittest less complex.
while( coordinator.empty())
coordinator.send( lookup.process.ipc, request, error_callback);

// add a few extra...
algorithm::for_n( 20, [ &]()
{
coordinator.send( lookup.process.ipc, request, error_callback);
});

EXPECT_TRUE( ! coordinator.empty());
}

// Ok, now we've got the send::Coordinator "bound" to the example-server inbound
// ipc device.
// If we kill example-server, the send::Coordinator should detect that the ipc device is
// logically removed (even if the socket keeps it alive due to fd reference counting and such).
{
// subscribe to process exit events, delivered to our regular process handle
common::message::event::process::Exit event;
auto guard = common::event::scope::subscribe( common::process::handle(), { event.type()});

signal::send( lookup.process.pid, code::signal::kill);

// wait until DM knows about it, and has removed the ipc device;
// loops until the received exit event is for the killed server's pid
while( event.state.pid != lookup.process.pid)
{
common::communication::device::blocking::receive(
common::communication::ipc::inbound::device(),
event);
}

// try to send all "cached" messages -> invokes the error callback
while( ! coordinator.empty())
coordinator.send();

EXPECT_TRUE( error_count > 0);
}



}


} // test::domain::service
Expand Down

0 comments on commit e120b02

Please sign in to comment.