Skip to content

Commit

Permalink
Added In-Memory Subscription Support (#2083)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib committed Jun 23, 2020
1 parent a298447 commit 0e5e995
Show file tree
Hide file tree
Showing 93 changed files with 1,538 additions and 2,721 deletions.
30 changes: 0 additions & 30 deletions src/HotChocolate/Core/HotChocolate.Core.sln
Expand Up @@ -45,10 +45,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.Core.Diagnosti
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.StarWars", "test\StarWars\HotChocolate.StarWars.csproj", "{AADFF9B1-B275-48B4-8F32-F6CD837619E5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.Subscriptions.Redis.Tests", "test\Subscriptions.Redis.Tests\HotChocolate.Subscriptions.Redis.Tests.csproj", "{6E4762F6-69A0-4559-ADAA-815970925D14}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.Subscriptions.Tests", "test\Subscriptions.Tests\HotChocolate.Subscriptions.Tests.csproj", "{85074B57-CAF2-42C6-A334-ED9C273BA67A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.Types.Tests", "test\Types.Tests\HotChocolate.Types.Tests.csproj", "{CD2B383F-2957-4A83-B965-070DC8CF36F6}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HotChocolate.Types.Tests.Documentation", "test\Types.Tests.Documentation\HotChocolate.Types.Tests.Documentation.csproj", "{3DFDEFBE-D2BD-4F25-8A26-66AFB62B9C4C}"
Expand Down Expand Up @@ -295,30 +291,6 @@ Global
{AADFF9B1-B275-48B4-8F32-F6CD837619E5}.Release|x64.Build.0 = Release|Any CPU
{AADFF9B1-B275-48B4-8F32-F6CD837619E5}.Release|x86.ActiveCfg = Release|Any CPU
{AADFF9B1-B275-48B4-8F32-F6CD837619E5}.Release|x86.Build.0 = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|x64.ActiveCfg = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|x64.Build.0 = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|x86.ActiveCfg = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Debug|x86.Build.0 = Debug|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|Any CPU.Build.0 = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|x64.ActiveCfg = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|x64.Build.0 = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|x86.ActiveCfg = Release|Any CPU
{6E4762F6-69A0-4559-ADAA-815970925D14}.Release|x86.Build.0 = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|x64.ActiveCfg = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|x64.Build.0 = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|x86.ActiveCfg = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Debug|x86.Build.0 = Debug|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|Any CPU.Build.0 = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|x64.ActiveCfg = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|x64.Build.0 = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|x86.ActiveCfg = Release|Any CPU
{85074B57-CAF2-42C6-A334-ED9C273BA67A}.Release|x86.Build.0 = Release|Any CPU
{CD2B383F-2957-4A83-B965-070DC8CF36F6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CD2B383F-2957-4A83-B965-070DC8CF36F6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CD2B383F-2957-4A83-B965-070DC8CF36F6}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -450,8 +422,6 @@ Global
{6D645B24-42A2-4F65-B90A-B69115D83212} = {7462D089-D350-44D6-8131-896D949A65B7}
{531EE6AB-27B0-4EC7-8D18-F8C0A6982C8E} = {7462D089-D350-44D6-8131-896D949A65B7}
{AADFF9B1-B275-48B4-8F32-F6CD837619E5} = {7462D089-D350-44D6-8131-896D949A65B7}
{6E4762F6-69A0-4559-ADAA-815970925D14} = {7462D089-D350-44D6-8131-896D949A65B7}
{85074B57-CAF2-42C6-A334-ED9C273BA67A} = {7462D089-D350-44D6-8131-896D949A65B7}
{CD2B383F-2957-4A83-B965-070DC8CF36F6} = {7462D089-D350-44D6-8131-896D949A65B7}
{3DFDEFBE-D2BD-4F25-8A26-66AFB62B9C4C} = {7462D089-D350-44D6-8131-896D949A65B7}
{C08D0A27-6F28-4845-91E7-5C1A0C926939} = {7462D089-D350-44D6-8131-896D949A65B7}
Expand Down
20 changes: 6 additions & 14 deletions src/HotChocolate/Core/benchmark/StarWars/Mutation.cs
@@ -1,6 +1,4 @@
using System;
using System.Threading.Tasks;
using HotChocolate;
using System.Threading.Tasks;
using HotChocolate.Subscriptions;
using HotChocolate.StarWars.Data;
using HotChocolate.StarWars.Models;
Expand All @@ -9,28 +7,22 @@ namespace HotChocolate.StarWars
{
public class Mutation
{
private readonly ReviewRepository _repository;

public Mutation(ReviewRepository repository)
{
_repository = repository
?? throw new ArgumentNullException(nameof(repository));
}

/// <summary>
/// Creates a review for a given Star Wars episode.
/// </summary>
/// <param name="episode">The episode to review.</param>
/// <param name="review">The review.</param>
/// <param name="repository">The review repository.</param>
/// <param name="eventSender">The event sending service.</param>
/// <returns>The created review.</returns>
public async Task<Review> CreateReview(
Episode episode,
Review review,
[Service]IEventSender eventSender)
[Service]ReviewRepository repository,
[Service]ITopicEventSender eventSender)
{
_repository.AddReview(episode, review);
await eventSender.SendAsync(new OnReviewMessage(episode, review));
repository.AddReview(episode, review);
await eventSender.SendAsync(episode, review);
return review;
}
}
Expand Down
23 changes: 0 additions & 23 deletions src/HotChocolate/Core/benchmark/StarWars/OnReviewMessage.cs

This file was deleted.

16 changes: 6 additions & 10 deletions src/HotChocolate/Core/benchmark/StarWars/Subscription.cs
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics.Tracing;
using HotChocolate.Subscriptions;
using HotChocolate.StarWars.Data;
using HotChocolate.StarWars.Models;
Expand All @@ -7,17 +8,12 @@ namespace HotChocolate.StarWars
{
public class Subscription
{
private readonly ReviewRepository _repository;

public Subscription(ReviewRepository repository)
{
_repository = repository
?? throw new ArgumentNullException(nameof(repository));
}

public Review OnReview(Episode episode, IEventMessage message)
public Review OnReview(
Episode episode,
[EventMessage]Review review,
[Service]ReviewRepository repository)
{
return (Review)message.Payload;
return review;
}
}
}
Expand Up @@ -7,7 +7,8 @@ public class MutationType
{
protected override void Configure(IObjectTypeDescriptor<Mutation> descriptor)
{
descriptor.Field(t => t.CreateReview(default, default, default))
descriptor
.Field(t => t.CreateReview(default, default, default, default))
.Type<NonNullType<ReviewType>>()
.Argument("episode", a => a.Type<NonNullType<EpisodeType>>())
.Argument("review", a => a.Type<NonNullType<ReviewInputType>>());
Expand Down
@@ -1,4 +1,5 @@
using HotChocolate.Types;
using HotChocolate.StarWars.Models;
using HotChocolate.Types;

namespace HotChocolate.StarWars.Types
{
Expand All @@ -7,7 +8,9 @@ public class SubscriptionType
{
protected override void Configure(IObjectTypeDescriptor<Subscription> descriptor)
{
descriptor.Field(t => t.OnReview(default, default))
descriptor
.Field(t => t.OnReview(default, default, default))
.SubscribeToTopic<Episode, Review>("episode")
.Type<NonNullType<ReviewType>>()
.Argument("episode", arg => arg.Type<NonNullType<EpisodeType>>());
}
Expand Down
16 changes: 16 additions & 0 deletions src/HotChocolate/Core/src/Abstractions/Execution/IQueryResult.cs
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;

#nullable enable

namespace HotChocolate.Execution
{
public interface IQueryResult
: IExecutionResult
, IDisposable
{
IReadOnlyDictionary<string, object?>? Data { get; }

IReadOnlyDictionary<string, object?> ToDictionary();
}
}
@@ -1,16 +1,10 @@
using System;
using System.Collections.Generic;

#nullable enable
#nullable enable

namespace HotChocolate.Execution
{
// todo : deprecate this interface and use IQueryResult
public interface IReadOnlyQueryResult
: IExecutionResult
, IDisposable
: IQueryResult
{
IReadOnlyDictionary<string, object?>? Data { get; }

IReadOnlyDictionary<string, object?> ToDictionary();
}
}
@@ -1,8 +1,20 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;

#nullable enable

namespace HotChocolate.Execution
{
public interface IResponseStream : IAsyncEnumerable<IReadOnlyQueryResult>
/// <summary>
/// The response stream represents a stream of <see cref="IQueryResult" /> that are produced
/// by the execution engine.
/// </summary>
public interface IResponseStream
: IAsyncDisposable
{
/// <summary>
/// Reads the subscription results from the execution engine.
/// </summary>
IAsyncEnumerable<IQueryResult> ReadResultsAsync();
}
}
20 changes: 20 additions & 0 deletions src/HotChocolate/Core/src/Abstractions/Execution/ISourceStream.cs
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Threading;

#nullable enable

namespace HotChocolate.Execution
{
/// <summary>
/// The source stream represents a stream of events from a pub/sub system.
/// </summary>
public interface ISourceStream
: IAsyncDisposable
{
/// <summary>
/// Reads the subscription result from the pub/sub system.
/// </summary>
IAsyncEnumerable<object> ReadEventsAsync();
}
}
@@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Threading;

#nullable enable

namespace HotChocolate.Execution
{
/// <summary>
/// The source stream represents a stream of events from a pub/sub system.
/// </summary>
public interface ISourceStream<TMessage>
: ISourceStream
{
/// <summary>
/// Reads the subscription result from the pub/sub system.
/// </summary>
new IAsyncEnumerable<TMessage> ReadEventsAsync();
}
}
Expand Up @@ -2,10 +2,9 @@

namespace HotChocolate.Execution
{
public interface ISubscriptionExecutionResult
public interface ISubscriptionResult
: IExecutionResult
, IResponseStream
{
new IDictionary<string, object> ContextData { get; }
}
}
Expand Up @@ -5,7 +5,9 @@

namespace HotChocolate.Execution
{
public sealed class QueryResult : IReadOnlyQueryResult
public sealed class QueryResult
: IQueryResult
, IReadOnlyQueryResult
{
private readonly IDisposable? _disposable;

Expand Down
@@ -0,0 +1,83 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

#nullable enable

namespace HotChocolate.Execution
{
public sealed class SubscriptionResult
: ISubscriptionResult
{
private readonly Func<IAsyncEnumerable<IQueryResult>>? _resultStreamFactory;
private readonly IReadOnlyList<IError>? _errors;
private readonly IReadOnlyDictionary<string, object?>? _extensions;
private readonly IReadOnlyDictionary<string, object?>? _contextData;
private readonly IAsyncDisposable? _subscription;
private bool _isRead = false;
private bool _disposed = false;

public SubscriptionResult(
Func<IAsyncEnumerable<IQueryResult>>? resultStreamFactory,
IReadOnlyList<IError>? errors,
IReadOnlyDictionary<string, object?>? extensions = null,
IReadOnlyDictionary<string, object?>? contextData = null,
IAsyncDisposable? subscription = null)
{
if (resultStreamFactory is null && errors is null)
{
throw new ArgumentException("Either provide a result stream factory or errors.");
}

_resultStreamFactory = resultStreamFactory;
_errors = errors;
_extensions = extensions;
_contextData = contextData;
_subscription = subscription;
}

public IReadOnlyList<IError>? Errors => _errors;

public IReadOnlyDictionary<string, object?>? Extensions => _extensions;

public IReadOnlyDictionary<string, object?>? ContextData => _contextData;

public IAsyncEnumerable<IQueryResult> ReadResultsAsync()
{
if (_resultStreamFactory is null)
{
// todo : throw helper
throw new InvalidOperationException(
"This result has errors and cannot read from the response stream.");
}

if (_isRead)
{
// todo : throw helper
throw new InvalidOperationException(
"You can only read a response stream once.");
}

if (_disposed)
{
throw new ObjectDisposedException(nameof(SubscriptionResult));
}

_isRead = true;
return _resultStreamFactory();
}

public async ValueTask DisposeAsync()
{
if (!_disposed)
{
if (_subscription is { })
{
await _subscription.DisposeAsync().ConfigureAwait(false);
}
_disposed = true;
}
}

}
}
2 changes: 1 addition & 1 deletion src/HotChocolate/Core/src/Abstractions/QueryException.cs
Expand Up @@ -40,6 +40,6 @@ public GraphQLException(IEnumerable<IError> errors)
{
}

public IReadOnlyCollection<IError> Errors { get; }
public IReadOnlyList<IError> Errors { get; }
}
}

0 comments on commit 0e5e995

Please sign in to comment.