-
Notifications
You must be signed in to change notification settings - Fork 4
/
Client.php
298 lines (266 loc) · 8.7 KB
/
Client.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
<?php
namespace WordPress\AsyncHttp;
use Exception;
use WordPress\Util\Map;
use function WordPress\Streams\stream_monitor_progress;
use function WordPress\Streams\streams_http_response_await_bytes;
use function WordPress\Streams\streams_send_http_requests;
/**
* An asynchronous HTTP client library designed for WordPress. Main features:
*
* **Streaming support**
* Enqueuing a request returns a PHP stream resource that can be read with PHP functions like
* `fread()` and `stream_get_contents()`:
*
* ```php
* $client = new AsyncHttpClient();
* $fp = $client->enqueue(
* new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" )
* );
* // Read some data
* $first_4_kilobytes = fread($fp, 4096);
* // We've only waited for the first four kilobytes. The download
* // is still in progress at this point, and yet we're free to do
* // other work.
* ```
*
* **Delayed execution and concurrent downloads**
* The actual sockets are not opened until the first time one of the streams is read from:
*
* ```php
* $client = new AsyncHttpClient();
* // Enqueuing the requests does not start the data transmission yet.
* $batch = $client->enqueue( [
* new Request( "https://downloads.wordpress.org/plugin/gutenberg.17.7.0.zip" ),
* new Request( "https://downloads.wordpress.org/theme/pendant.zip" ),
* ] );
* // Even though stream_get_contents() will return just the response body for
* // one request, it also opens the network sockets and starts streaming
* // both enqueued requests. The response data for $batch[1] is buffered.
* $gutenberg_zip = stream_get_contents( $batch[0] );
*
* // At least a chunk of pendant.zip has already been downloaded, let's
* // wait for the rest of the data:
* $pendant_zip = stream_get_contents( $batch[1] );
* ```
*
* **Concurrency limits**
* The `AsyncHttpClient` will only keep up to `$concurrency` connections open. When one of the
* requests finishes, it will automatically start the next one.
*
* For example:
* ```php
* $client = new AsyncHttpClient();
* // Process at most 10 concurrent requests at a time.
* $client->set_concurrency_limit( 10 );
* ```
*
* **Progress monitoring**
* A developer-provided callback (`AsyncHttpClient->set_progress_callback()`) receives progress
* information about every HTTP request.
*
* ```php
* $client = new AsyncHttpClient();
* $client->set_progress_callback( function ( Request $request, $downloaded, $total ) {
* // $total is computed based on the Content-Length response header and
* // null if it's missing.
* echo "$request->url – Downloaded: $downloaded / $total\n";
* } );
* ```
*
* **HTTPS support**
* TLS connections work out of the box.
*
* **Non-blocking sockets**
* The act of opening each socket connection is non-blocking and happens nearly
* instantly. The streams themselves are also set to non-blocking mode via `stream_set_blocking($fp, 0);`
*
* **Asynchronous downloads**
* Start downloading now, do other work in your code, only block once you need the data.
*
* **PHP 7.0 support and no dependencies**
* `AsyncHttpClient` works on any WordPress installation with vanilla PHP only.
* It does not require any PHP extensions, CURL, or any external PHP libraries.
*
* **Supports custom request headers and body**
*/
class Client {

	/**
	 * Maximum number of requests that may be streaming at the same time.
	 *
	 * @var int
	 */
	protected $concurrency = 10;

	/**
	 * Every request known to this client, keyed by the Request object and
	 * holding the matching RequestInfo (state, stream, buffer).
	 *
	 * @var Map
	 */
	protected $requests;

	/**
	 * Progress callback invoked as ( Request $request, $downloaded, $total ).
	 *
	 * @var callable
	 */
	protected $onProgress;

	/**
	 * True whenever a request was enqueued or finished since the last
	 * process_queue() call, i.e. when there may be requests to start.
	 *
	 * @var bool
	 */
	protected $queue_needs_processing = false;

	public function __construct() {
		$this->requests = new Map();
		// No-op default so the callback can always be invoked unconditionally.
		$this->onProgress = function () {
		};
	}

