1+ using System ;
2+ using System . Collections . Generic ;
3+ using System . Reactive . Disposables ;
4+ using WampSharp . Core . Serialization ;
5+ using WampSharp . V2 . Core ;
6+ using WampSharp . V2 . Core . Contracts ;
7+ using WampSharp . V2 . PubSub ;
8+ using WampSharp . V2 . Rpc ;
9+
10+ namespace WampSharp . V2 . MetaApi
11+ {
12+ public interface IWampEventHistoryProvider
13+ {
14+ [ WampProcedure ( "wamp.subscription.get_events" ) ]
15+ WampHistoricalEvent [ ] GetEvents ( long subscriptionId , long limit = 10 ) ;
16+ }
17+
18+ public class MonitoredTopics
19+ {
20+ public string TopicUri { get ; set ; }
21+
22+ public SubscribeOptions SubscribeOptions { get ; set ; }
23+ }
24+
25+ public interface IWampEventStore
26+ {
27+ void StoreEvent < TMessage > ( IWampFormatter < TMessage > formatter , long subscriptionId , WampHistoricalEvent < TMessage > historicalEvent ) ;
28+
29+ WampHistoricalEvent < object > [ ] GetEvents ( long subscriptionId , long limit ) ;
30+ }
31+
32+ class WampInMemoryEventStore : IWampEventStore
33+ {
34+ public void StoreEvent < TMessage >
35+ ( IWampFormatter < TMessage > formatter ,
36+ long subscriptionId ,
37+ WampHistoricalEvent < TMessage > historicalEvent )
38+ {
39+ }
40+
41+ public WampHistoricalEvent < object > [ ] GetEvents ( long subscriptionId , long limit )
42+ {
43+ return new WampHistoricalEvent < object > [ ] { } ;
44+ }
45+ }
46+
47+ public class WampEventHistoryProvider : IDisposable
48+ {
49+ private readonly IWampEventStore mStore ;
50+ private readonly IDisposable mDisposable ;
51+
52+ public WampEventHistoryProvider ( IWampTopicContainer container ,
53+ IEnumerable < MonitoredTopics > topicsToRecord , IWampEventStore store )
54+ {
55+ mStore = store ;
56+
57+ List < IDisposable > disposables = new List < IDisposable > ( ) ;
58+
59+ foreach ( MonitoredTopics topicToRecord in topicsToRecord )
60+ {
61+ EventHistorySubscriber subscriber = new EventHistorySubscriber ( this ) ;
62+
63+ IWampRegistrationSubscriptionToken disposable =
64+ container . Subscribe ( subscriber ,
65+ topicToRecord . TopicUri ,
66+ topicToRecord . SubscribeOptions ) ;
67+
68+ subscriber . SubscriptionId = disposable . TokenId ;
69+
70+ disposables . Add ( disposable ) ;
71+ }
72+
73+ mDisposable = new CompositeDisposable ( disposables ) ;
74+ }
75+
76+ private void OnEvent < TMessage > ( IWampFormatter < TMessage > formatter , long subscriptionId , WampHistoricalEvent < TMessage > wampHistoricalEvent )
77+ {
78+ mStore . StoreEvent ( formatter , subscriptionId , wampHistoricalEvent ) ;
79+ }
80+
81+ private class EventHistorySubscriber : IWampRawTopicRouterSubscriber
82+ {
83+ private readonly WampEventHistoryProvider mParent ;
84+
85+ public EventHistorySubscriber ( WampEventHistoryProvider parent )
86+ {
87+ mParent = parent ;
88+ }
89+
90+ public long SubscriptionId
91+ {
92+ get ;
93+ set ;
94+ }
95+
96+ public void Event < TMessage > ( IWampFormatter < TMessage > formatter ,
97+ long publicationId ,
98+ PublishOptions options )
99+ {
100+ InnerEvent ( formatter , publicationId , options ) ;
101+ }
102+
103+ public void Event < TMessage > ( IWampFormatter < TMessage > formatter ,
104+ long publicationId ,
105+ PublishOptions options ,
106+ TMessage [ ] arguments )
107+ {
108+ InnerEvent ( formatter , publicationId , options , arguments ) ;
109+ }
110+
111+ public void Event < TMessage > ( IWampFormatter < TMessage > formatter ,
112+ long publicationId ,
113+ PublishOptions options ,
114+ TMessage [ ] arguments ,
115+ IDictionary < string , TMessage > argumentsKeywords )
116+ {
117+ InnerEvent ( formatter , publicationId , options , arguments , argumentsKeywords ) ;
118+ }
119+
120+ private void InnerEvent < TMessage > ( IWampFormatter < TMessage > formatter , long publicationId , PublishOptions options , TMessage [ ] arguments = null , IDictionary < string , TMessage > argumentsKeywords = null )
121+ {
122+ PublishOptionsExtended optionsExtended = options as PublishOptionsExtended ;
123+
124+ mParent . OnEvent ( formatter ,
125+ SubscriptionId ,
126+ new WampHistoricalEvent < TMessage > ( )
127+ {
128+ Timestamp = DateTime . Now ,
129+ Topic = optionsExtended . TopicUri ,
130+ Arguments = arguments ,
131+ ArgumentsKeywords = argumentsKeywords ,
132+ PublicationId = publicationId ,
133+ PublisherId = optionsExtended . PublisherId
134+ } ) ;
135+ }
136+ }
137+
138+ [ WampProcedure ( "wamp.subscription.get_events" ) ]
139+ public WampHistoricalEvent < object > [ ] GetEvents ( long subscriptionId , long limit = 10 )
140+ {
141+ return mStore . GetEvents ( subscriptionId , limit ) ;
142+ }
143+
144+ public void Dispose ( )
145+ {
146+ mDisposable . Dispose ( ) ;
147+ }
148+ }
149+ }
0 commit comments