Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support subscriptions of value types #3876

Merged
merged 2 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ public async Task Basic()
Observer.ShouldHaveNoMoreResults();
}

[Fact]
public async Task BasicInt()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests the auto-registering types and worked without changes.

{
var result = await ExecuteAsync("subscription { testInt }");
result.ShouldBeSuccessful();
SubscriptionObj.ShouldNotBeNull();
result.Perf.ShouldBeNull();
Source.Next("1");
Source.Next("2");
Observer.ShouldHaveResult().ShouldBeSimilarTo("""{ "data": { "testInt": 1 } }""");
Observer.ShouldHaveResult().ShouldBeSimilarTo("""{ "data": { "testInt": 2 } }""");
Observer.ShouldHaveNoMoreResults();
}

[Fact]
public async Task NoMetricsForDataEvents()
{
Expand Down Expand Up @@ -546,6 +560,9 @@ private class Subscription
{
public static IObservable<string> Test([FromServices] IObservable<string> source) => source;

public static IObservable<int> TestInt([FromServices] IObservable<string> source)
=> new ObservableSelect<string, int>(source, value => int.Parse(value));

public static IObservable<string> TestWithInitialExtensions(IResolveFieldContext context, [FromServices] IObservable<string> source)
{
context.SetOutputExtension("alpha", "beta");
Expand Down
14 changes: 11 additions & 3 deletions src/GraphQL.Tests/Subscription/SubscriptionSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public ChatSubscriptions(IChat chat)
Type = typeof(StringGraphType),
StreamResolver = new SourceStreamResolver<string>(context => Subscribe(context).Select(message => message.Content))
});

int counter = 0;
AddField(new FieldType
{
Name = "messageCounter",
Type = typeof(IntGraphType),
StreamResolver = new SourceStreamResolver<int>(context => Subscribe(context).Select(_ => ++counter))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threw exception here previously

});
}

private IObservable<Message> SubscribeById(IResolveFieldContext context)
Expand Down Expand Up @@ -145,7 +153,7 @@ public MessageInputType()
{
Field<StringGraphType>("fromId");
Field<StringGraphType>("content");
Field<DateGraphType>("sentAt");
Field<DateTimeOffsetGraphType>("sentAt");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the serialized responses would be repeatable as they would carry timezone info which can be set to UTC.

}
}

Expand All @@ -164,7 +172,7 @@ public class Message

public string Content { get; set; }

public DateTime SentAt { get; set; }
public DateTimeOffset SentAt { get; set; }
}

public class MessageFrom
Expand All @@ -180,7 +188,7 @@ public class ReceivedMessage

public string Content { get; set; }

public DateTime SentAt { get; set; }
public DateTimeOffset SentAt { get; set; }
}

public interface IChat
Expand Down
85 changes: 60 additions & 25 deletions src/GraphQL.Tests/Subscription/SubscriptionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace GraphQL.Tests.Subscription;

public class SubscriptionTests
{
private readonly DateTimeOffset DateConst = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);

protected async Task<ExecutionResult> ExecuteSubscribeAsync(ExecutionOptions options)
{
var executer = new DocumentExecuter();
Expand All @@ -28,7 +30,7 @@ public async Task SubscribeGetAll()
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};

var chat = new Chat();
Expand Down Expand Up @@ -67,7 +69,7 @@ public async Task SubscribeToContent(bool useMiddleware)
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};

var chat = new Chat();
Expand Down Expand Up @@ -112,7 +114,7 @@ public async Task Subscribe()
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};
var chat = new Chat();
var schema = new ChatSchema(chat);
Expand All @@ -127,13 +129,47 @@ public async Task Subscribe()
chat.AddMessage(addedMessage);

/* Then */
var stream = result.Streams!.Values.First();
var stream = result.Streams.ShouldNotBeNull().Values.First();
var message = await stream.FirstOrDefaultAsync();

message.ShouldNotBeNull();
message.ShouldBeOfType<ExecutionResult>();
message.Data.ShouldNotBeNull();
message.Data.ShouldNotBeAssignableTo<Task>();
message.ShouldBeSimilarTo("""
{"data":{"messageAdded":{"from":{"id":"1","displayName":"test"},"content":"test","sentAt":"2024-01-01T00:00:00\u002B00:00"}}}
""");
}

[Fact]
public async Task SubscribeInt()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests in this case direct usage of SourceStreamResolver, which is what all the field builders use.

{
/* Given */
var addedMessage = new Message
{
Content = "test",
From = new MessageFrom
{
DisplayName = "test",
Id = "1"
},
SentAt = DateConst
};
var chat = new Chat();
var schema = new ChatSchema(chat);

/* When */
var result = await ExecuteSubscribeAsync(new ExecutionOptions
{
Query = "subscription { messageCounter }",
Schema = schema
});

chat.AddMessage(addedMessage);

/* Then */
var stream = result.Streams.ShouldNotBeNull().Values.First();
var message = await stream.FirstOrDefaultAsync();

message.ShouldBeSimilarTo("""
{"data":{"messageCounter":1}}
""");
}

