Skip to content

Commit

Permalink
再次简化消息池,除了网络部分序列化后自动回收消息以及反序列化从池中取消息,其它任何地方用不用池,回不回收都让用户决定
Browse files Browse the repository at this point in the history
还是参考C2G_Ping
  • Loading branch information
egametang committed Jan 12, 2024
1 parent 6a1c863 commit efcdb60
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 88 deletions.
6 changes: 3 additions & 3 deletions Unity/Assets/Resources/GlobalConfig.asset
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ MonoBehaviour:
m_Script: {fileID: 11500000, guid: 36527db572638af47b03c805671cba75, type: 3}
m_Name: GlobalConfig
m_EditorClassIdentifier:
CodeMode: 1
CodeMode: 3
BuildType: 1
AppType: 8
EPlayMode: 1
AppType: 7
EPlayMode: 0
6 changes: 3 additions & 3 deletions Unity/Assets/Scripts/Core/Fiber/Module/Actor/MessageHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ namespace ET
{
public static class MessageHelper
{
public static IResponse CreateResponse(IRequest iRequest, int error)
public static IResponse CreateResponse(Type requestType, int rpcId, int error)
{
Type responseType = OpcodeType.Instance.GetResponseType(iRequest.GetType());
Type responseType = OpcodeType.Instance.GetResponseType(requestType);
IResponse response = (IResponse)ObjectPool.Instance.Fetch(responseType);
response.Error = error;
response.RpcId = iRequest.RpcId;
response.RpcId = rpcId;
return response;
}
}
Expand Down
32 changes: 12 additions & 20 deletions Unity/Assets/Scripts/Core/Fiber/Module/Actor/MessageSenderStruct.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,35 @@ public readonly struct MessageSenderStruct
{
public ActorId ActorId { get; }

public IRequest Request { get; }

private readonly bool IsFromPool;

public bool NeedException { get; }

public Type RequestType { get; }

private readonly ETTask<IResponse> tcs;

public MessageSenderStruct(ActorId actorId, IRequest iRequest, ETTask<IResponse> tcs, bool needException)
public bool NeedException { get; }

public MessageSenderStruct(ActorId actorId, Type requestType, bool needException)
{
this.ActorId = actorId;

this.Request = iRequest;
MessageObject messageObject = (MessageObject)this.Request;
this.IsFromPool = messageObject.IsFromPool;
messageObject.IsFromPool = false;
this.RequestType = requestType;

this.tcs = tcs;
this.tcs = ETTask<IResponse>.Create(true);
this.NeedException = needException;
}

public void SetResult(IResponse response)
{
MessageObject messageObject = (MessageObject)this.Request;
messageObject.IsFromPool = this.IsFromPool;
messageObject.Dispose();

this.tcs.SetResult(response);
}

public void SetException(Exception exception)
{
MessageObject messageObject = (MessageObject)this.Request;
messageObject.IsFromPool = this.IsFromPool;
messageObject.Dispose();

this.tcs.SetException(exception);
}

public async ETTask<IResponse> Wait()
{
return await this.tcs;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ private static void HandleMessage(this ProcessInnerSender self, Fiber fiber, in
Log.Warning($"actor not found mailbox, from: {actorId} current: {fiber.Address} {message}");
if (message is IRequest request)
{
IResponse resp = MessageHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
IResponse resp = MessageHelper.CreateResponse(request.GetType(), request.RpcId, ErrorCore.ERR_NotFoundActor);
self.Reply(actorId.Address, resp);
}
message.Dispose();
return;
}
mailBoxComponent.Add(actorId.Address, message);
Expand All @@ -74,13 +73,13 @@ private static void Run(MessageSenderStruct self, IResponse response)
{
if (response.Error == ErrorCore.ERR_MessageTimeout)
{
self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.Request}, response: {response}"));
self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.RequestType.FullName}, response: {response}"));
return;
}

if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
{
self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.Request}, response: {response}"));
self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.RequestType.FullName}, response: {response}"));
return;
}

