/
SubscriberExtensions.AsObservable.cs
93 lines (77 loc) · 3.2 KB
/
SubscriberExtensions.AsObservable.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
using System;
namespace MessagePipe
{
/// <summary>
/// Extension methods that expose MessagePipe subscribers as <see cref="IObservable{T}"/> sequences.
/// </summary>
public static partial class SubscriberExtensions
{
    /// <summary>
    /// Wraps <paramref name="subscriber"/> in an <see cref="IObservable{T}"/>. Every observer that
    /// subscribes to the result is registered on <paramref name="subscriber"/> through a handler
    /// that forwards each message to the observer's <c>OnNext</c>.
    /// </summary>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    /// <param name="subscriber">The subscriber to wrap.</param>
    /// <param name="filters">Filters passed along with every subscription; <c>null</c> is treated as no filters.</param>
    /// <returns>An observable backed by <paramref name="subscriber"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is <c>null</c>.</exception>
    public static IObservable<TMessage> AsObservable<TMessage>(this ISubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
    {
        // Fail at the call site instead of with a NullReferenceException on a later Subscribe.
        if (subscriber == null)
        {
            throw new ArgumentNullException(nameof(subscriber));
        }

        // An explicit null passed for the params array is normalized to "no filters".
        return new ObservableSubscriber<TMessage>(subscriber, filters ?? Array.Empty<MessageHandlerFilter<TMessage>>());
    }

    /// <summary>
    /// Wraps the buffered <paramref name="subscriber"/> in an <see cref="IObservable{T}"/>. Every observer
    /// that subscribes to the result is registered on <paramref name="subscriber"/> through a handler
    /// that forwards each message to the observer's <c>OnNext</c>.
    /// </summary>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    /// <param name="subscriber">The buffered subscriber to wrap.</param>
    /// <param name="filters">Filters passed along with every subscription; <c>null</c> is treated as no filters.</param>
    /// <returns>An observable backed by <paramref name="subscriber"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is <c>null</c>.</exception>
    public static IObservable<TMessage> AsObservable<TMessage>(this IBufferedSubscriber<TMessage> subscriber, params MessageHandlerFilter<TMessage>[] filters)
    {
        if (subscriber == null)
        {
            throw new ArgumentNullException(nameof(subscriber));
        }

        return new ObservableBufferedSubscriber<TMessage>(subscriber, filters ?? Array.Empty<MessageHandlerFilter<TMessage>>());
    }

    /// <summary>
    /// Wraps the keyed <paramref name="subscriber"/> in an <see cref="IObservable{T}"/> bound to
    /// <paramref name="key"/>. Every observer that subscribes to the result is registered on
    /// <paramref name="subscriber"/> for that key through a handler that forwards each message
    /// to the observer's <c>OnNext</c>.
    /// </summary>
    /// <typeparam name="TKey">Type of the subscription key.</typeparam>
    /// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
    /// <param name="subscriber">The keyed subscriber to wrap.</param>
    /// <param name="key">The key used for every subscription made through the result.</param>
    /// <param name="filters">Filters passed along with every subscription; <c>null</c> is treated as no filters.</param>
    /// <returns>An observable backed by <paramref name="subscriber"/> and <paramref name="key"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is <c>null</c>.</exception>
    public static IObservable<TMessage> AsObservable<TKey, TMessage>(this ISubscriber<TKey, TMessage> subscriber, TKey key, params MessageHandlerFilter<TMessage>[] filters)
        where TKey : notnull
    {
        if (subscriber == null)
        {
            throw new ArgumentNullException(nameof(subscriber));
        }

        return new ObservableSubscriber<TKey, TMessage>(key, subscriber, filters ?? Array.Empty<MessageHandlerFilter<TMessage>>());
    }
}
/// <summary>
/// <see cref="IObservable{T}"/> adapter over a keyed subscriber, bound to a single key
/// and a fixed set of filters supplied at construction.
/// </summary>
/// <typeparam name="TKey">Type of the subscription key.</typeparam>
/// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
internal sealed class ObservableSubscriber<TKey, TMessage> : IObservable<TMessage>
    where TKey : notnull
{
    private readonly TKey key;
    private readonly ISubscriber<TKey, TMessage> subscriber;
    private readonly MessageHandlerFilter<TMessage>[] filters;

    /// <summary>Captures the key, the wrapped subscriber and the filters reused by every subscription.</summary>
    /// <param name="key">Key passed to the wrapped subscriber on each subscription.</param>
    /// <param name="subscriber">The keyed subscriber to delegate to.</param>
    /// <param name="filters">Filters passed to the wrapped subscriber on each subscription.</param>
    public ObservableSubscriber(TKey key, ISubscriber<TKey, TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
    {
        this.filters = filters;
        this.subscriber = subscriber;
        this.key = key;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped subscriber under the stored key.
    /// </summary>
    /// <param name="observer">Observer whose <c>OnNext</c> receives each message.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the wrapped subscriber's <c>Subscribe</c> call.</returns>
    public IDisposable Subscribe(IObserver<TMessage> observer)
    {
        // A fresh handler per subscription bridges message delivery to the observer.
        var handler = new ObserverMessageHandler<TMessage>(observer);
        return subscriber.Subscribe(key, handler, filters);
    }
}
/// <summary>
/// <see cref="IObservable{T}"/> adapter over a keyless subscriber with a fixed set of
/// filters supplied at construction.
/// </summary>
/// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
internal sealed class ObservableSubscriber<TMessage> : IObservable<TMessage>
{
    private readonly ISubscriber<TMessage> subscriber;
    private readonly MessageHandlerFilter<TMessage>[] filters;

    /// <summary>Captures the wrapped subscriber and the filters reused by every subscription.</summary>
    /// <param name="subscriber">The subscriber to delegate to.</param>
    /// <param name="filters">Filters passed to the wrapped subscriber on each subscription.</param>
    public ObservableSubscriber(ISubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
    {
        this.filters = filters;
        this.subscriber = subscriber;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped subscriber.
    /// </summary>
    /// <param name="observer">Observer whose <c>OnNext</c> receives each message.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the wrapped subscriber's <c>Subscribe</c> call.</returns>
    public IDisposable Subscribe(IObserver<TMessage> observer)
    {
        // A fresh handler per subscription bridges message delivery to the observer.
        var handler = new ObserverMessageHandler<TMessage>(observer);
        return subscriber.Subscribe(handler, filters);
    }
}
/// <summary>
/// <see cref="IObservable{T}"/> adapter over a buffered subscriber with a fixed set of
/// filters supplied at construction.
/// </summary>
/// <typeparam name="TMessage">Type of the messages delivered to observers.</typeparam>
internal sealed class ObservableBufferedSubscriber<TMessage> : IObservable<TMessage>
{
    private readonly IBufferedSubscriber<TMessage> subscriber;
    private readonly MessageHandlerFilter<TMessage>[] filters;

    /// <summary>Captures the wrapped buffered subscriber and the filters reused by every subscription.</summary>
    /// <param name="subscriber">The buffered subscriber to delegate to.</param>
    /// <param name="filters">Filters passed to the wrapped subscriber on each subscription.</param>
    public ObservableBufferedSubscriber(IBufferedSubscriber<TMessage> subscriber, MessageHandlerFilter<TMessage>[] filters)
    {
        this.filters = filters;
        this.subscriber = subscriber;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped buffered subscriber.
    /// </summary>
    /// <param name="observer">Observer whose <c>OnNext</c> receives each message.</param>
    /// <returns>The <see cref="IDisposable"/> returned by the wrapped subscriber's <c>Subscribe</c> call.</returns>
    public IDisposable Subscribe(IObserver<TMessage> observer)
    {
        // A fresh handler per subscription bridges message delivery to the observer.
        var handler = new ObserverMessageHandler<TMessage>(observer);
        return subscriber.Subscribe(handler, filters);
    }
}
/// <summary>
/// Message handler that relays every handled message to an <see cref="IObserver{T}"/>.
/// Only <c>OnNext</c> is invoked; this handler never calls <c>OnError</c> or <c>OnCompleted</c>.
/// </summary>
/// <typeparam name="TMessage">Type of the messages being relayed.</typeparam>
internal sealed class ObserverMessageHandler<TMessage> : IMessageHandler<TMessage>
{
    private readonly IObserver<TMessage> observer;

    /// <summary>Creates a handler that forwards to <paramref name="observer"/>.</summary>
    /// <param name="observer">The observer that receives each handled message.</param>
    public ObserverMessageHandler(IObserver<TMessage> observer)
    {
        this.observer = observer;
    }

    /// <summary>Passes <paramref name="message"/> to the observer's <c>OnNext</c>.</summary>
    /// <param name="message">The message to forward.</param>
    public void Handle(TMessage message) => observer.OnNext(message);
}
}