Skip to content

Commit

Permalink
feat: adding support for Reply Rpc on server side
Browse files Browse the repository at this point in the history
Server can now send Reply Rpc to client and wait for them to reply. This is only allowed when an RPC targets 1 player

Also removing ServerRpcMessage, RpcMessage can be used for both server and client rpcs, having duplicate message for each doesn't seem required

BREAKING CHANGE: Rpc messages renamed
  • Loading branch information
James-Frowen committed Jun 17, 2023
1 parent 5067a5c commit dc56f27
Show file tree
Hide file tree
Showing 30 changed files with 868 additions and 504 deletions.
71 changes: 4 additions & 67 deletions Assets/Mirage/Runtime/ClientObjectManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@

namespace Mirage
{

[AddComponentMenu("Network/ClientObjectManager")]
[DisallowMultipleComponent]
public class ClientObjectManager : MonoBehaviour
{
private static readonly ILogger logger = LogFactory.GetLogger(typeof(ClientObjectManager));

internal RpcHandler _rpcHandler;

private NetworkClient _client;
public NetworkClient Client => _client;

Expand Down Expand Up @@ -91,8 +94,6 @@ internal void RegisterHostHandlers()
_client.MessageHandler.RegisterHandler<SpawnMessage>(OnHostClientSpawn);
_client.MessageHandler.RegisterHandler<RemoveAuthorityMessage>(OnRemoveAuthority);
_client.MessageHandler.RegisterHandler<RemoveCharacterMessage>(OnRemoveCharacter);
_client.MessageHandler.RegisterHandler<ServerRpcReply>(msg => { });
_client.MessageHandler.RegisterHandler<RpcMessage>(msg => { });
}

internal void RegisterMessageHandlers()
Expand All @@ -102,8 +103,7 @@ internal void RegisterMessageHandlers()
_client.MessageHandler.RegisterHandler<SpawnMessage>(OnSpawn);
_client.MessageHandler.RegisterHandler<RemoveAuthorityMessage>(OnRemoveAuthority);
_client.MessageHandler.RegisterHandler<RemoveCharacterMessage>(OnRemoveCharacter);
_client.MessageHandler.RegisterHandler<ServerRpcReply>(OnServerRpcReply);
_client.MessageHandler.RegisterHandler<RpcMessage>(OnRpcMessage);
_rpcHandler = new RpcHandler(_client.MessageHandler, _client.World, RpcInvokeType.ClientRpc);
}

private bool ConsiderForSpawning(NetworkIdentity identity)
Expand Down Expand Up @@ -701,29 +701,6 @@ internal void OnHostClientSpawn(SpawnMessage msg)
}
}

internal void OnRpcMessage(RpcMessage msg)
{
if (logger.LogEnabled()) logger.Log($"ClientScene.OnRPCMessage index:{msg.FunctionIndex} netId:{msg.NetId}");

if (!_client.World.TryGetIdentity(msg.NetId, out var identity))
{
if (logger.WarnEnabled()) logger.LogWarning($"Spawned object not found when handling ClientRpc message [netId={msg.NetId}]");
return;
}

var remoteCall = identity.RemoteCallCollection.GetAbsolute(msg.FunctionIndex);

if (remoteCall.InvokeType != RpcInvokeType.ClientRpc)
{
throw new MethodInvocationException($"Invalid RPC call with index {msg.FunctionIndex}");
}

using (var reader = NetworkReaderPool.GetReader(msg.Payload, _client.World))
{
remoteCall.Invoke(reader, null, 0);
}
}

private void CheckForLocalPlayer(NetworkIdentity identity)
{
if (identity && identity == _client.Player?.Identity)
Expand All @@ -735,46 +712,6 @@ private void CheckForLocalPlayer(NetworkIdentity identity)
}
}

private void OnServerRpcReply(INetworkPlayer player, ServerRpcReply reply)
{
// find the callback that was waiting for this and invoke it.
if (_callbacks.TryGetValue(reply.ReplyId, out var action))
{
_callbacks.Remove(_replyId);
using (var reader = NetworkReaderPool.GetReader(reply.Payload, _client.World))
{
action.Invoke(reader);
}
}
else
{
throw new MethodAccessException("Received reply but no handler was registered");
}
}

private readonly Dictionary<int, Action<NetworkReader>> _callbacks = new Dictionary<int, Action<NetworkReader>>();
private int _replyId;

/// <summary>
/// Creates a task that waits for a reply from the server
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns>the task that will be completed when the result is in, and the id to use in the request</returns>
internal (UniTask<T> task, int replyId) CreateReplyTask<T>()
{
var newReplyId = _replyId++;
var completionSource = AutoResetUniTaskCompletionSource<T>.Create();
void Callback(NetworkReader reader)
{
var result = reader.Read<T>();
completionSource.TrySetResult(result);
}

_callbacks.Add(newReplyId, Callback);
return (completionSource.Task, newReplyId);
}




#if UNITY_EDITOR
Expand Down
48 changes: 0 additions & 48 deletions Assets/Mirage/Runtime/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

