Skip to content

Commit

Permalink
Add middleware for unwrapping cloud events
Browse files Browse the repository at this point in the history
Fixes: #74

note: This relies on the fix for dapr/dapr#574 which has been merged.

This change introduces a middleware that can upwrap a *structured* cloud
event. This is the format used by dapr by default now for pub/sub
messaging. Adding the middleware makes it transparent to the developer
whether the data can from a cloud event or was a basic RPC call.

We're adding the middleware for this first since it's the most general
approach. It has a drawback compared with other approaches, performance.

Users could alternatively use the SDK from CloudEvents to read their
data without the middleware.

We might also want to add an MVC formatter in the future, which could do
the unwrapping and deserialization to a user-type in a single operation.
  • Loading branch information
rynowak committed Oct 13, 2019
1 parent 4fb29e4 commit a6ef7e4
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 2 deletions.
155 changes: 155 additions & 0 deletions src/Dapr.AspNetCore/CloudEventsMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr
{
using System;
using System.IO;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.WebUtilities;

internal class CloudEventsMiddleware
{
private const string ContentType = "application/cloudevents+json";
private readonly RequestDelegate next;

public CloudEventsMiddleware(RequestDelegate next)
{
this.next = next;
}

public Task InvokeAsync(HttpContext httpContext)
{
// This middleware unwraps any requests with a cloud events (JSON) content type
// and replaces the request body + request content type so that it can be read by a
// non-cloud-events-aware piece of code.
//
// This corresponds to cloud events in the *structured* format:
// https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#13-content-modes
//
// For *binary* format, we don't have to do anything
//
// We don't support batching.
//
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!this.MatchesContentType(httpContext, out var charSet))
{
return this.next(httpContext);
}

return this.ProcessBodyAsync(httpContext, charSet);
}

private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
JsonElement json;
if (string.Equals(charSet, Encoding.UTF8.WebName, StringComparison.OrdinalIgnoreCase))
{
json = await JsonSerializer.DeserializeAsync<JsonElement>(httpContext.Request.Body);
}
else
{
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
{
var text = await reader.ReadToEndAsync();
json = JsonSerializer.Deserialize<JsonElement>(text);
}
}

Stream originalBody;
Stream body;

string originalContentType;
string contentType;

// Data is optional.
if (json.TryGetProperty("data", out var data))
{
body = new MemoryStream();
await JsonSerializer.SerializeAsync<JsonElement>(body, data);
body.Seek(0L, SeekOrigin.Begin);

if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String)
{
contentType = dataContentType.GetString();

// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
// to remove any charset information. We generally just assume utf-8 everywhere, so omitting
// a charset is a safe bet.
if (contentType.Contains("charset") && MediaTypeHeaderValue.TryParse(contentType, out var parsed))
{
parsed.CharSet = null;
contentType = parsed.ToString();
}
}
else
{
// assume JSON is not specified.
contentType = "application/json";
}
}
else
{
body = new MemoryStream();
contentType = null;
}

originalBody = httpContext.Request.Body;
originalContentType = httpContext.Request.ContentType;

try
{
httpContext.Request.Body = body;
httpContext.Request.ContentType = contentType;

await this.next(httpContext);
}
finally
{
httpContext.Request.ContentType = originalContentType;
httpContext.Request.Body = originalBody;
}
}

