/
Connection.cs
309 lines (271 loc) · 11.4 KB
/
Connection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using PuppeteerSharp.Helpers;
using PuppeteerSharp.Helpers.Json;
using PuppeteerSharp.Messaging;
using PuppeteerSharp.Transport;
namespace PuppeteerSharp
{
/// <summary>
/// A connection handles the communication with a Chromium browser
/// </summary>
public class Connection : IDisposable
{
// Logger for this connection; created from LoggerFactory in the constructor.
private readonly ILogger _logger;
// Queue that incoming transport messages are pushed through before being processed.
private readonly TaskQueue _callbackQueue = new TaskQueue();

/// <summary>
/// Initializes a new <see cref="Connection"/> on top of an existing transport.
/// </summary>
/// <param name="url">URL exposed through <see cref="Url"/>.</param>
/// <param name="delay">Delay, in milliseconds, applied before each non-empty incoming message is processed.</param>
/// <param name="transport">Transport used to send and receive messages.</param>
/// <param name="loggerFactory">Logger factory; a new one is created when <c>null</c>.</param>
internal Connection(string url, int delay, IConnectionTransport transport, ILoggerFactory loggerFactory = null)
{
    LoggerFactory = loggerFactory ?? new LoggerFactory();
    Url = url;
    Delay = delay;
    Transport = transport;

    _logger = LoggerFactory.CreateLogger<Connection>();
    _callbacks = new ConcurrentDictionary<int, MessageTask>();
    _sessions = new ConcurrentDictionary<string, CDPSession>();
    _asyncSessions = new AsyncDictionaryHelper<string, CDPSession>(_sessions, "Session {0} not found");

    // Subscribe only after all state used by the handlers exists: a message or close
    // notification raised right after subscription must never observe null dictionaries.
    Transport.MessageReceived += Transport_MessageReceived;
    Transport.Closed += Transport_Closed;
}
#region Private Members
// Last message id handed out; advanced atomically by GetMessageID.
private int _lastId;
// Requests still waiting for a response, keyed by message id.
private readonly ConcurrentDictionary<int, MessageTask> _callbacks;
// Sessions known to this connection, keyed by session id.
private readonly ConcurrentDictionary<string, CDPSession> _sessions;
// Helper built over _sessions; used to register newly attached sessions and by GetSessionAsync.
private readonly AsyncDictionaryHelper<string, CDPSession> _asyncSessions;
#endregion
#region Properties
/// <summary>
/// Gets the WebSocket URL this connection was created with.
/// </summary>
/// <value>The URL.</value>
public string Url { get; }
/// <summary>
/// Gets the delay, in milliseconds, applied before each non-empty incoming message is processed.
/// </summary>
/// <value>The delay.</value>
public int Delay { get; }
/// <summary>
/// Gets the transport used to send and receive messages.
/// </summary>
/// <value>Connection transport.</value>
public IConnectionTransport Transport { get; }
/// <summary>
/// Occurs when the connection is closed. It is raised before the connection's sessions are closed
/// and before pending requests are failed.
/// </summary>
public event EventHandler Disconnected;
/// <summary>
/// Occurs when a message from chromium is received that carries neither a session id nor a message id.
/// </summary>
public event EventHandler<MessageEventArgs> MessageReceived;
/// <summary>
/// Gets a value indicating whether this <see cref="Connection"/> is closed.
/// The setter is internal.
/// </summary>
/// <value><c>true</c> if is closed; otherwise, <c>false</c>.</value>
public bool IsClosed { get; internal set; }
/// <summary>
/// Gets the reason supplied when the connection was closed.
/// </summary>
public string CloseReason { get; private set; }
/// <summary>
/// Gets the logger factory.
/// </summary>
/// <value>The logger factory.</value>
public ILoggerFactory LoggerFactory { get; }
#endregion
#region Public Methods
/// <summary>
/// Reserves the next message id for an outgoing request.
/// </summary>
/// <returns>The newly incremented id; the increment is performed atomically.</returns>
internal int GetMessageID()
{
    return Interlocked.Increment(ref _lastId);
}
/// <summary>
/// Traces, serializes and hands a single protocol request to the transport.
/// </summary>
/// <param name="id">Message id of the request.</param>
/// <param name="method">Protocol method name.</param>
/// <param name="args">Method parameters; serialized as the request's <c>Params</c>.</param>
/// <param name="sessionId">Optional session id placed on the request.</param>
/// <returns>The task returned by the transport's send operation.</returns>
internal Task RawSendASync(int id, string method, object args, string sessionId = null)
{
    _logger.LogTrace("Send ► {Id} Method {Method} Params {@Params}", id, method, args);

    var request = new ConnectionRequest
    {
        Id = id,
        Method = method,
        Params = args,
        SessionId = sessionId
    };
    var payload = JsonConvert.SerializeObject(request, JsonHelper.DefaultJsonSerializerSettings);

    return Transport.SendAsync(payload);
}
/// <summary>
/// Sends a protocol request and, optionally, waits for its response.
/// </summary>
/// <param name="method">Protocol method name.</param>
/// <param name="args">Method parameters.</param>
/// <param name="waitForCallback">
/// When <c>true</c> the returned task completes with the response; when <c>false</c> it completes
/// with <c>null</c> as soon as the request has been handed to the transport.
/// </param>
/// <returns>The response payload, or <c>null</c> when <paramref name="waitForCallback"/> is <c>false</c>.</returns>
/// <exception cref="TargetClosedException">The connection is already closed.</exception>
internal async Task<JObject> SendAsync(string method, object args = null, bool waitForCallback = true)
{
    if (IsClosed)
    {
        throw new TargetClosedException($"Protocol error({method}): Target closed.", CloseReason);
    }

    var id = GetMessageID();
    MessageTask callback = null;

    if (waitForCallback)
    {
        callback = new MessageTask
        {
            TaskWrapper = new TaskCompletionSource<JObject>(),
            Method = method
        };
        // Register before sending so a fast response always finds its callback.
        _callbacks[id] = callback;
    }

    try
    {
        await RawSendASync(id, method, args).ConfigureAwait(false);
    }
    catch
    {
        // The request failed to send, so no response will ever arrive for this id.
        // Drop the registration; otherwise it lingers until Close and
        // HasPendingCallbacks keeps reporting a request that does not exist.
        if (waitForCallback)
        {
            _callbacks.TryRemove(id, out _);
        }

        throw;
    }

    return waitForCallback ? await callback.TaskWrapper.Task.ConfigureAwait(false) : null;
}
/// <summary>
/// Sends a protocol request, waits for the response and converts it to <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type the response payload is converted to.</typeparam>
/// <param name="method">Protocol method name.</param>
/// <param name="args">Method parameters.</param>
/// <returns>The converted response.</returns>
internal async Task<T> SendAsync<T>(string method, object args = null)
    => (await SendAsync(method, args).ConfigureAwait(false)).ToObject<T>(true);
/// <summary>
/// Attaches to a target with <c>Target.attachToTarget</c> (flattened) and returns the session
/// registered under the session id reported by the response.
/// </summary>
/// <param name="targetInfo">Target to attach to; its <c>TargetId</c> is sent with the request.</param>
/// <returns>The session for the attached target.</returns>
internal async Task<CDPSession> CreateSessionAsync(TargetInfo targetInfo)
{
    var attachRequest = new TargetAttachToTargetRequest
    {
        TargetId = targetInfo.TargetId,
        Flatten = true
    };

    var attachResponse = await SendAsync<TargetAttachToTargetResponse>("Target.attachToTarget", attachRequest)
        .ConfigureAwait(false);

    return await GetSessionAsync(attachResponse.SessionId).ConfigureAwait(false);
}
/// <summary>
/// Tells whether any request is still waiting for its response.
/// </summary>
/// <returns><c>true</c> when at least one callback is registered; otherwise <c>false</c>.</returns>
// IsEmpty states the intent directly and avoids computing the full count of a concurrent dictionary.
internal bool HasPendingCallbacks() => !_callbacks.IsEmpty;
#endregion
/// <summary>
/// Closes the connection: marks it closed, stops the transport from reading, raises
/// <see cref="Disconnected"/>, closes every known session and fails every pending request
/// with a <see cref="TargetClosedException"/>. Does nothing when already closed.
/// </summary>
/// <param name="closeReason">Reason stored in <see cref="CloseReason"/> and passed on to sessions and pending requests.</param>
internal void Close(string closeReason)
{
    // Only the first call does any work.
    if (IsClosed)
    {
        return;
    }

    IsClosed = true;
    CloseReason = closeReason;

    Transport.StopReading();

    var disconnectedHandler = Disconnected;
    if (disconnectedHandler != null)
    {
        disconnectedHandler(this, new EventArgs());
    }

    // Work on snapshots so the dictionaries can be cleared afterwards.
    var openSessions = _sessions.Values.ToArray();
    foreach (var openSession in openSessions)
    {
        openSession.Close(closeReason);
    }

    _sessions.Clear();

    var pendingCallbacks = _callbacks.Values.ToArray();
    foreach (var pending in pendingCallbacks)
    {
        var closedError = new TargetClosedException(
            $"Protocol error({pending.Method}): Target closed.",
            closeReason);
        pending.TaskWrapper.TrySetException(closedError);
    }

    _callbacks.Clear();
}
/// <summary>
/// Returns the connection a session belongs to.
/// </summary>
/// <param name="session">The session.</param>
/// <returns>The value of the session's <c>Connection</c> property.</returns>
internal static Connection FromSession(CDPSession session)
{
    return session.Connection;
}
/// <summary>
/// Looks up a session by id without waiting for it to be registered.
/// </summary>
/// <param name="sessionId">Session id.</param>
/// <returns>The session, or the default value when no session is registered under that id.</returns>
internal CDPSession GetSession(string sessionId)
{
    return _sessions.GetValueOrDefault(sessionId);
}
/// <summary>
/// Looks up a session by id through the asynchronous session helper.
/// </summary>
/// <param name="sessionId">Session id.</param>
/// <returns>The task produced by the helper's item lookup.</returns>
internal Task<CDPSession> GetSessionAsync(string sessionId)
{
    return _asyncSessions.GetItemAsync(sessionId);
}
#region Private Methods
/// <summary>
/// Transport event handler: queues the received message for processing.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event data carrying the received message.</param>
// async void is used because this is an event handler. ConfigureAwait(false) keeps this
// library code from capturing the caller's synchronization context, like the other awaits here.
private async void Transport_MessageReceived(object sender, MessageReceivedEventArgs e)
    => await _callbackQueue.Enqueue(() => ProcessMessage(e)).ConfigureAwait(false);
/// <summary>
/// Processes one raw message from the transport: applies the configured delay, deserializes the
/// message and dispatches it. A message that fails to deserialize is logged and skipped; any other
/// failure is logged and closes the connection.
/// </summary>
/// <param name="e">Event data carrying the raw message text.</param>
/// <returns>A task that completes when the message has been handled.</returns>
private async Task ProcessMessage(MessageReceivedEventArgs e)
{
    try
    {
        var response = e.Message;
        ConnectionResponse obj = null;

        if (response.Length > 0 && Delay > 0)
        {
            await Task.Delay(Delay).ConfigureAwait(false);
        }

        try
        {
            obj = JsonConvert.DeserializeObject<ConnectionResponse>(response, JsonHelper.DefaultJsonSerializerSettings);
        }
        catch (JsonException exc)
        {
            // The template needs a placeholder; without one the response argument is dropped from the log.
            _logger.LogError(exc, "Failed to deserialize response {Response}", response);
            return;
        }

        _logger.LogTrace("◀ Receive {Message}", response);
        ProcessIncomingMessage(obj);
    }
    catch (Exception ex)
    {
        // Anything unexpected while handling a message is treated as fatal for the connection.
        var message = $"Connection failed to process {e.Message}. {ex.Message}. {ex.StackTrace}";
        _logger.LogError(ex, message);
        Close(message);
    }
}
/// <summary>
/// Routes a deserialized message. Session attach/detach notifications update the session table;
/// afterwards the message is forwarded to its session, used to complete the matching pending
/// request, or published through <see cref="MessageReceived"/>.
/// </summary>
/// <param name="obj">The deserialized message.</param>
private void ProcessIncomingMessage(ConnectionResponse obj)
{
    var method = obj.Method;
    var param = obj.Params?.ToObject<ConnectionResponseParams>();

    if (method == "Target.attachedToTarget")
    {
        var sessionId = param.SessionId;
        var session = new CDPSession(this, param.TargetInfo.Type, sessionId);
        _asyncSessions.AddItem(sessionId, session);
    }
    else if (method == "Target.detachedFromTarget")
    {
        var sessionId = param.SessionId;
        if (_sessions.TryRemove(sessionId, out var session) && !session.IsClosed)
        {
            session.Close("Target.detachedFromTarget");
        }
    }

    if (!string.IsNullOrEmpty(obj.SessionId))
    {
        // The session may no longer be registered (for example it was just detached).
        // Skip the message in that case: dereferencing null here would throw, and the
        // caller reacts to any exception by closing the whole connection.
        GetSession(obj.SessionId)?.OnMessage(obj);
    }
    else if (obj.Id.HasValue)
    {
        // Complete the pending request registered under this id, if there still is one;
        // responses without a registered callback are ignored.
        if (_callbacks.TryRemove(obj.Id.Value, out var callback))
        {
            if (obj.Error != null)
            {
                callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
            }
            else
            {
                callback.TaskWrapper.TrySetResult(obj.Result);
            }
        }
    }
    else
    {
        MessageReceived?.Invoke(this, new MessageEventArgs
        {
            MessageID = method,
            MessageData = obj.Params
        });
    }
}
/// <summary>
/// Transport event handler: closes the connection with the transport's close reason.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event data carrying the close reason.</param>
private void Transport_Closed(object sender, TransportClosedEventArgs e)
{
    Close(e.CloseReason);
}
#endregion
#region Static Methods
/// <summary>
/// Gets default web socket factory implementation.
/// Obsolete: this field only forwards the value of <c>WebSocketTransport.DefaultWebSocketFactory</c>,
/// which should be used instead.
/// </summary>
[Obsolete("Use " + nameof(WebSocketTransport) + "." + nameof(WebSocketTransport.DefaultWebSocketFactory) + " instead")]
public static readonly WebSocketFactory DefaultWebSocketFactory = WebSocketTransport.DefaultWebSocketFactory;
/// <summary>
/// Creates a <see cref="Connection"/> for the given URL. The transport set on
/// <paramref name="connectionOptions"/> is used when present; otherwise one is created with the
/// options' transport factory, falling back to <c>WebSocketTransport.DefaultTransportFactory</c>.
/// </summary>
/// <param name="url">Endpoint URL; also passed to the transport factory as a <see cref="Uri"/>.</param>
/// <param name="connectionOptions">Connection options; <c>SlowMo</c> becomes the connection's delay.</param>
/// <param name="loggerFactory">Optional logger factory.</param>
/// <param name="cancellationToken">Token passed to the transport factory.</param>
/// <returns>The new connection.</returns>
internal static async Task<Connection> Create(string url, IConnectionOptions connectionOptions, ILoggerFactory loggerFactory = null, CancellationToken cancellationToken = default)
{
#pragma warning disable 618
    var transport = connectionOptions.Transport;
#pragma warning restore 618

    if (transport == null)
    {
        var transportFactory = connectionOptions.TransportFactory ?? WebSocketTransport.DefaultTransportFactory;

        // Library code: do not resume on the caller's synchronization context,
        // consistent with the other awaits in this class.
        transport = await transportFactory(new Uri(url), connectionOptions, cancellationToken).ConfigureAwait(false);
    }

    return new Connection(url, connectionOptions.SlowMo, transport, loggerFactory);
}
/// <summary>
/// Releases all resources used by the <see cref="Connection"/> object.
/// If the connection is not closed yet it is closed first, which raises the
/// <see cref="Disconnected"/> event; then the <see cref="Transport"/> is disposed.
/// </summary>
/// <remarks>Call <see cref="Dispose"/> when you are finished using the <see cref="Connection"/>. The
/// <see cref="Dispose"/> method leaves the <see cref="Connection"/> in an unusable state.
/// After calling <see cref="Dispose"/>, you must release all references to the
/// <see cref="Connection"/> so the garbage collector can reclaim the memory that the
/// <see cref="Connection"/> was occupying.</remarks>
public void Dispose()
{
    Close("Connection disposed");

    // Detach from the transport's events. The transport can be supplied by the caller and
    // may outlive this object; leaving the handlers attached would keep the connection reachable.
    Transport.MessageReceived -= Transport_MessageReceived;
    Transport.Closed -= Transport_Closed;

    Transport.Dispose();
}
#endregion
}
}