	/**
	 * Sets the limit of concurrent connections this client will open.
	 *
	 * @param int $concurrency
	 */
	public function set_concurrency_limit( $concurrency ) {
		$this->concurrency = $concurrency;
		// A higher limit may allow more enqueued requests to start, so have
		// the queue re-evaluated on the next read.
		$this->queue_needs_processing = true;
	}

	/**
	 * Sets the callback called when response bytes are received on any of the enqueued
	 * requests.
	 *
	 * @param callable $onProgress A function of three arguments:
	 *                             Request $request, int $downloaded, int $total.
	 */
	public function set_progress_callback( $onProgress ) {
		$this->onProgress = $onProgress;
	}

	/**
	 * Enqueues one or multiple HTTP requests for asynchronous processing.
	 * It does not open the network sockets, only adds the Request objects to
	 * an internal queue. Network transmission is delayed until one of the returned
	 * streams is read from.
	 *
	 * @param Request|Request[] $requests The HTTP request(s) to enqueue. Can be a single request or an array of requests.
	 *
	 * @return resource|array A single stream when a single request was given,
	 *                        otherwise an array of streams in the same order
	 *                        as the input array.
	 */
	public function enqueue( $requests ) {
		if ( ! is_array( $requests ) ) {
			return $this->enqueue_request( $requests );
		}

		$enqueued_streams = array();
		foreach ( $requests as $request ) {
			$enqueued_streams[] = $this->enqueue_request( $request );
		}

		return $enqueued_streams;
	}

	/**
	 * Returns the response stream associated with the given Request object.
	 * Enqueues the Request if it hasn't been enqueued yet, and processes the
	 * queue if it is flagged as needing processing.
	 *
	 * @param Request $request
	 *
	 * @return resource
	 */
	public function get_stream( $request ) {
		if ( ! isset( $this->requests[ $request ] ) ) {
			$this->enqueue_request( $request );
		}

		if ( $this->queue_needs_processing ) {
			$this->process_queue();
		}

		return $this->requests[ $request ]->stream;
	}

	/**
	 * Registers a single request and creates the stream resource handed back
	 * to the caller. No network activity happens here; the queue is only
	 * flagged for processing.
	 *
	 * @param \WordPress\AsyncHttp\Request $request
	 *
	 * @return resource The stream created by StreamWrapper::create_resource().
	 */
	protected function enqueue_request( $request ) {
		$stream = StreamWrapper::create_resource(
			new StreamData( $request, $this )
		);

		$this->requests[ $request ]   = new RequestInfo( $stream );
		$this->queue_needs_processing = true;

		return $stream;
	}

	/**
	 * Starts enqueued requests until the number of streaming requests
	 * reaches the concurrency limit.
	 *
	 * Only requests in STATE_STREAMING count toward the limit: a finished
	 * request has had its stream closed and must not block the next
	 * enqueued request from starting.
	 */
	public function process_queue() {
		$this->queue_needs_processing = false;

		$backfill = $this->concurrency - count( $this->get_streaming_requests() );
		if ( $backfill <= 0 ) {
			return;
		}

		$enqueued = array_slice( $this->get_enqueued_request(), 0, $backfill );
		if ( ! count( $enqueued ) ) {
			// Nothing is waiting to be started.
			return;
		}

		list( $streams, $response_headers ) = streams_send_http_requests( $enqueued );
		foreach ( $streams as $k => $stream ) {
			$request = $enqueued[ $k ];
			// Null when the response carried no Content-Length header.
			$total = $response_headers[ $k ]['headers']['content-length'] ?? null;

			$this->requests[ $request ]->state  = RequestInfo::STATE_STREAMING;
			$this->requests[ $request ]->stream = stream_monitor_progress(
				$stream,
				function ( $downloaded ) use ( $request, $total ) {
					// Resolved at call time so a callback set after the
					// request started is still honored.
					$onProgress = $this->onProgress;
					$onProgress( $request, $downloaded, $total );
				}
			);
		}
	}

	/**
	 * Lists the requests that are still waiting to be started.
	 *
	 * @return Request[]
	 */
	protected function get_enqueued_request() {
		$enqueued_requests = array();
		foreach ( $this->requests as $request => $info ) {
			if ( $info->state === RequestInfo::STATE_ENQUEUED ) {
				$enqueued_requests[] = $request;
			}
		}

		return $enqueued_requests;
	}

