Skip to content
This repository
Browse code

Made some fixes to the SSE event stream reader.

- Handle exceptions on the read task better.
- Don't raise onopened if the first read is a failure.
  • Loading branch information...
commit 7f0fd4ddc70e6fe2561a3eb6bfd0a9acf3ee4576 1 parent d26b018
David Fowler authored July 06, 2012
78  SignalR.Client/Transports/ServerSentEvents/EventSourceStreamReader.cs
@@ -2,6 +2,7 @@
2 2
 using System.Diagnostics;
3 3
 using System.IO;
4 4
 using System.Threading;
  5
+using System.Threading.Tasks;
5 6
 using SignalR.Client.Infrastructure;
6 7
 
7 8
 namespace SignalR.Client.Transports.ServerSentEvents
@@ -80,40 +81,72 @@ public void Close()
80 81
 
81 82
         private void Process()
82 83
         {
  84
+        Read:
  85
+
83 86
             if (!Processing)
84 87
             {
85 88
                 return;
86 89
             }
87 90
 
88 91
             var buffer = new byte[4096];
89  
-            _stream.ReadAsync(buffer).ContinueWith(task =>
90  
-            {
91  
-                // When the first get data from the server the trigger the event.
92  
-                Interlocked.Exchange(ref _setOpened, () => { }).Invoke();
93 92
 
94  
-                if (task.IsFaulted)
  93
+            Task<int> readTask = _stream.ReadAsync(buffer);
  94
+
  95
+            if (readTask.IsCompleted)
  96
+            {
  97
+                try
95 98
                 {
96  
-                    Close(task.Exception.Unwrap());
97  
-                    return;
98  
-                }
  99
+                    // Observe all exceptions
  100
+                    readTask.Wait();
99 101
 
100  
-                int read = task.Result;
  102
+                    int read = readTask.Result;
101 103
 
102  
-                if (read > 0)
103  
-                {
104  
-                    // Put chunks in the buffer
105  
-                    ProcessBuffer(buffer, read);
  104
+                    if (TryProcessRead(buffer, read))
  105
+                    {
  106
+                        goto Read;
  107
+                    }
106 108
                 }
107  
-
108  
-                if (read == 0)
  109
+                catch (Exception ex)
109 110
                 {
110  
-                    Close();
111  
-                    return;
  111
+                    Close(ex);
112 112
                 }
  113
+            }
  114
+            else
  115
+            {
  116
+                ReadAsync(readTask, buffer);
  117
+            }
  118
+        }
113 119
 
114  
-                // Keep reading the next set of data
115  
-                Process();
116  
-            });
  120
+        private void ReadAsync(Task<int> readTask, byte[] buffer)
  121
+        {
  122
+            readTask.Catch(ex => Close(ex))
  123
+                    .Then(read =>
  124
+                    {
  125
+                        if (TryProcessRead(buffer, read))
  126
+                        {
  127
+                            Process();
  128
+                        }
  129
+                    })
  130
+                    .Catch();
  131
+        }
  132
+
  133
+        private bool TryProcessRead(byte[] buffer, int read)
  134
+        {
  135
+            Interlocked.Exchange(ref _setOpened, () => { }).Invoke();
  136
+
  137
+            if (read > 0)
  138
+            {
  139
+                // Put chunks in the buffer
  140
+                ProcessBuffer(buffer, read);
  141
+
  142
+                return true;
  143
+            }
  144
+            else if (read == 0)
  145
+            {
  146
+                Close();
  147
+            }
  148
+
  149
+            return false;
117 150
         }
118 151
 
119 152
         private void ProcessBuffer(byte[] buffer, int read)
@@ -152,6 +185,11 @@ private void Close(Exception exception)
152 185
                 Debug.WriteLine("EventSourceReader: Connection Closed");
153 186
                 if (Closed != null)
154 187
                 {
  188
+                    if (exception != null)
  189
+                    {
  190
+                        exception = exception.Unwrap();
  191
+                    }
  192
+
155 193
                     Closed(exception);
156 194
                 }
157 195
             }
20  SignalR.Tests/EventSourceStreamReaderFacts.cs
@@ -27,6 +27,26 @@ public void ReadTriggersOpenedOnOpen()
27 27
             Assert.Equal("somedata", tcs.Task.Result);
28 28
         }
29 29
 
  30
+        [Fact]
  31
+        public void CloseThrowsSouldntTakeProcessDown()
  32
+        {
  33
+            var memoryStream = MemoryStream("");
  34
+            var eventSource = new EventSourceStreamReader(memoryStream);
  35
+
  36
+            eventSource.Closed = (ex) =>
  37
+            {
  38
+                throw new Exception("Throw on closed");
  39
+            };
  40
+
  41
+            eventSource.Start();
  42
+            
  43
+            // Force any finalizers to run so we can see unhandled task errors
  44
+            GC.Collect();
  45
+            GC.WaitForPendingFinalizers();
  46
+
  47
+            Thread.Sleep(TimeSpan.FromSeconds(5));
  48
+        }
  49
+
30 50
         private MemoryStream MemoryStream(string data)
31 51
         {
32 52
             return new MemoryStream(Encoding.UTF8.GetBytes(data));

0 notes on commit 7f0fd4d

Please sign in to comment.
Something went wrong with that request. Please try again.