[Fact]
Expand All @@ -148,7 +184,7 @@ public async Task SubscribeAsync()
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};
var chat = new Chat();
var schema = new ChatSchema(chat);
Expand All @@ -163,13 +199,12 @@ public async Task SubscribeAsync()
chat.AddMessage(addedMessage);

/* Then */
var stream = result.Streams!.Values.First();
var stream = result.Streams.ShouldNotBeNull().Values.First();
var message = await stream.FirstOrDefaultAsync();

message.ShouldNotBeNull();
message.ShouldBeOfType<ExecutionResult>();
message.Data.ShouldNotBeNull();
message.Data.ShouldNotBeAssignableTo<Task>();
message.ShouldBeSimilarTo("""
{"data":{"messageAddedAsync":{"from":{"id":"1","displayName":"test"},"content":"test","sentAt":"2024-01-01T00:00:00\u002B00:00"}}}
""");
}

[Fact]
Expand All @@ -184,7 +219,7 @@ public async Task SubscribeWithArgument()
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};
var chat = new Chat();
var schema = new ChatSchema(chat);
Expand All @@ -203,12 +238,12 @@ public async Task SubscribeWithArgument()
chat.AddMessage(addedMessage);

/* Then */
var stream = result.Streams!.Values.First();
var stream = result.Streams.ShouldNotBeNull().Values.First();
var message = await stream.FirstOrDefaultAsync();

message.ShouldNotBeNull();
message.ShouldBeOfType<ExecutionResult>();
message.Data.ShouldNotBeNull();
message.ShouldBeSimilarTo("""
{"data":{"messageAddedByUser":{"from":{"id":"1","displayName":"test"},"content":"test","sentAt":"2024-01-01T00:00:00\u002B00:00"}}}
""");
}

[Fact]
Expand All @@ -223,7 +258,7 @@ public async Task SubscribeWithArgumentAsync()
DisplayName = "test",
Id = "1"
},
SentAt = DateTime.Now.Date
SentAt = DateConst
};
var chat = new Chat();
var schema = new ChatSchema(chat);
Expand All @@ -242,12 +277,12 @@ public async Task SubscribeWithArgumentAsync()
chat.AddMessage(addedMessage);

/* Then */
var stream = result.Streams!.Values.First();
var stream = result.Streams.ShouldNotBeNull().Values.First();
var message = await stream.FirstOrDefaultAsync();

message.ShouldNotBeNull();
message.ShouldBeOfType<ExecutionResult>();
message.Data.ShouldNotBeNull();
message.ShouldBeSimilarTo("""
{"data":{"messageAddedByUserAsync":{"from":{"id":"1","displayName":"test"},"content":"test","sentAt":"2024-01-01T00:00:00\u002B00:00"}}}
""");
}

[Fact]
Expand All @@ -267,7 +302,7 @@ public async Task OnError()
chat.AddError(new Exception("test"));

/* Then */
var stream = result.Streams!.Values.First();
var stream = result.Streams.ShouldNotBeNull().Values.First();
var error = await Should.ThrowAsync<ExecutionError>(async () => await stream.FirstOrDefaultAsync());
error.InnerException!.Message.ShouldBe("test");
error.Path.ShouldBe(new[] { "messageAdded" });
Expand Down
28 changes: 28 additions & 0 deletions src/GraphQL/Resolvers/ObservableAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace GraphQL.Resolvers;

