Skip to content

Commit

Permalink
[Cast Streaming] Clean Up Remoting RPC Connection Init Messages
Browse files Browse the repository at this point in the history
This CL fixes timing related issues in the initialization of the
Renderer and DemuxerStream for beginning a remoting session.

Bug: b/209844961
Change-Id: I1639b980265960ee609605b06ae9e9577c3b6665
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3635272
Commit-Queue: Ryan Keane <rwkeane@google.com>
Reviewed-by: Fabrice de Gans <fdegans@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1002781}
  • Loading branch information
Ryan Keane authored and Chromium LUCI CQ committed May 12, 2022
1 parent 4453bd6 commit 558a50e
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 100 deletions.
118 changes: 75 additions & 43 deletions components/cast_streaming/browser/playback_command_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ PlaybackCommandDispatcher::PlaybackCommandDispatcher(
mojo::Remote<media::mojom::Renderer> translators_renderer;
RegisterCommandSource(translators_renderer.BindNewPipeAndPassReceiver());

call_translator_ = std::make_unique<remoting::RendererRpcCallTranslator>(
std::move(translators_renderer));
auto message_processor_callback = base::BindRepeating(
&PlaybackCommandDispatcher::SendRemotingRpcMessageToRemote,
weak_factory_.GetWeakPtr());
renderer_call_translator_ =
std::make_unique<remoting::RendererRpcCallTranslator>(
std::move(message_processor_callback),
std::move(translators_renderer));
}

