Skip to content

Commit

Permalink
Adding implementation of ServerCallContext and pass to services
Browse files Browse the repository at this point in the history
- Add extension method to retrieve HttpContext from ServerCallContext
  • Loading branch information
= committed Feb 6, 2019
1 parent 3f1cf93 commit 5de105c
Show file tree
Hide file tree
Showing 22 changed files with 585 additions and 44 deletions.
5 changes: 4 additions & 1 deletion build/sources.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
<PropertyGroup Label="RestoreSources">
<RestoreSources>
$(RestoreSources);
$(MSBuildThisFileDirectory)feed;
https://api.nuget.org/v3/index.json;
</RestoreSources>
<RestoreSources Condition="Exists('$(MSBuildThisFileDirectory)feed')">
$(RestoreSources);
$(MSBuildThisFileDirectory)feed;
</RestoreSources>
</PropertyGroup>
</Project>
5 changes: 4 additions & 1 deletion examples/Server/Chatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

using System.Collections.Generic;
using System.Threading.Tasks;
using Grpc.Core;
using Chat;
using Grpc.Core;
using Microsoft.Extensions.Logging;

namespace GRPCServer
Expand All @@ -37,6 +37,9 @@ public ChatterService(ILoggerFactory loggerFactory)

public override async Task Chat(IAsyncStreamReader<ChatMessage> requestStream, IServerStreamWriter<ChatMessage> responseStream, ServerCallContext context)
{
var httpContext = context.GetHttpContext();
_logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

if (!await requestStream.MoveNext())
{
// No messages so don't register and just exit.
Expand Down
6 changes: 6 additions & 0 deletions examples/Server/Counter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ public CounterService(IncrementingCounter counter, ILoggerFactory loggerFactory)

public override Task<CounterReply> IncrementCount(Empty request, ServerCallContext context)
{
var httpContext = context.GetHttpContext();
_logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

_logger.LogInformation("Incrementing count by 1");
_counter.Increment(1);
return Task.FromResult(new CounterReply { Count = _counter.Count });
}

public override async Task<CounterReply> AccumulateCount(IAsyncStreamReader<CounterRequest> requestStream, ServerCallContext context)
{
var httpContext = context.GetHttpContext();
_logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

while (await requestStream.MoveNext(CancellationToken.None))
{
_logger.LogInformation($"Incrementing count by {requestStream.Current.Count}");
Expand Down
6 changes: 6 additions & 0 deletions examples/Server/Greeter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ public GreeterService(ILoggerFactory loggerFactory)
//Server side handler of the SayHello RPC
public override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
var httpContext = context.GetHttpContext();
_logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

_logger.LogInformation($"Sending hello to {request.Name}");
return Task.FromResult(new HelloReply { Message = "Hello " + request.Name });
}

public override async Task SayHellos(HelloRequest request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
{
var httpContext = context.GetHttpContext();
_logger.LogInformation($"Connection id: {httpContext.Connection.Id}");

for (int i = 0; i < 3; i++)
{
var message = $"How are you {request.Name}? {i}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,21 @@ public override async Task HandleCallAsync(HttpContext httpContext)
// Activate the implementation type via DI.
var activator = httpContext.RequestServices.GetRequiredService<IGrpcServiceActivator<TService>>();
var service = activator.Create();
var serverCallContext = new HttpContextServerCallContext(httpContext);

var response = await _invoker(
service,
new HttpContextStreamReader<TRequest>(httpContext, Method.RequestMarshaller.Deserializer),
null);
serverCallContext);

// TODO(JunTaoLuo, JamesNK): make sure the response is not null
var responsePayload = Method.ResponseMarshaller.Serializer(response);
var responseBodyPipe = httpContext.Response.BodyPipe;
await responseBodyPipe.WriteMessageAsync(response, Method.ResponseMarshaller.Serializer, serverCallContext.WriteOptions);

await httpContext.Response.BodyPipe.WriteMessageAsync(responsePayload, flush: true);
httpContext.Response.ConsolidateTrailers(serverCallContext);

httpContext.Response.AppendTrailer(GrpcProtocolConstants.StatusTrailer, GrpcProtocolConstants.StatusOk);
// Flush any buffered content
await httpContext.Response.BodyPipe.FlushAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,19 @@ public override async Task HandleCallAsync(HttpContext httpContext)
// Activate the implementation type via DI.
var activator = httpContext.RequestServices.GetRequiredService<IGrpcServiceActivator<TService>>();
var service = activator.Create();
var serverCallContext = new HttpContextServerCallContext(httpContext);
var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext, Method.ResponseMarshaller.Serializer);

await _invoker(service,
new HttpContextStreamReader<TRequest>(httpContext, Method.RequestMarshaller.Deserializer),
new HttpContextStreamWriter<TResponse>(httpContext, Method.ResponseMarshaller.Serializer),
null);
await _invoker(
service,
new HttpContextStreamReader<TRequest>(httpContext, Method.RequestMarshaller.Deserializer),
streamWriter,
serverCallContext);

httpContext.Response.AppendTrailer(GrpcProtocolConstants.StatusTrailer, GrpcProtocolConstants.StatusOk);
httpContext.Response.ConsolidateTrailers(serverCallContext);

// Flush any buffered content
await httpContext.Response.BodyPipe.FlushAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace Grpc.AspNetCore.Server.Internal
internal static class GrpcProtocolConstants
{
internal const string StatusTrailer = "grpc-status";
// This represents the numeric value of Grpc.Core.StatusCode.OK
internal const string StatusOk = "0";
internal const string MessageTrailer = "grpc-message";
}
}
134 changes: 134 additions & 0 deletions src/Grpc.AspNetCore.Server/Internal/HttpContextServerCallContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.AspNetCore.Http;

namespace Grpc.AspNetCore.Server.Internal
{
internal class HttpContextServerCallContext : ServerCallContext
{
private string _peer;
private Metadata _requestHeaders;
private Metadata _responseTrailers;

internal HttpContextServerCallContext(HttpContext httpContext)
{
HttpContext = httpContext;
}

internal HttpContext HttpContext { get; }

internal bool HasResponseTrailers => _responseTrailers != null;

protected override string MethodCore => HttpContext.Request.Path.Value;

protected override string HostCore => HttpContext.Request.Host.Value;

protected override string PeerCore
{
get
{
if (_peer == null)
{
var connection = HttpContext.Connection;
if (connection.RemoteIpAddress != null)
{
_peer = (connection.RemoteIpAddress.AddressFamily == AddressFamily.InterNetwork ? "ipv4:" : "ipv6:") + connection.RemoteIpAddress + ":" + connection.RemotePort;
}
}

return _peer;
}
}

// TODO(JunTaoLuo, JamesNK): implement this
protected override DateTime DeadlineCore => throw new NotImplementedException();

protected override Metadata RequestHeadersCore
{
get
{
if (_requestHeaders == null)
{
_requestHeaders = new Metadata();

foreach (var header in HttpContext.Request.Headers)
{
if (!header.Key.StartsWith(':'))
{
_requestHeaders.Add(header.Key, header.Value);
}
}
}

return _requestHeaders;
}
}

// TODO(JunTaoLuo, JamesNK): implement this
protected override CancellationToken CancellationTokenCore => throw new NotImplementedException();

protected override Metadata ResponseTrailersCore
{
get
{
if (_responseTrailers == null)
{
_responseTrailers = new Metadata();
}

return _responseTrailers;
}
}

protected override Status StatusCore { get; set; }

protected override WriteOptions WriteOptionsCore { get; set; }

// TODO(JunTaoLuo, JamesNK): implement this
protected override AuthContext AuthContextCore => throw new NotImplementedException();

// TODO(JunTaoLuo, JamesNK): implement this
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
{
throw new NotImplementedException();
}

protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
if (responseHeaders != null)
{
foreach (var entry in responseHeaders)
{
if (!entry.IsBinary)
{
// TODO(juntaoluo): what about binary headers?
HttpContext.Response.Headers[entry.Key] = entry.Value;
}
}
}

return HttpContext.Response.Body.FlushAsync();
}
}
}
22 changes: 8 additions & 14 deletions src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,26 @@
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.AspNetCore.Http;

namespace Grpc.AspNetCore.Server.Internal
{
internal class HttpContextStreamWriter<TResponse> : IServerStreamWriter<TResponse>
{
private readonly HttpContext _httpContext;
private readonly HttpContextServerCallContext _context;
private readonly Func<TResponse, byte[]> _serializer;

public HttpContextStreamWriter(HttpContext context, Func<TResponse, byte[]> serializer)
public HttpContextStreamWriter(HttpContextServerCallContext context, Func<TResponse, byte[]> serializer)
{
_httpContext = context;
_context = context;
_serializer = serializer;
}

public WriteOptions WriteOptions { get; set; }

public Task WriteAsync(TResponse message)
public WriteOptions WriteOptions
{
// TODO(JunTaoLuo, JamesNK): make sure the response is not null
var responsePayload = _serializer(message);

// Flush messages unless WriteOptions.Flags has BufferHint set
var flush = ((WriteOptions?.Flags ?? default) & WriteFlags.BufferHint) != WriteFlags.BufferHint;

return _httpContext.Response.BodyPipe.WriteMessageAsync(responsePayload, flush);
get => _context.WriteOptions;
set => _context.WriteOptions = value;
}

public Task WriteAsync(TResponse message) => _context.HttpContext.Response.BodyPipe.WriteMessageAsync(message, _serializer, WriteOptions);
}
}
44 changes: 44 additions & 0 deletions src/Grpc.AspNetCore.Server/Internal/HttpResponseExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using Microsoft.AspNetCore.Http;

namespace Grpc.AspNetCore.Server.Internal
{
internal static class HttpResponseExtensions
{
public static void ConsolidateTrailers(this HttpResponse httpResponse, HttpContextServerCallContext context)
{
if (context.HasResponseTrailers)
{
foreach (var trailer in context.ResponseTrailers)
{
// TODO(juntaoluo): What about binary trailers
if (!trailer.IsBinary)
{
httpResponse.AppendTrailer(trailer.Key, trailer.Value);
}
}
}

// Append status trailers, these overwrite any existing status trailers set via ServerCallContext.ResponseTrailers
httpResponse.AppendTrailer(GrpcProtocolConstants.StatusTrailer, context.Status.StatusCode.ToTrailerString());
httpResponse.AppendTrailer(GrpcProtocolConstants.MessageTrailer, context.Status.Detail);
}
}
}
11 changes: 11 additions & 0 deletions src/Grpc.AspNetCore.Server/Internal/PipeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Grpc.Core;

namespace Grpc.AspNetCore.Server.Internal
{
Expand All @@ -31,6 +32,16 @@ internal static class PipeExtensions
private const int MessageDelimiterSize = 4; // how many bytes it takes to encode "Message-Length"
private const int HeaderSize = MessageDelimiterSize + 1; // message length + compression flag

public static Task WriteMessageAsync<TResponse>(this PipeWriter pipeWriter, TResponse response, Func<TResponse, byte[]> serializer, WriteOptions writeOptions)
{
var responsePayload = serializer(response);

// Flush messages unless WriteOptions.Flags has BufferHint set
var flush = ((writeOptions?.Flags ?? default) & WriteFlags.BufferHint) != WriteFlags.BufferHint;

return pipeWriter.WriteMessageAsync(responsePayload, flush);
}

public static Task WriteMessageAsync(this PipeWriter pipeWriter, byte[] messageData, bool flush = false)
{
WriteHeader(pipeWriter, messageData.Length);
Expand Down
2 changes: 0 additions & 2 deletions src/Grpc.AspNetCore.Server/Internal/ServerCallHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#endregion

using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.AspNetCore.Http;
Expand Down
Loading

0 comments on commit 5de105c

Please sign in to comment.