/// <summary>
/// Converts an <see cref="IObservable{T}"/> for value types into an <see cref="IObservable{T}">IObservable&lt;object?&gt;</see>.
/// </summary>
internal sealed class ObservableAdapter<T> : IObservable<object?>
{
private readonly IObservable<T> _observable;

public ObservableAdapter(IObservable<T> observable)
{
_observable = observable;
}

public IDisposable Subscribe(IObserver<object?> observer) => _observable.Subscribe(new ObserverAdapter(observer));

private sealed class ObserverAdapter : IObserver<T>
{
private readonly IObserver<object?> _observer;
public ObserverAdapter(IObserver<object?> observer)
{
_observer = observer;
}
public void OnCompleted() => _observer.OnCompleted();
public void OnError(Exception error) => _observer.OnError(error);
public void OnNext(T value) => _observer.OnNext(value); // note: boxing here
}
}
27 changes: 0 additions & 27 deletions src/GraphQL/Resolvers/SourceStreamMethodResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,32 +163,5 @@ public SourceStreamMethodResolver(MethodInfo methodInfo, LambdaExpression instan
public ValueTask<IObservable<object?>> ResolveStreamAsync(IResolveFieldContext context) => _sourceStreamResolver(context);

ValueTask<IObservable<object?>> ISourceStreamResolver.ResolveAsync(IResolveFieldContext context) => ResolveStreamAsync(context);

/// <summary>
/// Converts an <see cref="IObservable{T}"/> for value types into an <see cref="IObservable{T}">IObservable&lt;object?&gt;</see>.
/// </summary>
private sealed class ObservableAdapter<T> : IObservable<object?>
{
private readonly IObservable<T> _observable;

public ObservableAdapter(IObservable<T> observable)
{
_observable = observable;
}

public IDisposable Subscribe(IObserver<object?> observer) => _observable.Subscribe(new ObserverAdapter(observer));

private sealed class ObserverAdapter : IObserver<T>
{
private readonly IObserver<object?> _observer;
public ObserverAdapter(IObserver<object?> observer)
{
_observer = observer;
}
public void OnCompleted() => _observer.OnCompleted();
public void OnError(Exception error) => _observer.OnError(error);
public void OnNext(T value) => _observer.OnNext(value); // note: boxing here
}
}
}
}
42 changes: 30 additions & 12 deletions src/GraphQL/Resolvers/SourceStreamResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
throw new ArgumentNullException(nameof(sourceStreamResolver));

if (typeof(TReturnType).IsValueType)
throw new InvalidOperationException("The generic type TReturnType must not be a value type.");

_sourceStreamResolver = context => new ValueTask<IObservable<object?>>((IObservable<object?>)sourceStreamResolver(context));
{
_sourceStreamResolver = context => new(new ObservableAdapter<TReturnType?>(sourceStreamResolver(context)));
}
else
{
_sourceStreamResolver = context => new((IObservable<object?>)sourceStreamResolver(context));
}
Comment on lines 18 to +25
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file are the only "real" changes in the project

}

/// <inheritdoc cref="SourceStreamResolver{TReturnType}(Func{IResolveFieldContext, IObservable{TReturnType}})"/>
Expand All @@ -28,9 +32,13 @@
throw new ArgumentNullException(nameof(sourceStreamResolver));

if (typeof(TReturnType).IsValueType)
throw new InvalidOperationException("The generic type TReturnType must not be a value type.");

_sourceStreamResolver = async context => (IObservable<object?>)await sourceStreamResolver(context).ConfigureAwait(false);
{
_sourceStreamResolver = async context => new ObservableAdapter<TReturnType?>(await sourceStreamResolver(context).ConfigureAwait(false));
}
else
{
_sourceStreamResolver = async context => (IObservable<object?>)await sourceStreamResolver(context).ConfigureAwait(false);
}
}

/// <inheritdoc/>
Expand All @@ -50,21 +58,31 @@
throw new ArgumentNullException(nameof(sourceStreamResolver));

if (typeof(TReturnType).IsValueType)
throw new InvalidOperationException("The generic type TReturnType must not be a value type.");

_sourceStreamResolver = context => new ValueTask<IObservable<object?>>((IObservable<object?>)sourceStreamResolver(context.As<TSourceType>()));
{
_sourceStreamResolver = context => new(new ObservableAdapter<TReturnType?>(sourceStreamResolver(context.As<TSourceType>())));
}
else
{
_sourceStreamResolver = context => new((IObservable<object?>)sourceStreamResolver(context.As<TSourceType>()));
}
}

/// <inheritdoc cref="SourceStreamResolver{TSourceType, TReturnType}(Func{IResolveFieldContext{TSourceType}, IObservable{TReturnType}})"/>
///
public SourceStreamResolver(Func<IResolveFieldContext<TSourceType>, ValueTask<IObservable<TReturnType?>>> sourceStreamResolver)
{
if (sourceStreamResolver == null)
throw new ArgumentNullException(nameof(sourceStreamResolver));

if (typeof(TReturnType).IsValueType)
throw new InvalidOperationException("The generic type TReturnType must not be a value type.");

_sourceStreamResolver = async context => (IObservable<object?>)await sourceStreamResolver(context.As<TSourceType>()).ConfigureAwait(false);
if (typeof(TReturnType).IsValueType)
{
_sourceStreamResolver = async context => new ObservableAdapter<TReturnType?>(await sourceStreamResolver(context.As<TSourceType>()).ConfigureAwait(false));
}
else
{
_sourceStreamResolver = async context => (IObservable<object?>)await sourceStreamResolver(context.As<TSourceType>()).ConfigureAwait(false);
}
Comment on lines +78 to +85

Check notice

Code scanning / CodeQL

Missed ternary opportunity Note

Both branches of this 'if' statement write to the same variable - consider using '?' to express intent better.
}

/// <inheritdoc/>
Expand Down
Loading