PlaybackCommandDispatcher::~PlaybackCommandDispatcher() {
Expand All @@ -55,32 +60,16 @@ void PlaybackCommandDispatcher::OnRemotingSessionNegotiated(
DCHECK(messenger);

messenger_ = messenger;
handle_ = messenger_->GetUniqueHandle();

// Include the |handle_| in the callback so that it will persist even upon
// re-negotiation.
auto message_processor_callback = base::BindPostTask(
task_runner_,
base::BindRepeating(
&PlaybackCommandDispatcher::SendRemotingRpcMessageToRemote,
weak_factory_.GetWeakPtr(), handle_),
FROM_HERE);
call_translator_->SetMessageProcessor(std::move(message_processor_callback));

auto message_receiver_callback = base::BindPostTask(
task_runner_,
base::BindRepeating(
&PlaybackCommandDispatcher::ProcessRemotingRpcMessageFromRemote,
weak_factory_.GetWeakPtr()),
FROM_HERE);
messenger_->RegisterMessageReceiverCallback(
handle_, [cb = std::move(message_receiver_callback)](
std::unique_ptr<openscreen::cast::RpcMessage> message) {
cb.Run(std::move(message));
});
RegisterHandleForCallbacks(
openscreen::cast::RpcMessenger::kAcquireRendererHandle);
RegisterHandleForCallbacks(
openscreen::cast::RpcMessenger::kAcquireDemuxerHandle);

renderer_call_translator_->set_handle(AcquireHandle());
demuxer_stream_handler_ = std::make_unique<remoting::RpcDemuxerStreamHandler>(
this, messenger_,
this,
base::BindRepeating(&PlaybackCommandDispatcher::AcquireHandle,
base::Unretained(this)),
base::BindRepeating(
&PlaybackCommandDispatcher::SendRemotingRpcMessageToRemote,
base::Unretained(this)));
Expand Down Expand Up @@ -142,16 +131,14 @@ void PlaybackCommandDispatcher::ConfigureRemotingAsync(

void PlaybackCommandDispatcher::OnRemotingSessionEnded() {
demuxer_stream_handler_.reset();
if (messenger_) {
messenger_->UnregisterMessageReceiverCallback(handle_);
messenger_ = nullptr;
}
messenger_ = nullptr;
streaming_init_info_ = absl::nullopt;
}

void PlaybackCommandDispatcher::SendRemotingRpcMessageToRemote(
openscreen::cast::RpcMessenger::Handle handle,
std::unique_ptr<openscreen::cast::RpcMessage> message) {
DCHECK_NE(handle, openscreen::cast::RpcMessenger::kInvalidHandle);
DCHECK(message);
DCHECK(task_runner_->RunsTasksInCurrentSequence());

Expand All @@ -175,8 +162,9 @@ void PlaybackCommandDispatcher::ProcessRemotingRpcMessageFromRemote(
}

const bool did_dispatch_as_renderer_call =
call_translator_ &&
remoting::DispatchRendererRpcCall(message.get(), call_translator_.get());
renderer_call_translator_ &&
remoting::DispatchRendererRpcCall(message.get(),
renderer_call_translator_.get());
if (did_dispatch_as_renderer_call) {
return;
}
Expand All @@ -192,6 +180,29 @@ void PlaybackCommandDispatcher::ProcessRemotingRpcMessageFromRemote(
LOG(ERROR) << "Unhandled RPC Message for command " << message->proc();
}

openscreen::cast::RpcMessenger::Handle
PlaybackCommandDispatcher::AcquireHandle() {
DCHECK(messenger_);
auto handle = messenger_->GetUniqueHandle();
RegisterHandleForCallbacks(handle);
return handle;
}

void PlaybackCommandDispatcher::RegisterHandleForCallbacks(
openscreen::cast::RpcMessenger::Handle handle) {
DCHECK(messenger_);
messenger_->RegisterMessageReceiverCallback(
handle, [ptr = weak_factory_.GetWeakPtr()](
std::unique_ptr<openscreen::cast::RpcMessage> message) {
if (!ptr) {
DVLOG(1)
<< "Message receiver has been invalidated. Dropping message.";
return;
}
ptr->ProcessRemotingRpcMessageFromRemote(std::move(message));
});
}

void PlaybackCommandDispatcher::OnSetPlaybackControllerDone() {
has_set_playback_controller_call_returned_ = true;

Expand All @@ -201,7 +212,11 @@ void PlaybackCommandDispatcher::OnSetPlaybackControllerDone() {
}

void PlaybackCommandDispatcher::RpcAcquireRendererAsync(AcquireRendererCB cb) {
acquire_renderer_cb_ = base::BindOnce(std::move(cb), handle_);
DCHECK(renderer_call_translator_);
const auto handle = renderer_call_translator_->handle();

DCHECK_NE(handle, openscreen::cast::RpcMessenger::kInvalidHandle);
acquire_renderer_cb_ = base::BindOnce(std::move(cb), handle);

if (has_set_playback_controller_call_returned_) {
std::move(acquire_renderer_cb_).Run();
Expand All @@ -228,12 +243,7 @@ void PlaybackCommandDispatcher::OnNewAudioConfig(
}

streaming_init_info_->audio_stream_info->config = std::move(config);
if (!streaming_init_info_->video_stream_info ||
!streaming_init_info_->video_stream_info->config.Matches(
media::VideoDecoderConfig())) {
// |streaming_init_info_| is intentionally copied here.
streaming_dispatcher_->StartStreamingSession(streaming_init_info_.value());
}
MaybeStartStreamingSession();
}

void PlaybackCommandDispatcher::OnNewVideoConfig(
Expand All @@ -247,12 +257,34 @@ void PlaybackCommandDispatcher::OnNewVideoConfig(
}

streaming_init_info_->video_stream_info->config = std::move(config);
if (!streaming_init_info_->audio_stream_info ||
MaybeStartStreamingSession();
}

void PlaybackCommandDispatcher::MaybeStartStreamingSession() {
DCHECK(streaming_init_info_);
const bool is_audio_config_ready =
!streaming_init_info_->audio_stream_info ||
!streaming_init_info_->audio_stream_info->config.Matches(
media::AudioDecoderConfig())) {
// |streaming_init_info_| is intentionally copied here.
streaming_dispatcher_->StartStreamingSession(streaming_init_info_.value());
media::AudioDecoderConfig());
const bool is_video_config_ready =
!streaming_init_info_->video_stream_info ||
!streaming_init_info_->video_stream_info->config.Matches(
media::VideoDecoderConfig());
if (!is_audio_config_ready || !is_video_config_ready) {
return;
}

DCHECK(demuxer_stream_handler_);
if (streaming_init_info_->audio_stream_info) {
demuxer_stream_handler_->RequestMoreAudioBuffers();
}
if (streaming_init_info_->video_stream_info) {
demuxer_stream_handler_->RequestMoreVideoBuffers();
}

// |streaming_init_info_| is intentionally copied here.
DCHECK(streaming_dispatcher_);
streaming_dispatcher_->StartStreamingSession(streaming_init_info_.value());
}

} // namespace cast_streaming
15 changes: 13 additions & 2 deletions components/cast_streaming/browser/playback_command_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ class PlaybackCommandDispatcher
void ProcessRemotingRpcMessageFromRemote(
std::unique_ptr<openscreen::cast::RpcMessage> message);

// Acquires a new handle from |messenger_|.
openscreen::cast::RpcMessenger::Handle AcquireHandle();

// Registers a |handle| with |messenger_| to receive callbacks to
// ProcessRemotingRpcMessageFromRemote().
void RegisterHandleForCallbacks(
openscreen::cast::RpcMessenger::Handle handle);

// Starts streaming if each expected audio or video config has been received.
void MaybeStartStreamingSession();

// Callback for mojom::RendererController::SetPlaybackController() call.
void OnSetPlaybackControllerDone();

Expand All @@ -90,14 +101,14 @@ class PlaybackCommandDispatcher
base::OnceCallback<void()> acquire_renderer_cb_;

openscreen::cast::RpcMessenger* messenger_;
openscreen::cast::RpcMessenger::Handle handle_;

// Multiplexes Renderer commands from a number of senders.
std::unique_ptr<RendererControlMultiplexer> muxer_;

// Handles translating between Remoting commands (in proto form) and mojo
// commands.
std::unique_ptr<remoting::RendererRpcCallTranslator> call_translator_;
std::unique_ptr<remoting::RendererRpcCallTranslator>
renderer_call_translator_;

// Handles DemuxerStream interactions.
std::unique_ptr<remoting::RpcDemuxerStreamHandler> demuxer_stream_handler_;
Expand Down
54 changes: 29 additions & 25 deletions components/cast_streaming/browser/renderer_rpc_call_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,33 @@
namespace cast_streaming::remoting {

RendererRpcCallTranslator::RendererRpcCallTranslator(
RpcMessageProcessor processor,
mojo::Remote<media::mojom::Renderer> remote_renderer)
: renderer_client_receiver_(this),
: message_processor_(std::move(processor)),
renderer_client_receiver_(this),
renderer_remote_(std::move(remote_renderer)),
weak_factory_(this) {}

RendererRpcCallTranslator::~RendererRpcCallTranslator() = default;

void RendererRpcCallTranslator::SetMessageProcessor(
RpcMessageProcessor processor) {
message_processor_ = std::move(processor);
}

void RendererRpcCallTranslator::OnRpcInitialize() {
renderer_remote_->Initialize(
renderer_client_receiver_.BindNewEndpointAndPassRemote(),
/* streams */ {}, /* media_url_params */ nullptr,
base::BindOnce(&RendererRpcCallTranslator::OnInitializeCompleted,
weak_factory_.GetWeakPtr(), message_processor_));
if (!has_been_initialized_) {
has_been_initialized_ = true;
renderer_remote_->Initialize(
renderer_client_receiver_.BindNewEndpointAndPassRemote(),
/* streams */ {}, /* media_url_params */ nullptr,
base::BindOnce(&RendererRpcCallTranslator::OnInitializeCompleted,
weak_factory_.GetWeakPtr(), handle_));
} else {
OnInitializeCompleted(handle_, true);
}
}

void RendererRpcCallTranslator::OnRpcFlush(uint32_t audio_count,
uint32_t video_count) {
renderer_remote_->Flush(
base::BindOnce(&RendererRpcCallTranslator::OnFlushCompleted,
weak_factory_.GetWeakPtr(), message_processor_));
weak_factory_.GetWeakPtr(), handle_));
}

void RendererRpcCallTranslator::OnRpcStartPlayingFrom(base::TimeDelta time) {
Expand All @@ -52,58 +54,60 @@ void RendererRpcCallTranslator::OnRpcSetVolume(double volume) {
void RendererRpcCallTranslator::OnTimeUpdate(base::TimeDelta media_time,
base::TimeDelta max_time,
base::TimeTicks capture_time) {
message_processor_.Run(CreateMessageForMediaTimeUpdate(media_time));
message_processor_.Run(handle_, CreateMessageForMediaTimeUpdate(media_time));
}

void RendererRpcCallTranslator::OnBufferingStateChange(
media::BufferingState state,
media::BufferingStateChangeReason reason) {
message_processor_.Run(CreateMessageForBufferingStateChange(state));
message_processor_.Run(handle_, CreateMessageForBufferingStateChange(state));
}

void RendererRpcCallTranslator::OnError(const media::PipelineStatus& status) {
message_processor_.Run(CreateMessageForError());
message_processor_.Run(handle_, CreateMessageForError());
}

void RendererRpcCallTranslator::OnEnded() {
message_processor_.Run(CreateMessageForMediaEnded());
message_processor_.Run(handle_, CreateMessageForMediaEnded());
}

void RendererRpcCallTranslator::OnAudioConfigChange(
const media::AudioDecoderConfig& config) {
message_processor_.Run(CreateMessageForAudioConfigChange(config));
message_processor_.Run(handle_, CreateMessageForAudioConfigChange(config));
}

void RendererRpcCallTranslator::OnVideoConfigChange(
const media::VideoDecoderConfig& config) {
message_processor_.Run(CreateMessageForVideoConfigChange(config));
message_processor_.Run(handle_, CreateMessageForVideoConfigChange(config));
}

void RendererRpcCallTranslator::OnVideoNaturalSizeChange(
const gfx::Size& size) {
message_processor_.Run(CreateMessageForVideoNaturalSizeChange(size));
message_processor_.Run(handle_, CreateMessageForVideoNaturalSizeChange(size));
}

void RendererRpcCallTranslator::OnVideoOpacityChange(bool opaque) {
message_processor_.Run(CreateMessageForVideoOpacityChange(opaque));
message_processor_.Run(handle_, CreateMessageForVideoOpacityChange(opaque));
}

void RendererRpcCallTranslator::OnStatisticsUpdate(
const media::PipelineStatistics& stats) {
message_processor_.Run(CreateMessageForStatisticsUpdate(stats));
message_processor_.Run(handle_, CreateMessageForStatisticsUpdate(stats));
}

void RendererRpcCallTranslator::OnWaiting(media::WaitingReason reason) {}

void RendererRpcCallTranslator::OnInitializeCompleted(
RpcMessageProcessor processor,
openscreen::cast::RpcMessenger::Handle handle_at_time_of_sending,
bool success) {
message_processor_.Run(CreateMessageForInitializationComplete(success));
message_processor_.Run(handle_at_time_of_sending,
CreateMessageForInitializationComplete(success));
}

void RendererRpcCallTranslator::OnFlushCompleted(
RpcMessageProcessor processor) {
message_processor_.Run(CreateMessageForFlushComplete());
openscreen::cast::RpcMessenger::Handle handle_at_time_of_sending) {
message_processor_.Run(handle_at_time_of_sending,
CreateMessageForFlushComplete());
}

} // namespace cast_streaming::remoting

0 comments on commit 558a50e

Please sign in to comment.