private bool MatchesContentType(HttpContext httpContext, out string charSet)
{
if (httpContext.Request.ContentType == null)
{
charSet = null;
return false;
}

// Handle cases where the content type includes additional parameters like charset.
// Doing the string comparison up front so we can avoid allocation.
if (!httpContext.Request.ContentType.StartsWith(ContentType))
{
charSet = null;
return false;
}

if (!MediaTypeHeaderValue.TryParse(httpContext.Request.ContentType, out var parsed))
{
charSet = null;
return false;
}

if (parsed.MediaType != ContentType)
{
charSet = null;
return false;
}

charSet = parsed.CharSet ?? "UTF-8";
return true;
}
}
}
33 changes: 33 additions & 0 deletions src/Dapr.AspNetCore/DaprApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Microsoft.AspNetCore.Builder
{
using System;
using Dapr;

/// <summary>
/// Provides extension methods for <see cref="IApplicationBuilder" />.
/// </summary>
public static class DaprApplicationBuilderExtensions
{
/// <summary>
/// Adds the cloud events middleware to the middleware pipeline. The cloud events middleware will unwrap
/// requests that use the cloud events structured format, allowing the event payload to be read directly.
/// </summary>
/// <param name="builder">An <see cref="IApplicationBuilder" />.</param>
/// <returns>The <see cref="IApplicationBuilder" />.</returns>
public static IApplicationBuilder UseCloudEvents(this IApplicationBuilder builder)
{
if (builder is null)
{
throw new ArgumentNullException(nameof(builder));
}

builder.UseMiddleware<CloudEventsMiddleware>();
return builder;
}
}
}
9 changes: 8 additions & 1 deletion test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ namespace Dapr.AspNetCore.IntegrationTest.App
public class DaprController : ControllerBase
{
[Topic("B")]
[HttpPost("/topic-b")]
[HttpPost("/B")]
public void TopicB()
{
}

[Topic("register-user")]
[HttpPost("/register-user")]
public ActionResult<UserInfo> RegisterUser(UserInfo user)
{
return user; // echo back the user for testing
}

[HttpPost("/controllerwithoutstateentry/{widget}")]
public async Task AddOneWithoutStateEntry([FromServices]StateClient state, [FromState] Widget widget)
{
Expand Down
2 changes: 2 additions & 0 deletions test/Dapr.AspNetCore.IntegrationTest.App/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

app.UseAuthorization();

app.UseCloudEvents();

app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
Expand Down
15 changes: 15 additions & 0 deletions test/Dapr.AspNetCore.IntegrationTest.App/UserInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr.AspNetCore.IntegrationTest.App
{
using System.ComponentModel.DataAnnotations;

public class UserInfo
{
[Required]
public string Name { get; set; }
}
}
127 changes: 127 additions & 0 deletions test/Dapr.AspNetCore.IntegrationTest/CloudEventsIntegrationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

namespace Dapr.AspNetCore.IntegrationTest
{
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr.AspNetCore.IntegrationTest.App;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class CloudEventsIntegrationTest
{
private readonly JsonSerializerOptions options = new JsonSerializerOptions()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
PropertyNameCaseInsensitive = true,
};

[TestMethod]
public async Task CanSendEmptyStructuredCloudEvent()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();

var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/B");
request.Content = new StringContent("{}", Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");

var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
}
}

[TestMethod]
public async Task CanSendStructuredCloudEvent()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();

var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
data = new
{
name = "jimmy",
},
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");

var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();

var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}

[TestMethod]
public async Task CanSendStructuredCloudEvent_WithContentType()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();

var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
data = new
{
name = "jimmy",
},
datacontenttype = "text/json",
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/cloudevents+json");

var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();

var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}

// Yeah, I know, binary isn't a great term for this, it's what the cloudevents spec uses.
// Basically this is here to test that an endpoint can handle requests with and without
// an envelope.
[TestMethod]
public async Task CanSendBinaryCloudEvent_WithContentType()
{
using (var factory = new AppWebApplicationFactory())
{
var httpClient = factory.CreateClient();

var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost/register-user");
request.Content = new StringContent(
JsonSerializer.Serialize(
new
{
name = "jimmy",
}),
Encoding.UTF8);
request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var response = await httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();

var userInfo = await JsonSerializer.DeserializeAsync<UserInfo>(await response.Content.ReadAsStreamAsync(), this.options);
userInfo.Name.Should().Be("jimmy");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task SubscribeEndpoint_ReportsTopics()
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);

json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(2);
json.GetArrayLength().Should().Be(3);
var topics = new List<string>();
foreach (var element in json.EnumerateArray())
{
Expand All @@ -40,6 +40,7 @@ public async Task SubscribeEndpoint_ReportsTopics()

topics.Should().Contain("A");
topics.Should().Contain("B");
topics.Should().Contain("register-user");
}
}
}
Expand Down
Loading

0 comments on commit a6ef7e4

Please sign in to comment.