namespace Mirage
{
#region Public System Messages

/// <summary>
/// Sent to client to mark their scene as not ready
/// <para>Client can sent <see cref="SceneReadyMessage"/> once its scene is ready again</para>
Expand All @@ -32,50 +30,7 @@ public struct SceneMessage
[NetworkMessage]
public struct SceneReadyMessage { }

#endregion

#region System Messages required for code gen path
[NetworkMessage]
public struct ServerRpcMessage
{
public uint NetId;
public int FunctionIndex;

// the parameters for the Cmd function
// -> ArraySegment to avoid unnecessary allocations
public ArraySegment<byte> Payload;
}

[NetworkMessage]
public struct ServerRpcWithReplyMessage
{
public uint NetId;
public int FunctionIndex;

// if the server Rpc can return values
// this then a ServerRpcReply will be sent with this id
public int ReplyId;

public ArraySegment<byte> Payload;
}

[NetworkMessage]
public struct ServerRpcReply
{
public int ReplyId;
public ArraySegment<byte> Payload;
}

[NetworkMessage]
public struct RpcMessage
{
public uint NetId;
public int FunctionIndex;
public ArraySegment<byte> Payload;
}
#endregion

#region Internal System Messages
[NetworkMessage]
public struct SpawnMessage
{
Expand Down Expand Up @@ -130,9 +85,7 @@ public override string ToString()
else
authStr = "Remote";


return $"SpawnMessage[NetId:{NetId},{spawnIDStr},Authority:{authStr},{SpawnValues},Payload:{Payload.Count}bytes]";

}
}

Expand Down Expand Up @@ -232,5 +185,4 @@ public struct NetworkPongMessage
public double ClientTime;
public double ServerTime;
}
#endregion
}
42 changes: 35 additions & 7 deletions Assets/Mirage/Runtime/RemoteCalls/ClientRpcSender.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.ComponentModel;
using System.Runtime.CompilerServices;
using Cysharp.Threading.Tasks;
using Mirage.Logging;
using Mirage.Serialization;
using UnityEngine;
Expand All @@ -14,6 +15,8 @@ public static class ClientRpcSender
public static void Send(NetworkBehaviour behaviour, int relativeIndex, NetworkWriter writer, Channel channelId, bool excludeOwner)
{
var index = behaviour.Identity.RemoteCallCollection.GetIndexOffset(behaviour) + relativeIndex;
Validate(behaviour, index);

var message = CreateMessage(behaviour, index, writer);

// The public facing parameter is excludeOwner in [ClientRpc]
Expand All @@ -25,28 +28,53 @@ public static void Send(NetworkBehaviour behaviour, int relativeIndex, NetworkWr
public static void SendTarget(NetworkBehaviour behaviour, int relativeIndex, NetworkWriter writer, Channel channelId, INetworkPlayer player)
{
var index = behaviour.Identity.RemoteCallCollection.GetIndexOffset(behaviour) + relativeIndex;
Validate(behaviour, index);

var message = CreateMessage(behaviour, index, writer);

player = GetTarget(behaviour, player);

player.Send(message, channelId);
}

public static UniTask<T> SendTargetWithReturn<T>(NetworkBehaviour behaviour, int relativeIndex, NetworkWriter writer, INetworkPlayer player)
{
var index = behaviour.Identity.RemoteCallCollection.GetIndexOffset(behaviour) + relativeIndex;
Validate(behaviour, index);

(var task, var id) = behaviour.ServerObjectManager._rpcHandler.CreateReplyTask<T>();
var message = new RpcWithReplyMessage
{
NetId = behaviour.NetId,
FunctionIndex = index,
ReplyId = id,
Payload = writer.ToArraySegment()
};

player = GetTarget(behaviour, player);

// reply rpcs are always reliable
player.Send(message, Channel.Reliable);

return task;
}

private static INetworkPlayer GetTarget(NetworkBehaviour behaviour, INetworkPlayer player)
{
// player parameter is optional. use owner if null
if (player == null)
{
player = behaviour.Owner;
}

// if still null throw to give useful error
if (player == null)
{
throw new InvalidOperationException("Player target was null for Rpc");
}

player.Send(message, channelId);
return player;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static RpcMessage CreateMessage(NetworkBehaviour behaviour, int index, NetworkWriter writer)
{
Validate(behaviour, index);

var message = new RpcMessage
{
NetId = behaviour.NetId,
Expand Down
6 changes: 3 additions & 3 deletions Assets/Mirage/Runtime/RemoteCalls/RemoteCallHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void Register(int index, string name, bool cmdRequireAuthority, RpcInvoke
}
}

public void RegisterRequest<T>(int index, string name, bool cmdRequireAuthority, NetworkBehaviour behaviour, RequestDelegate<T> func)
public void RegisterRequest<T>(int index, string name, bool cmdRequireAuthority, RpcInvokeType invokerType, NetworkBehaviour behaviour, RequestDelegate<T> func)
{
async UniTaskVoid Wrapper(NetworkBehaviour obj, NetworkReader reader, INetworkPlayer senderPlayer, int replyId)
{
Expand All @@ -66,7 +66,7 @@ async UniTaskVoid Wrapper(NetworkBehaviour obj, NetworkReader reader, INetworkPl
using (var writer = NetworkWriterPool.GetWriter())
{
writer.Write(result);
var serverRpcReply = new ServerRpcReply
var serverRpcReply = new RpcReply
{
ReplyId = replyId,
Payload = writer.ToArraySegment()
Expand All @@ -81,7 +81,7 @@ void CmdWrapper(NetworkBehaviour obj, NetworkReader reader, INetworkPlayer sende
Wrapper(obj, reader, senderPlayer, replyId).Forget();
}

Register(index, name, cmdRequireAuthority, RpcInvokeType.ServerRpc, behaviour, CmdWrapper);
Register(index, name, cmdRequireAuthority, invokerType, behaviour, CmdWrapper);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down

0 comments on commit dc56f27

Please sign in to comment.