	/**
	 * Lists every request that has left the enqueued state. This includes
	 * finished requests that were not removed from the map yet.
	 *
	 * No longer used by this class; kept with its original semantics for
	 * subclasses that may rely on it.
	 *
	 * @return Request[]
	 */
	protected function get_streamed_requests() {
		$active_requests = array();
		foreach ( $this->requests as $request => $info ) {
			if ( $info->state !== RequestInfo::STATE_ENQUEUED ) {
				$active_requests[] = $request;
			}
		}

		return $active_requests;
	}

	/**
	 * Lists the requests that are currently in STATE_STREAMING, i.e. those
	 * that were started by process_queue() and not marked finished yet.
	 *
	 * @return Request[]
	 */
	private function get_streaming_requests() {
		$streaming_requests = array();
		foreach ( $this->requests as $request => $info ) {
			if ( $info->state === RequestInfo::STATE_STREAMING ) {
				$streaming_requests[] = $request;
			}
		}

		return $streaming_requests;
	}

	/**
	 * Marks a request as finished, closes its stream and flags the queue so
	 * the next enqueued request can be started.
	 *
	 * @param RequestInfo $request_info
	 */
	private function finish_request( $request_info ) {
		$request_info->state = RequestInfo::STATE_FINISHED;
		fclose( $request_info->stream );
		$this->queue_needs_processing = true;
	}

	/**
	 * Reads up to $length bytes from the stream while polling all the active streams.
	 *
	 * Bytes received for other streaming requests while waiting are appended
	 * to their own buffers. Once the requested stream is finished, whatever
	 * is left in its buffer is returned (possibly fewer than $length bytes)
	 * and the request is removed from this client.
	 *
	 * @param Request $request
	 * @param int     $length
	 *
	 * @return false|string False when the request is unknown to this client,
	 *                      or when it cannot make progress because it has
	 *                      not been started and nothing else is streaming.
	 * @throws Exception Declared by the original code; presumably raised by
	 *                   the underlying stream helpers (not visible here).
	 */
	public function read_bytes( $request, $length ) {
		if ( ! isset( $this->requests[ $request ] ) ) {
			return false;
		}

		$request_info = $this->requests[ $request ];

		while ( true ) {
			// Start waiting requests whenever a slot may have been freed,
			// including slots freed by the previous loop iteration.
			if ( $this->queue_needs_processing ) {
				$this->process_queue();
			}

			// Split the streaming requests into those that reached EOF
			// (retired right away so their closed streams are never polled
			// again) and those that can still deliver data. Both lists are
			// rebuilt together so their keys always line up.
			$active_requests = array();
			$active_streams  = array();
			foreach ( $this->get_streaming_requests() as $streaming_request ) {
				$info = $this->requests[ $streaming_request ];
				if ( feof( $info->stream ) ) {
					$this->finish_request( $info );
					continue;
				}
				$active_requests[] = $streaming_request;
				$active_streams[]  = $info->stream;
			}

			if ( strlen( $request_info->buffer ) >= $length ) {
				$buffered             = substr( $request_info->buffer, 0, $length );
				$request_info->buffer = substr( $request_info->buffer, $length );

				return $buffered;
			}

			if ( $request_info->is_finished() ) {
				// No more data will arrive: hand over the remainder and
				// forget about the request.
				unset( $this->requests[ $request ] );

				return $request_info->buffer;
			}

			if ( $this->queue_needs_processing ) {
				// A request was just retired. Give the queue a chance to
				// start the next one (possibly the one being read) before
				// waiting for bytes.
				continue;
			}

			if ( ! count( $active_streams ) ) {
				// The requested stream was never started and nothing else
				// is streaming, so waiting could not produce any data.
				return false;
			}

			$bytes = streams_http_response_await_bytes(
				$active_streams,
				$length - strlen( $request_info->buffer )
			);
			foreach ( $bytes as $k => $chunk ) {
				$this->requests[ $active_requests[ $k ] ]->buffer .= $chunk;
			}
		}
	}
}