Expand Down Expand Up @@ -156,10 +155,10 @@ public static async ETTask<IResponse> Call(
{
throw new Exception($"actor inner process diff: {actorId.Process} {fiber.Process}");
}

var tcs = ETTask<IResponse>.Create(true);

self.requestCallback.Add(rpcId, new MessageSenderStruct(actorId, iRequest, tcs, needException));
Type requestType = iRequest.GetType();
MessageSenderStruct messageSenderStruct = new(actorId, requestType, needException);
self.requestCallback.Add(rpcId, messageSenderStruct);

self.SendInner(actorId, (MessageObject)iRequest);

Expand All @@ -175,11 +174,11 @@ async ETTask Timeout()

if (needException)
{
action.SetException(new Exception($"actor sender timeout: {iRequest}"));
action.SetException(new Exception($"actor sender timeout: {requestType.FullName}"));
}
else
{
IResponse response = MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_Timeout);
IResponse response = MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_Timeout);
action.SetResult(response);
}
}
Expand All @@ -188,14 +187,14 @@ async ETTask Timeout()

long beginTime = TimeInfo.Instance.ServerFrameTime();

IResponse response = await tcs;
IResponse response = await messageSenderStruct.Wait();

long endTime = TimeInfo.Instance.ServerFrameTime();

long costTime = endTime - beginTime;
if (costTime > 200)
{
Log.Warning($"actor rpc time > 200: {costTime} {iRequest}");
Log.Warning($"actor rpc time > 200: {costTime} {requestType.FullName}");
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ protected override async ETTask Run(Scene root, Room2C_CheckHashFail message)
Log.Debug($"check hash fail, client: {message.Frame} {clientWorld.ToJson()}");
}

message.Dispose();
await ETTask.CompletedTask;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public abstract class MessageLocationHandler<E, Message>: IMHandler where E : En

