/
AsyncSubject.cs
53 lines (50 loc) · 2.15 KB
/
AsyncSubject.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reactive.Subjects;
using System.Xml.Serialization;
using Bonsai.Expressions;
using Rx = System.Reactive.Subjects;
namespace Bonsai.Reactive
{
/// <summary>
/// Represents an expression builder which uses a shared subject to broadcast the
/// last value of an observable sequence to all subscribed and future observers.
/// </summary>
[WorkflowElementIcon(nameof(AsyncSubject))]
[XmlType(Namespace = Constants.ReactiveXmlNamespace)]
[Description("Broadcasts the last value of an observable sequence to all subscribed and future observers using a shared subject.")]
public class AsyncSubject : SubjectBuilder
{
    /// <inheritdoc/>
    protected override Expression BuildSubject(Expression expression)
    {
        // The subject element type is taken from the first generic argument of the
        // input expression type (presumably an observable sequence type; verify
        // against the base class contract).
        var elementType = expression.Type.GetGenericArguments()[0];
        var typeArguments = new[] { elementType };

        // Emit a call to the generic factory method on this builder instance,
        // looked up by name and closed over the element type.
        return Expression.Call(Expression.Constant(this), nameof(CreateSubject), typeArguments);
    }

    // Factory invoked by name from the expression produced in BuildSubject.
    private Rx.AsyncSubject<TSource> CreateSubject<TSource>() => new Rx.AsyncSubject<TSource>();
}
/// <summary>
/// Represents an expression builder which broadcasts, to all subscribed and future
/// observers, the result of the first observable sequence to complete.
/// </summary>
/// <typeparam name="T">The type of the result stored by the subject.</typeparam>
[WorkflowElementIcon(nameof(AsyncSubject))]
[XmlType(Namespace = Constants.ReactiveXmlNamespace)]
[Description("Broadcasts the result of the first observable sequence to complete to all subscribed and future observers.")]
public class AsyncSubject<T> : SubjectBuilder<T>
{
    /// <summary>
    /// Creates the shared subject used to broadcast the result of the first
    /// observable sequence to complete to all subscribed and future observers.
    /// </summary>
    /// <returns>A new instance of <see cref="ISubject{T}"/>.</returns>
    protected override ISubject<T> CreateSubject() => new Rx.AsyncSubject<T>();
}
}