/
BehaviorSubject.cs
82 lines (73 loc) · 2.81 KB
/
BehaviorSubject.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using System;
using System.ComponentModel;
using System.Linq.Expressions;
using System.Reactive.Subjects;
using System.Xml.Serialization;
using Bonsai.Expressions;
using Rx = System.Reactive.Subjects;
namespace Bonsai.Reactive
{
/// <summary>
/// Represents an expression builder that broadcasts the latest value of an observable
/// sequence to all subscribed and future observers using a shared subject.
/// </summary>
[WorkflowElementIcon(nameof(BehaviorSubject))]
[XmlType(Namespace = Constants.ReactiveXmlNamespace)]
[Description("Broadcasts the latest value of an observable sequence to all subscribed and future observers using a shared subject.")]
public class BehaviorSubject : SubjectBuilder
{
/// <inheritdoc/>
protected override Expression BuildSubject(Expression expression)
{
var builderExpression = Expression.Constant(this);
var parameterType = expression.Type.GetGenericArguments()[0];
return Expression.Call(builderExpression, nameof(CreateSubject), new[] { parameterType });
}
BehaviorSubject<TSource>.Subject CreateSubject<TSource>()
{
return new BehaviorSubject<TSource>.Subject();
}
}
/// <summary>
/// Represents an expression builder that broadcasts the latest value from other observable
/// sequences to all subscribed and future observers.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
[WorkflowElementIcon(nameof(BehaviorSubject))]
[XmlType(Namespace = Constants.ReactiveXmlNamespace)]
[Description("Broadcasts the latest value from other observable sequences to all subscribed and future observers.")]
public class BehaviorSubject<T> : SubjectBuilder<T>
{
/// <summary>
/// Creates a shared subject that broadcasts the latest value from other observable
/// sequences to all subscribed and future observers.
/// </summary>
/// <returns>A new instance of <see cref="ISubject{T}"/>.</returns>
protected override ISubject<T> CreateSubject()
{
return new Subject();
}
internal class Subject : ISubject<T>, IDisposable
{
readonly Rx.ReplaySubject<T> subject = new Rx.ReplaySubject<T>(1);
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(T value)
{
subject.OnNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void Dispose()
{
subject.Dispose();
}
}
}
}