Skip to content

Commit

Permalink
adapter/darwin: sync because the kernel lies
Browse files Browse the repository at this point in the history
  • Loading branch information
Will committed Mar 8, 2024
1 parent 24304a6 commit c8bdffe
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 313 deletions.
56 changes: 56 additions & 0 deletions devel/include/detail/wtr/watcher/adapter/darwin/notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,61 @@
# Notes

## Darwin's Native Inter-Process Communication (Also inter-thread, ofc)

```
inline CFDataRef on_listening_port_response(
CFMessagePortRef, // on_port
SInt32, // msgid
CFDataRef req,
void* // ctx
)
{
long const len = 2;
if (! req || CFDataGetLength(req) < len) { return nullptr; }
// Should always be 2, but let wiggles happen.
CFRange req_range = CFRangeMake(0, len);
unsigned char req_bytes[len] = {0};
CFDataGetBytes(req, req_range, req_bytes);
req_bytes[len - 1] = 0;
unsigned char res_byte_head = req_bytes[0] == 'x' ? 'o' : '?';
unsigned char res_bytes[len] = {res_byte_head, 0};
CFDataRef res = CFDataCreate(NULL, res_bytes, sizeof(res_bytes));
return res;
}
inline CFMessagePortRef open_listening_port(char const* const port_name)
{
CFStringRef cf_port_name =
CFStringCreateWithCString(NULL, port_name, kCFStringEncodingASCII);
CFMessagePortContext context = {0, nullptr, nullptr, nullptr, nullptr};
CFMessagePortRef port = CFMessagePortCreateLocal(
kCFAllocatorDefault,
cf_port_name,
on_listening_port_response,
&context,
nullptr);
CFRunLoopSourceRef source =
CFMessagePortCreateRunLoopSource(kCFAllocatorDefault, port, 0);
auto sched = CFRunLoopGetCurrent();
CFRunLoopAddSource(sched, source, kCFRunLoopCommonModes);
CFRelease(source);
CFRelease(cf_port_name);
return port;
}
inline void send_msg_to_port(
CFMessagePortRef to_port,
unsigned char const* const message,
unsigned message_len)
{
CFDataRef data = CFDataCreate(NULL, message, message_len);
CFMessagePortSendRequest(to_port, 0, data, 1, 1, kCFRunLoopDefaultMode, NULL);
CFRelease(data);
}
```

## The "Rename Triplet"