public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
{
using MessageObject _ = actorMessage;
Fiber fiber = entity.Fiber();
if (actorMessage is not Message message)
{
Expand Down Expand Up @@ -52,7 +51,6 @@ public async ETTask Handle(Entity entity, Address fromAddress, MessageObject act
{
try
{
using MessageObject _ = actorMessage;
Fiber fiber = entity.Fiber();
if (actorMessage is not Request request)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneTy
messageLocationSender.LastSendOrRecvTime = TimeInfo.Instance.ServerNow();

Scene root = self.Root();


Type requestType = iRequest.GetType();
while (true)
{
if (messageLocationSender.ActorId == default)
Expand All @@ -233,13 +234,13 @@ private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneTy

if (messageLocationSender.ActorId == default)
{
return MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_NotFoundActor);
return MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_NotFoundActor);
}
IResponse response = await root.GetComponent<MessageSender>().Call(messageLocationSender.ActorId, rpcId, iRequest, needException: false);

if (messageLocationSender.InstanceId != instanceId)
{
throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout3, $"{iRequest}");
throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout3, $"{requestType.FullName}");
}

switch (response.Error)
Expand All @@ -250,7 +251,7 @@ private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneTy
++failTimes;
if (failTimes > 20)
{
Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {iRequest}");
Log.Debug($"actor send message fail, actorid: {messageLocationSender.Id} {requestType.FullName}");

// 这里删除actor,后面等待发送的消息会判断InstanceId,InstanceId不一致返回ERR_NotFoundActor
self.Remove(messageLocationSender.Id);
Expand All @@ -261,21 +262,21 @@ private static async ETTask<IResponse> CallInner(this MessageLocationSenderOneTy
await root.GetComponent<TimerComponent>().WaitAsync(500);
if (messageLocationSender.InstanceId != instanceId)
{
throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout4, $"{iRequest}");
throw new RpcException(ErrorCore.ERR_ActorLocationSenderTimeout4, $"{requestType.FullName}");
}

messageLocationSender.ActorId = default;
continue;
}
case ErrorCore.ERR_MessageTimeout:
{
throw new RpcException(response.Error, $"{iRequest}");
throw new RpcException(response.Error, $"{requestType.FullName}");
}
}

if (ErrorCore.IsRpcNeedThrowException(response.Error))
{
throw new RpcException(response.Error, $"Message: {response.Message} Request: {iRequest}");
throw new RpcException(response.Error, $"Message: {response.Message} Request: {requestType.FullName}");
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ private static void Run(MessageSenderStruct self, IResponse response)
{
if (response.Error == ErrorCore.ERR_MessageTimeout)
{
self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.Request}, response: {response}"));
self.SetException(new RpcException(response.Error, $"Rpc error: request, 注意Actor消息超时,请注意查看是否死锁或者没有reply: actorId: {self.ActorId} {self.RequestType.FullName}, response: {response}"));
return;
}

if (self.NeedException && ErrorCore.IsRpcNeedThrowException(response.Error))
{
self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.Request}, response: {response}"));
self.SetException(new RpcException(response.Error, $"Rpc error: actorId: {self.ActorId} request: {self.RequestType.FullName}, response: {response}"));
return;
}

Expand Down Expand Up @@ -207,9 +207,9 @@ public static async ETTask<IResponse> Call(this ProcessOuterSender self, ActorId

iRequest.RpcId = rpcId;

var tcs = ETTask<IResponse>.Create(true);

self.requestCallback.Add(rpcId, new MessageSenderStruct(actorId, iRequest, tcs, needException));
Type requestType = iRequest.GetType();
MessageSenderStruct messageSenderStruct = new(actorId, requestType, needException);
self.requestCallback.Add(rpcId, messageSenderStruct);

self.SendInner(actorId, iRequest as MessageObject);

Expand All @@ -223,11 +223,11 @@ async ETTask Timeout()

if (needException)
{
action.SetException(new Exception($"actor sender timeout: {iRequest}"));
action.SetException(new Exception($"actor sender timeout: {requestType.FullName}"));
}
else
{
IResponse response = ET.MessageHelper.CreateResponse(iRequest, ErrorCore.ERR_Timeout);
IResponse response = MessageHelper.CreateResponse(requestType, rpcId, ErrorCore.ERR_Timeout);
action.SetResult(response);
}
}
Expand All @@ -236,14 +236,14 @@ async ETTask Timeout()

long beginTime = TimeInfo.Instance.ServerFrameTime();

IResponse response = await tcs;
IResponse response = await messageSenderStruct.Wait();

long endTime = TimeInfo.Instance.ServerFrameTime();

long costTime = endTime - beginTime;
if (costTime > 200)
{
Log.Warning($"actor rpc time > 200: {costTime} {iRequest}");
Log.Warning($"actor rpc time > 200: {costTime} {requestType.FullName}");
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ private static async ETTask HandleInner(MailBoxInvoker args)
Fiber fiber = mailBoxComponent.Fiber();
if (fiber.IsDisposed)
{
messageObject.Dispose();
return;
}

Expand All @@ -28,10 +27,9 @@ private static async ETTask HandleInner(MailBoxInvoker args)
{
if (messageObject is IRequest request)
{
IResponse resp = MessageHelper.CreateResponse(request, ErrorCore.ERR_NotFoundActor);
IResponse resp = MessageHelper.CreateResponse(request.GetType(), request.RpcId, ErrorCore.ERR_NotFoundActor);
mailBoxComponent.Root().GetComponent<ProcessInnerSender>().Reply(args.FromAddress, resp);
}
messageObject.Dispose();
return;
}
await MessageDispatcher.Instance.Handle(mailBoxComponent.Parent, args.FromAddress, messageObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public abstract class MessageHandler<E, Message>: IMHandler where E : Entity whe

public async ETTask Handle(Entity entity, Address fromAddress, MessageObject actorMessage)
{
using MessageObject _ = actorMessage;
if (actorMessage is not Message msg)
{
Log.Error($"消息类型转换错误: {actorMessage.GetType().FullName} to {typeof (Message).Name}");
Expand Down Expand Up @@ -52,7 +51,6 @@ public async ETTask Handle(Entity entity, Address fromAddress, MessageObject act
{
try
{
using MessageObject _ = actorMessage;
Fiber fiber = entity.Fiber();
if (actorMessage is not Request request)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ public void Handle(Session session, object msg)

private async ETTask HandleAsync(Session session, object message)
{
using Message msg = message as Message;
if (message == null)
{
Log.Error($"消息类型转换错误: {msg.GetType().FullName} to {typeof (Message).Name}");
Log.Error($"消息类型转换错误: {message.GetType().FullName} to {typeof (Message).Name}");
return;
}

if (session.IsDisposed)
{
Log.Error($"session disconnect {msg}");
Log.Error($"session disconnect {message}");
return;
}

await this.Run(session, msg);
await this.Run(session, (Message)message);
}

public Type GetMessageType()
Expand Down Expand Up @@ -54,7 +53,7 @@ private async ETTask HandleAsync(Session session, object message)
{
try
{
using Request request = message as Request;
Request request = message as Request;
if (request == null)
{
throw new Exception($"消息类型转换错误: {message.GetType().FullName} to {typeof (Request).FullName}");
Expand All @@ -63,6 +62,7 @@ private async ETTask HandleAsync(Session session, object message)
int rpcId = request.RpcId;
long instanceId = session.InstanceId;

// 这里用using很安全,因为后面是session发送出去了
using Response response = ObjectPool.Instance.Fetch<Response>();
try
{
Expand Down
Loading

0 comments on commit efcdb60

Please sign in to comment.