Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StreamingHub #682

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 59 additions & 89 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,113 +158,83 @@ async Task HandleMessageAsync()
// Be careful to allocation and performance.
while (await reader.MoveNext(ct)) // must keep SyncContext.
{
(int methodId, int messageId, int offset) FetchHeader(byte[] msgData)
{
var messagePackReader = new MessagePackReader(msgData);

var length = messagePackReader.ReadArrayHeader();
if (length == 2)
{
// void: [methodId, [argument]]
var mid = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;

return (mid, -1, consumed);
}
else if (length == 3)
{
// T: [messageId, methodId, [argument]]
var msgId = messagePackReader.ReadInt32();
var metId = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;
return (metId, msgId, consumed);
}
else
{
throw new InvalidOperationException("Invalid data format.");
}
}

var data = reader.Current;
var (methodId, messageId, offset) = FetchHeader(data);
var hasResponse = messageId != -1;

if (messageId == -1)
if (handlers.TryGetValue(methodId, out var handler))
{
if (handlers.TryGetValue(methodId, out var handler))
// Create a context for each call to the hub method.
var context = new StreamingHubContext()
{
var context = new StreamingHubContext() // create per invoke.
{
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = -1,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
await handler.MethodBody.Invoke(context);
}
catch (Exception ex)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);
}
finally
{
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = messageId,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
await handler.MethodBody.Invoke(context);
}
}
else
{
if (handlers.TryGetValue(methodId, out var handler))
catch (ReturnStatusException ex)
{
var context = new StreamingHubContext() // create per invoke.
{
HubInstance = this,
ServiceContext = (IStreamingServiceContext<byte[], byte[]>)Context,
Request = data.AsMemory(offset, data.Length - offset),
Path = handler.ToString(),
MethodId = handler.MethodId,
MessageId = messageId,
Timestamp = DateTime.UtcNow
};

var isErrorOrInterrupted = false;
Context.MethodHandler.Logger.BeginInvokeHubMethod(context, context.Request, handler.RequestType);
try
{
await handler.MethodBody.Invoke(context);
}
catch (ReturnStatusException ex)
if (hasResponse)
{
await context.WriteErrorMessage((int)ex.StatusCode, ex.Detail, null, false);
}
catch (Exception ex)
}
catch (Exception ex)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);

if (hasResponse)
{
isErrorOrInterrupted = true;
Context.MethodHandler.Logger.Error(ex, context);
await context.WriteErrorMessage((int)StatusCode.Internal, $"An error occurred while processing handler '{handler.ToString()}'.", ex, Context.MethodHandler.IsReturnExceptionStackTraceInErrorDetail);
}
finally
{
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
finally
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
Context.MethodHandler.Logger.EndInvokeHubMethod(context, context.responseSize, context.responseType, (DateTime.UtcNow - context.Timestamp).TotalMilliseconds, isErrorOrInterrupted);
}
}
else
{
throw new InvalidOperationException("Handler not found in received methodId, methodId:" + methodId);
}
}
}

static (int methodId, int messageId, int offset) FetchHeader(byte[] msgData)
{
var messagePackReader = new MessagePackReader(msgData);

var length = messagePackReader.ReadArrayHeader();
if (length == 2)
{
// void: [methodId, [argument]]
var mid = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;

return (mid, -1, consumed);
}
else if (length == 3)
{
// T: [messageId, methodId, [argument]]
var msgId = messagePackReader.ReadInt32();
var metId = messagePackReader.ReadInt32();
var consumed = (int)messagePackReader.Consumed;
return (metId, msgId, consumed);
}
else
{
throw new InvalidOperationException("Invalid data format.");
}
}

Expand Down
Loading