/
OwinResponse.cs
137 lines (117 loc) · 3.73 KB
/
OwinResponse.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Gate.Owin;
using SignalR.Abstractions;
namespace SignalR.Hosting.Owin
{
/// <summary>
/// <see cref="IResponse"/> implementation that writes through the OWIN result
/// callback supplied by the host.
/// </summary>
/// <remarks>
/// The response is started lazily: the first write (or <see cref="End"/>) invokes
/// the result callback with a "200 OK" status and a Content-Type header, and the
/// host answers by handing over the next/error/complete body callbacks. Every
/// operation waits for that handshake before touching the body callbacks.
/// </remarks>
public class OwinResponse : IResponse
{
    // Completed (or faulted) once the host has handed over the body callbacks.
    // Shared by all callers so that an operation arriving while the handshake is
    // still in flight waits for it instead of running against null callbacks.
    private readonly TaskCompletionSource<object> _responseStarted = new TaskCompletionSource<object>();

    // Result callback from the host; swapped to null by the first caller that starts the response.
    private ResultDelegate _responseCallback;

    // Body callbacks, assigned when the host invokes the body delegate.
    private Func<ArraySegment<byte>, Action, bool> _responseNext;
    private Action<Exception> _responseError;
    private Action _responseComplete;

    /// <summary>
    /// Creates a response that will start itself through <paramref name="responseCallback"/>
    /// on first use.
    /// </summary>
    /// <param name="responseCallback">Host callback used to send the status, headers and body.</param>
    /// <exception cref="ArgumentNullException"><paramref name="responseCallback"/> is null.</exception>
    public OwinResponse(ResultDelegate responseCallback)
    {
        if (responseCallback == null)
        {
            // Without a callback the response could never start and every write would wait forever.
            throw new ArgumentNullException("responseCallback");
        }

        _responseCallback = responseCallback;
        IsClientConnected = true;
    }

    /// <summary>
    /// Content type sent with the response headers. Read when the response is
    /// started; "text/plain" is used when it is null at that point.
    /// </summary>
    public string ContentType
    {
        get;
        set;
    }

    /// <summary>
    /// True until the host invokes the cancel action returned from the body
    /// delegate or a write throws.
    /// </summary>
    public bool IsClientConnected
    {
        get;
        private set;
    }

    /// <summary>Writes <paramref name="data"/> (UTF-8 encoded) without ending the response.</summary>
    /// <param name="data">Text to write; null is written as an empty string.</param>
    /// <returns>A task that completes when the write has been handed off.</returns>
    public Task WriteAsync(string data)
    {
        return WriteAsync(data, end: false);
    }

    /// <summary>Writes <paramref name="data"/> (UTF-8 encoded) and then completes the response.</summary>
    /// <param name="data">Text to write; null is written as an empty string.</param>
    /// <returns>A task that completes once the data was written and the complete callback invoked.</returns>
    public Task EndAsync(string data)
    {
        return WriteAsync(data, end: true);
    }

    /// <summary>Completes the response without writing any further data.</summary>
    /// <returns>A task that completes after the host's complete callback has been invoked.</returns>
    public Task End()
    {
        // Pass the instance and read the field inside the continuation: the complete
        // callback is only assigned once the host has invoked the body delegate, so
        // reading it here could capture null when the response has not started yet.
        return EnsureResponseStarted().Then(response => response._responseComplete(), this);
    }

    // Starts the response if needed, then performs the write.
    private Task WriteAsync(string data, bool end)
    {
        return EnsureResponseStarted()
            .Then((d, e) => DoWrite(d, e), data, end)
            .FastUnwrap();
    }

    // Starts the response exactly once and returns the task tracking the handshake.
    private Task EnsureResponseStarted()
    {
        // Only the caller that swaps out the callback starts the response.
        var responseCallback = Interlocked.Exchange(ref _responseCallback, null);

        if (responseCallback == null)
        {
            // Already started (or starting): wait on the same handshake.
            return _responseStarted.Task;
        }

        try
        {
            responseCallback(
                "200 OK",
                new Dictionary<string, IEnumerable<string>>
                {
                    { "Content-Type", new[] { ContentType ?? "text/plain" } },
                },
                (next, error, complete) =>
                {
                    _responseNext = next;
                    _responseError = error;
                    _responseComplete = complete;

                    // Try* so a host that invokes the body delegate more than once cannot throw here.
                    _responseStarted.TrySetResult(null);

                    // Handed back to the host as the cancel action.
                    return StopSending;
                });
        }
        catch (Exception ex)
        {
            // Try* because the body delegate may already have completed the task before the throw.
            _responseStarted.TrySetException(ex);
        }

        return _responseStarted.Task;
    }

    // Cancel action given to the host; flags the client as gone.
    private void StopSending()
    {
        IsClientConnected = false;
    }

    // Encodes and writes the data. The returned task always completes successfully;
    // write failures are reported through the error callback instead.
    private Task DoWrite(string data, bool end)
    {
        var tcs = new TaskCompletionSource<object>();

        try
        {
            // Treat null as empty so a null payload is not mistaken for a failed write below.
            var value = new ArraySegment<byte>(Encoding.UTF8.GetBytes(data ?? String.Empty));

            if (end)
            {
                // Write and send the response immediately
                _responseNext(value, null);
                _responseComplete();
                tcs.TrySetResult(null);
            }
            else if (!_responseNext(value, () => tcs.TrySetResult(null)))
            {
                // A false return is treated as "the continuation will not be invoked",
                // so the task is completed here.
                tcs.TrySetResult(null);
            }
        }
        catch (Exception ex)
        {
            // Infer client connectedness from fails on write
            IsClientConnected = false;

            // Raise the response error callback, if the host supplied one
            var onError = _responseError;
            if (onError != null)
            {
                onError(ex);
            }

            // Mark the task as complete; Try* because the write continuation may
            // already have completed it before the exception surfaced.
            tcs.TrySetResult(null);
        }

        return tcs.Task;
    }
}
}