```
Expand Down
225 changes: 113 additions & 112 deletions devel/include/detail/wtr/watcher/adapter/darwin/watch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#if defined(__APPLE__)

#include "wtr/watcher.hpp"
#include <atomic>
#include <CoreFoundation/CoreFoundation.h>
#include <CoreServices/CoreServices.h>
#include <dispatch/dispatch.h>
#include <filesystem>
#include <mutex>
#include <string>
#include <unordered_set>

Expand Down Expand Up @@ -61,13 +61,15 @@ inline constexpr unsigned fsev_flag_effect_any

// clang-format on

struct ctx_type {
struct ContextData {
using fspath = std::filesystem::path;
/* `fs::path` has no hash function, so we use this. */
using pathset = std::unordered_set<std::string>;

::wtr::watcher::event::callback const& callback{};
pathset* seen_created_paths{nullptr};
fspath* last_rename_path{nullptr};
std::mutex* mtx{nullptr};
};

/* We make a path from a C string...
Expand Down Expand Up @@ -96,7 +98,7 @@ struct ctx_type {
The docs don't say much about these fields.
I don't think they mention fileID at all.
*/
inline auto path_from_event_at(void* event_recv_paths, unsigned long i) noexcept
inline auto path_from_event_at(void* event_recv_paths, unsigned long i)
-> std::filesystem::path
{
if (event_recv_paths)
Expand All @@ -117,8 +119,10 @@ inline auto path_from_event_at(void* event_recv_paths, unsigned long i) noexcept
return {};
}

inline auto
event_recv_one(ctx_type& ctx, std::filesystem::path const& path, unsigned flags)
inline auto event_recv_one(
ContextData& ctx,
std::filesystem::path const& path,
unsigned flags)
{
using ::wtr::watcher::event;
using path_type = enum ::wtr::watcher::event::path_type;
Expand All @@ -145,8 +149,8 @@ event_recv_one(ctx_type& ctx, std::filesystem::path const& path, unsigned flags)
return;
}

/* More than one effect might have happened to the
same path. (Which is why we use non-exclusive `if`s.) */
/* More than one thing can happen to the same path.
(So these `if`s are not exclusive.) */

if (flags & fsev_flag_effect_create) {
auto et = effect_type::create;
Expand Down Expand Up @@ -234,36 +238,28 @@ inline auto event_recv(
void* paths, /* Paths with events */
unsigned const* flags, /* Event flags */
FSEventStreamEventId const* /* A unique stream id */
) noexcept -> void
) -> void
{
auto pctx = static_cast<ctx_type*>(maybe_ctx);
auto ok = paths /* These checks are unfortunate, */
&& flags /* but they are also necessary. */
&& pctx; /* Once in a blue moon, this doesn't exist. */

if (ok) {
auto ctx = *pctx;
for (unsigned long i = 0; i < count; i++) {
auto path = path_from_event_at(paths, i);
auto flag = flags[i];
event_recv_one(ctx, path, flag);
}
auto data_ok = paths /* These checks are unfortunate, */
&& flags /* but they are also necessary. */
&& maybe_ctx; /* Once in a blue moon, this doesn't exist */
if (! data_ok) return;
auto ctx = *static_cast<ContextData*>(maybe_ctx);
if (! ctx.mtx->try_lock()) return;
for (unsigned long i = 0; i < count; i++) {
auto path = path_from_event_at(paths, i);
auto flag = flags[i];
event_recv_one(ctx, path, flag);
}
ctx.mtx->unlock();
}

/* Make sure that event_recv has the same type as, or is
convertible to, an FSEventStreamCallback. We don't use
`is_same_v()` here because `event_recv` is `noexcept`.
Side note: Is an exception qualifier *really* part of
the type? Or, is it a "path_type"? Something else?
We want this assertion for nice compiler errors. */

static_assert(FSEventStreamCallback{event_recv} == event_recv);
static_assert(event_recv == FSEventStreamCallback{event_recv});

inline auto open_event_stream(
std::filesystem::path const& path,
dispatch_queue_t queue,
void* ctx) noexcept -> FSEventStreamRef
void* ctx) -> FSEventStreamRef
{
auto context = FSEventStreamContext{
.version = 0, /* FSEvents.h: "Only valid value is zero." */
Expand All @@ -273,16 +269,19 @@ inline auto open_event_stream(
.copyDescription = nullptr, /* Optional string for debugging. */
};

/* todo: Do we need to release these?
CFRelease(path_cfstring);
CFRelease(path_array);
*/
/* path_cfstring and path_array appear to be managed by the
FSEventStream, are always null when we go to close the
stream, and shouldn't be freed before then. Would seem
to me like we'd need to release them (in re. the Create
rule), but we don't appear to. */

void const* path_cfstring =
CFStringCreateWithCString(nullptr, path.c_str(), kCFStringEncodingUTF8);

/* A predefined structure which is (from CFArray.h) --
"appropriate when the values in a CFArray are CFTypes" */
static auto const cf_arr_ty = kCFTypeArrayCallBacks;

CFArrayRef path_array = CFArrayCreate(
nullptr, /* A custom allocator is optional */
&path_cfstring, /* Data: A ptr-ptr of (in our case) strings */
Expand All @@ -295,7 +294,7 @@ inline auto open_event_stream(
`context` and some details about each filesystem event
the kernel sees for the paths in `path_array`. */

FSEventStreamRef stream = FSEventStreamCreate(
auto stream = FSEventStreamCreate(
nullptr, /* A custom allocator is optional */
&event_recv, /* A callable to invoke on changes */
&context, /* The callable's arguments (context) */
Expand All @@ -314,7 +313,7 @@ inline auto open_event_stream(
return nullptr;
}

inline auto wait(semabin const& sb) noexcept
inline auto wait(semabin const& sb)
{
auto s = sb.state();
if (s == semabin::pending) {
Expand All @@ -325,101 +324,103 @@ inline auto wait(semabin const& sb) noexcept
return s;
}

/* We want to handle any outstanding events before closing,
/* Bugs, footguns
1.
When we flood the filesystem with events, Darwin may choose,
for reason I don't fully understand, to tell us about events
after we are long gone. Maybe the FSEvent stream (which should
be f'ing closed) sometimes ignores us having asking it to stop.
Maybe some tasks in the dispatch queue are not being cleared
properly. I'm not sure.
Creating events as quickly as possible while a few short-lived
FSEvents streams are open is usually enough to reproduce this.
The recipe to reproduce seems to be a ton of filesystem events
ongoing while some FSEvent streams are open, then closing the
streams either during or shortly after the events stop happening
to the filesystem, but before they have all been reported.
A minimal-ish reproducer is in /etc/wip-fsevents-issue.
Whatever the reason, sometimes, Darwin seems happy call into
our event handler with resource after we have left the memory
space those resources belong to. I consider that a bug somewhere
in FSEvents, Dispatch or maybe something deeper.
At one point, I thought that purging events for the device
would help. Even that fails under sufficiently high load.
The positive side effect may have effectively been a sleep.
Sometimes I even consider adding a deliberate sleep here.
Because time is not a synchronization primitive, that will
also eventually fail. Though, if it's the best we can do,
despite the kernel, maybe it's worth it. I'm not sure.
Before that, I added a bunch of synchronization primitives
to the context and made the lifetime of the context a bit
transactional. We'd check on atomic flags which in the event
reception loop to be sure we were alive there, after which
we'd set an idle flag which the `watch` function would wait
on before cleaning up. All well and good, but then again,
it's not like any of that memory *even exists* when Darwin
calls into it after we've asked it to stop and left.
There's a minimal-ish reproducer in /etc/wip-fsevents-issue.
"Worse is better."
2.
We want to handle any outstanding events before closing,
so we flush the event stream before stopping it.
`FSEventStreamInvalidate()` only needs to be called
if we scheduled via `FSEventStreamScheduleWithRunLoop()`.
That scheduling function is deprecated (as of macOS 13).
Calling `FSEventStreamInvalidate()` fails an assertion
and produces a warning in the console. However, calling
`FSEventStreamRelease()` without first invalidating via
`FSEventStreamInvalidate()` *also* fails an assertion,
and produces a warning. I'm not sure what the right call
to make here is.
Bugs, footguns --
The order of flush -> stop -> purge -> invalidate is
extremely sensitive. Purging events *after* stopping
the stream is necessary for some reason. Likely a bug
on Darwin; Should be f'ing stopped.
Bug on Darwin: The system may call the FSEvent stream's
associated callback even after we've stopped the stream.
Only seems to happen when many thousands of events are
being generator for watchers with a very short lifetime.
I don't know what we can do about it. We've tried a mutex
which locks during the context's lifetime, but it's always
released here, and it complicates checks within the event
loop because we're reading into the memory of a dangling
mutex (owned within the context object) because this scope
has been left, and the context no longer exists. Similar
issues cropped up when we went for atomic reference vars,
owner_alive and borrower_alive, trying to leave this scope
only when both were false. Very transactional, and doomed
ultimately with the same issues as the mutex; The context
itself does not exist. A slew of other errors and UB come
from the system calling on a non-existent object. In our
case, the set of seen-created paths may need to allocate
and deallocate. That is not going to end well when the
system betrays us.
The issue being addressed is a rare use, by FSEvents, of
the context we give it, after the FSEvent stream has been
released and invalidated. The issue is probably within the
FSEvents system, or maybe dispatch, probably not with us.
Which is why a transactional lifetime on the context we own,
lent to FSEvents, does not work.
The only semi-reliable way of synchronizing the (should
be f'ing closed) stream is to maintain the order of our
calls to flush, stop, purge and invalidate. It's not clear
if flushing has much of an effect at all here, especially
before stopping the stream. */
inline auto close_event_stream(FSEventStreamRef s) noexcept -> bool
`FSEventStreamInvalidate()` also fails an assertion and
produces a warning. Releasing seems safer than not, so
we'll do that.
*/
inline auto close_event_stream(FSEventStreamRef stream, ContextData& ctx)
-> bool
{
if (s) {
FSEventStreamFlushSync(s);
FSEventStreamStop(s);
auto event_id = FSEventsGetCurrentEventId();
auto device = FSEventStreamGetDeviceBeingWatched(s);
FSEventsPurgeEventsForDeviceUpToEventId(device, event_id);
FSEventStreamInvalidate(s);
FSEventStreamRelease(s);
s = nullptr;
return true;
}
else
return false;
if (! stream) return false;
auto _ = std::scoped_lock{*ctx.mtx};
FSEventStreamFlushSync(stream);
FSEventStreamStop(stream);
FSEventStreamInvalidate(stream);
FSEventStreamRelease(stream);
stream = nullptr;
return true;
}

} /* namespace */

/* Lifetimes --
We *must* ensure that the queue, context and callback
are alive *at least* until we close the event stream.
We don't really have unique ownership of these resources.
There used to be a shared pointer between us and the system,
but there appeared to be a rare issue with the reference
counts expiring while the object should have still been
alive and in use by the kernel. I witnessed this bevahvior
when running highly concurrent performance tests with many
thousands of events. There may have been another factor.
For now, ensuring that our resources live for long enough
by hand with a "uniquely" owned object works well. */
/* Lifetimes
We will ensure that the queue, context and callback
are alive at least until we close the event stream.
They are shared between us and the system. The system
is self-inconsistent but well-meaning. Sometimes, Apple
will use our resources after we've asked it not to. I
consider that a bug somewhere in FSEvents, Dispatch or
maybe something deeper. There's a minimal-ish reproducer
in the `/etc/wip-fsevents-issue` directory. */
inline auto watch(
std::filesystem::path const& path,
::wtr::watcher::event::callback const& callback,
semabin const& is_living) noexcept -> bool
::wtr::watcher::event::callback const& cb,
semabin const& is_living) -> bool
{
auto queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
auto seen_created_paths = ctx_type::pathset{};
auto last_rename_path = ctx_type::fspath{};
auto ctx = ctx_type{callback, &seen_created_paths, &last_rename_path};
static auto queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0);
auto seen_created_paths = ContextData::pathset{};
auto last_rename_path = ContextData::fspath{};
auto mtx = std::mutex{};
auto ctx = ContextData{cb, &seen_created_paths, &last_rename_path, &mtx};

auto fsevs = open_event_stream(path, queue, &ctx);

auto state_ok = wait(is_living) == semabin::released;
auto close_ok = close_event_stream(fsevs);
auto close_ok = close_event_stream(fsevs, ctx);
return state_ok && close_ok;
}

Expand Down
Loading

0 comments on commit c8bdffe

Please sign in to comment.