Skip to content

Commit

Permalink
Made responses to platform methods threadsafe in linux
Browse files Browse the repository at this point in the history
  • Loading branch information
gaaclarke committed Nov 18, 2022
1 parent 35ecb2b commit fe44841
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 33 deletions.
64 changes: 32 additions & 32 deletions shell/platform/linux/fl_binary_messenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ G_DEFINE_INTERFACE(FlBinaryMessenger, fl_binary_messenger, G_TYPE_OBJECT)
struct _FlBinaryMessengerImpl {
GObject parent_instance;

FlEngine* engine;
GWeakRef engine;

// PlatformMessageHandler keyed by channel name.
GHashTable* platform_message_handlers;
Expand Down Expand Up @@ -81,7 +81,9 @@ static void fl_binary_messenger_response_handle_impl_dispose(GObject* object) {
FlBinaryMessengerResponseHandleImpl* self =
FL_BINARY_MESSENGER_RESPONSE_HANDLE_IMPL(object);

if (self->response_handle != nullptr && self->messenger->engine != nullptr) {
g_autoptr(FlEngine) engine =
FL_ENGINE(g_weak_ref_get(&self->messenger->engine));
if (self->response_handle != nullptr && engine != nullptr) {
g_critical("FlBinaryMessengerResponseHandle was not responded to");
}

Expand Down Expand Up @@ -141,19 +143,6 @@ static void platform_message_handler_free(gpointer data) {
g_free(self);
}

static void engine_weak_notify_cb(gpointer user_data,
GObject* where_the_object_was) {
FlBinaryMessengerImpl* self = FL_BINARY_MESSENGER_IMPL(user_data);
self->engine = nullptr;

// Disconnect any handlers.
// Take the reference in case a handler tries to modify this table.
g_autoptr(GHashTable) handlers = self->platform_message_handlers;
self->platform_message_handlers = g_hash_table_new_full(
g_str_hash, g_str_equal, g_free, platform_message_handler_free);
g_hash_table_remove_all(handlers);
}

static gboolean fl_binary_messenger_platform_message_cb(
FlEngine* engine,
const gchar* channel,
Expand All @@ -179,11 +168,7 @@ static gboolean fl_binary_messenger_platform_message_cb(

static void fl_binary_messenger_impl_dispose(GObject* object) {
FlBinaryMessengerImpl* self = FL_BINARY_MESSENGER_IMPL(object);

if (self->engine != nullptr) {
g_object_weak_unref(G_OBJECT(self->engine), engine_weak_notify_cb, self);
self->engine = nullptr;
}
g_weak_ref_clear(&self->engine);

g_clear_pointer(&self->platform_message_handlers, g_hash_table_unref);

Expand All @@ -199,7 +184,8 @@ static void set_message_handler_on_channel(
FlBinaryMessengerImpl* self = FL_BINARY_MESSENGER_IMPL(messenger);

// Don't set handlers if engine already gone.
if (self->engine == nullptr) {
g_autoptr(FlEngine) engine = FL_ENGINE(g_weak_ref_get(&self->engine));
if (engine == nullptr) {
if (handler != nullptr) {
g_warning(
"Attempted to set message handler on an FlBinaryMessenger without an "
Expand All @@ -220,6 +206,12 @@ static void set_message_handler_on_channel(
}
}

gboolean do_unref(gpointer value) {
g_object_unref(value);
return G_SOURCE_REMOVE;
}

// Note: This function can be called from any thread.
static gboolean send_response(FlBinaryMessenger* messenger,
FlBinaryMessengerResponseHandle* response_handle_,
GBytes* response,
Expand All @@ -233,21 +225,27 @@ static gboolean send_response(FlBinaryMessenger* messenger,
g_return_val_if_fail(response_handle->messenger == self, FALSE);
g_return_val_if_fail(response_handle->response_handle != nullptr, FALSE);

if (self->engine == nullptr) {
FlEngine* engine = FL_ENGINE(g_weak_ref_get(&self->engine));
if (engine == nullptr) {
return TRUE;
}

gboolean result = false;
if (response_handle->response_handle == nullptr) {
g_set_error(
error, FL_BINARY_MESSENGER_ERROR,
FL_BINARY_MESSENGER_ERROR_ALREADY_RESPONDED,
"Attempted to respond to a message that is already responded to");
return FALSE;
result = FALSE;
} else {
result = fl_engine_send_platform_message_response(
engine, response_handle->response_handle, response, error);
response_handle->response_handle = nullptr;
}

gboolean result = fl_engine_send_platform_message_response(
self->engine, response_handle->response_handle, response, error);
response_handle->response_handle = nullptr;
// This guarantees that the dispose method for the engine is executed
// on the platform thread in the rare chance this is the last ref.
g_idle_add(do_unref, engine);

return result;
}
Expand All @@ -267,12 +265,13 @@ static void send_on_channel(FlBinaryMessenger* messenger,
gpointer user_data) {
FlBinaryMessengerImpl* self = FL_BINARY_MESSENGER_IMPL(messenger);

if (self->engine == nullptr) {
g_autoptr(FlEngine) engine = FL_ENGINE(g_weak_ref_get(&self->engine));
if (engine == nullptr) {
return;
}

fl_engine_send_platform_message(
self->engine, channel, message, cancellable,
engine, channel, message, cancellable,
callback != nullptr ? platform_message_ready_cb : nullptr,
callback != nullptr ? g_task_new(self, cancellable, callback, user_data)
: nullptr);
Expand All @@ -287,11 +286,12 @@ static GBytes* send_on_channel_finish(FlBinaryMessenger* messenger,
g_autoptr(GTask) task = G_TASK(result);
GAsyncResult* r = G_ASYNC_RESULT(g_task_propagate_pointer(task, nullptr));

if (self->engine == nullptr) {
g_autoptr(FlEngine) engine = FL_ENGINE(g_weak_ref_get(&self->engine));
if (engine == nullptr) {
return nullptr;
}

return fl_engine_send_platform_message_finish(self->engine, r, error);
return fl_engine_send_platform_message_finish(engine, r, error);
}

static void fl_binary_messenger_impl_class_init(
Expand Down Expand Up @@ -321,8 +321,7 @@ FlBinaryMessenger* fl_binary_messenger_new(FlEngine* engine) {
// Added to stop compiler complaining about an unused function.
FL_IS_BINARY_MESSENGER_IMPL(self);

self->engine = engine;
g_object_weak_ref(G_OBJECT(engine), engine_weak_notify_cb, self);
g_weak_ref_init(&self->engine, G_OBJECT(engine));

fl_engine_set_platform_message_handler(
engine, fl_binary_messenger_platform_message_cb, self, NULL);
Expand All @@ -343,6 +342,7 @@ G_MODULE_EXPORT void fl_binary_messenger_set_message_handler_on_channel(
self, channel, handler, user_data, destroy_notify);
}

// Note: This function can be called from any thread.
G_MODULE_EXPORT gboolean fl_binary_messenger_send_response(
FlBinaryMessenger* self,
FlBinaryMessengerResponseHandle* response_handle,
Expand Down
76 changes: 76 additions & 0 deletions shell/platform/linux/fl_binary_messenger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Included first as it collides with the X11 headers.
#include "gtest/gtest.h"

#include <pthread.h>
#include <cstring>

#include "flutter/shell/platform/linux/fl_binary_messenger_private.h"
Expand Down Expand Up @@ -384,3 +385,78 @@ TEST(FlBinaryMessengerTest, ReceiveMessage) {
// Blocks here until response_cb is called.
g_main_loop_run(loop);
}

struct RespondsOnBackgroundThreadInfo {
FlBinaryMessenger* messenger;
FlBinaryMessengerResponseHandle* response_handle;
GMainLoop* loop;
};

static gboolean cleanup_responds_on_background_thread_info(gpointer user_data) {
RespondsOnBackgroundThreadInfo* info =
static_cast<RespondsOnBackgroundThreadInfo*>(user_data);
GMainLoop* loop = info->loop;

g_object_unref(info->messenger);
g_object_unref(info->response_handle);
free(info);

g_main_loop_quit(static_cast<GMainLoop*>(loop));

return G_SOURCE_REMOVE;
}

static void* response_from_thread_main(void* user_data) {
RespondsOnBackgroundThreadInfo* info =
static_cast<RespondsOnBackgroundThreadInfo*>(user_data);

fl_binary_messenger_send_response(info->messenger, info->response_handle,
nullptr, nullptr);

g_idle_add(cleanup_responds_on_background_thread_info, info);

return nullptr;
}

static void response_from_thread_cb(
FlBinaryMessenger* messenger,
const gchar* channel,
GBytes* message,
FlBinaryMessengerResponseHandle* response_handle,
gpointer user_data) {
EXPECT_NE(message, nullptr);
pthread_t thread;
RespondsOnBackgroundThreadInfo* info =
static_cast<RespondsOnBackgroundThreadInfo*>(
malloc(sizeof(RespondsOnBackgroundThreadInfo)));
info->messenger = FL_BINARY_MESSENGER(g_object_ref(messenger));
info->response_handle =
FL_BINARY_MESSENGER_RESPONSE_HANDLE(g_object_ref(response_handle));
info->loop = static_cast<GMainLoop*>(user_data);
EXPECT_EQ(0,
pthread_create(&thread, nullptr, &response_from_thread_main, info));
}

TEST(FlBinaryMessengerTest, RespondOnBackgroundThread) {
g_autoptr(GMainLoop) loop = g_main_loop_new(nullptr, 0);

g_autoptr(FlEngine) engine = make_mock_engine();
FlBinaryMessenger* messenger = fl_binary_messenger_new(engine);

// Listen for messages from the engine.
fl_binary_messenger_set_message_handler_on_channel(
messenger, "test/messages", message_cb, nullptr, nullptr);

// Listen for response from the engine.
fl_binary_messenger_set_message_handler_on_channel(
messenger, "test/responses", response_from_thread_cb, loop, nullptr);

// Trigger the engine to send a message.
const char* text = "Marco!";
g_autoptr(GBytes) message = g_bytes_new(text, strlen(text));
fl_binary_messenger_send_on_channel(messenger, "test/send-message", message,
nullptr, nullptr, nullptr);

// Blocks here until response_cb is called.
g_main_loop_run(loop);
}
1 change: 1 addition & 0 deletions shell/platform/linux/fl_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ void fl_engine_set_on_pre_engine_restart_handler(
self->on_pre_engine_restart_handler_destroy_notify = destroy_notify;
}

// Note: This function can be called from any thread.
gboolean fl_engine_send_platform_message_response(
FlEngine* self,
const FlutterPlatformMessageResponseHandle* handle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void fl_binary_messenger_set_message_handler_on_channel(
* @error: (allow-none): #GError location to store the error occurring, or %NULL
* to ignore.
*
* Responds to a platform message.
* Responds to a platform message. This function is thread-safe.
*
* Returns: %TRUE on success.
*/
Expand Down

0 comments on commit fe44841

Please sign in to comment.