public
Description:
Homepage: http://www.iserviceoriented.com
Clone URL: git://github.com/jezell/iserviceoriented.git
100644 156 lines (128 sloc) 3.842 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.ObjectModel;
 
namespace IServiceOriented.ServiceBus
{
    internal class Correlator
    {
        // Todo: replace with something that performs
        IDictionary<string, IAsyncResult> _replies = new Dictionary<string, IAsyncResult>();
 
        void register(string correlationId, IAsyncResult result)
        {
            lock (_replies)
            {
                _replies.Add(correlationId, result);
            }
        }
 
        void unregister(string correlationId)
        {
            lock (_replies)
            {
                _replies.Remove(correlationId);
            }
        }
 
 
        // Todo: do we need to support wait for multiple deliveries?
        public void Reply(string correlationId, MessageDelivery delivery)
        {
            IAsyncResult result;
 
            _replies.TryGetValue(correlationId, out result);
 
            if (result != null)
            {
                CorrelatorAsyncResult car = (CorrelatorAsyncResult)result;
                List<MessageDelivery> deliveries = new List<MessageDelivery>();
                deliveries.Add(delivery);
                car.Complete(new ReadOnlyCollection<MessageDelivery>(deliveries));
            }
        }
 
 
        // Todo: need expiration support
        public IAsyncResult BeginWaitForReply(string correlationId, AsyncCallback callback, object o)
        {
            CorrelatorAsyncResult result = new CorrelatorAsyncResult(correlationId, callback, o);
            register(correlationId, result);
            return result;
        }
 
        public void EndWaitForReply(IAsyncResult result)
        {
            CorrelatorAsyncResult car = (CorrelatorAsyncResult)result;
            result.AsyncWaitHandle.WaitOne(); // todo: this should have a timeout
            unregister(car.CorrelationId);
        }
 
    }
 
 
    public class CorrelatorAsyncResult : IAsyncResult, IDisposable
    {
        public CorrelatorAsyncResult(string correlationId, AsyncCallback callback, object state)
        {
            Callback = callback;
            CorrelationId = correlationId;
            AsyncState = state;
            AsyncWaitHandle = new ManualResetEvent(false);
        }
 
        public void Complete(ReadOnlyCollection<MessageDelivery> results)
        {
            Results = results;
            ((ManualResetEvent)AsyncWaitHandle).Set();
            if (Callback != null) Callback(this);
        }
 
        public ReadOnlyCollection<MessageDelivery> Results
        {
            get;
            private set;
        }
 
        public string CorrelationId
        {
            get;
            private set;
        }
 
        public AsyncCallback Callback
        {
            get;
            private set;
        }
        #region IAsyncResult Members
 
        public object AsyncState
        {
            get;
            private set;
        }
 
        public WaitHandle AsyncWaitHandle
        {
            get;
            private set;
        }
 
        public bool CompletedSynchronously
        {
            get;
            private set;
        }
 
        public bool IsCompleted
        {
            get
            {
                return AsyncWaitHandle.WaitOne(0);
            }
        }
 
        #endregion
 
        void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (AsyncWaitHandle != null)
                {
                    AsyncWaitHandle.Close();
                }
            }
        }
 
        public void Dispose()
        {
            Dispose(true);
 
            GC.SuppressFinalize(this);
        }
 
        ~CorrelatorAsyncResult()
        {
            Dispose(false);
        }
    }
 
}