forked from openjdk/jdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Stream.java
1496 lines (1357 loc) · 58.6 KB
/
Stream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscriber;
import jdk.internal.net.http.common.*;
import jdk.internal.net.http.frame.*;
import jdk.internal.net.http.hpack.DecodingCallback;
/**
* Http/2 Stream handling.
*
* REQUESTS
*
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
*
* sendRequest() -- sendHeadersOnly() + sendBody()
*
* sendBodyAsync() -- calls sendBody() in an executor thread.
*
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
*
* sendRequestAsync() -- calls sendRequest() in an executor thread
*
* RESPONSES
*
* Multiple responses can be received per request. Responses are queued up on
* a LinkedList of CF<HttpResponse> and the first one on the list is completed
* with the next response
*
* getResponseAsync() -- queries list of response CFs and returns first one
* if one exists. Otherwise, creates one and adds it to list
* and returns it. Completion is achieved through the
* incoming() upcall from connection reader thread.
*
* getResponse() -- calls getResponseAsync() and waits for CF to complete
*
* responseBodyAsync() -- calls responseBody() in an executor thread.
*
* incoming() -- entry point called from connection reader thread. Frames are
* either handled immediately without blocking or for data frames
* placed on the stream's inputQ which is consumed by the stream's
* reader thread.
*
* PushedStream sub class
* ======================
* Sending side methods are not used because the request comes from a PUSH_PROMISE
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream
* is created. PushedStream does not use responseCF list as there can be only
* one response. The CF is created when the object is created, and it is
* completed when the response HEADERS frame is received.
*/
class Stream<T> extends ExchangeImpl<T> {
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
SequentialScheduler.synchronizedScheduler(this::schedule);
final SubscriptionBase userSubscription =
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
* the stream's first frame is sent.
*/
protected volatile int streamid;
long requestContentLen;
final Http2Connection connection;
final HttpRequestImpl request;
final HeadersConsumer rspHeadersConsumer;
final HttpHeadersBuilder responseHeadersBuilder;
final HttpHeaders requestPseudoHeaders;
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
final HttpRequest.BodyPublisher requestPublisher;
volatile RequestSubscriber requestSubscriber;
volatile int responseCode;
volatile Response response;
// The exception with which this stream was canceled.
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
volatile CompletableFuture<T> responseBodyCF;
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
volatile boolean stopRequested;
/** True if END_STREAM has been seen in a frame received on this stream. */
private volatile boolean remotelyClosed;
private volatile boolean closed;
private volatile boolean endStreamSent;
// Records the reason code of the first ResetFrame that was sent to the
// server. A streamState of 0 indicates that no reset was sent
// (see markStream(int code)).
private volatile int streamState; // assigned using STREAM_STATE varhandle.
private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.
// state flags
private boolean requestSent, responseReceived;
// send lock: prevent sending DataFrames after reset occurred.
private final Object sendLock = new Object();
/**
* A reference to this Stream's connection Send Window controller. The
* stream MUST acquire the appropriate amount of Send Window before
* sending any data. Will be null for PushStreams, as they cannot send data.
*/
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;
/**
 * Returns the {@code connection} field of the {@code Http2Connection}
 * this stream belongs to.
 */
@Override
HttpConnection connection() {
    final HttpConnection underlying = connection.connection;
    return underlying;
}
/**
 * Delivers the frames queued on {@code inputQ} to the response body
 * subscriber.
 *
 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
 * or, after the user subscription window has re-opened, from
 * SubscriptionBase.request().
 *
 * If no subscriber is installed yet, the pending subscriber (if any) is
 * adopted and subscribed first; when there is none, nothing is processed.
 * Any throwable raised while processing is recorded in {@code errorRef}
 * (the first error wins) and is then reported through {@code onError},
 * unless {@code onComplete} has already been called.
 */
private void schedule() {
    boolean onCompleteCalled = false;
    HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
    try {
        if (subscriber == null) {
            subscriber = responseSubscriber = pendingResponseSubscriber;
            if (subscriber == null) {
                // can't process anything yet
                return;
            } else {
                if (debug.on()) debug.log("subscribing user subscriber");
                subscriber.onSubscribe(userSubscription);
            }
        }
        while (!inputQ.isEmpty()) {
            Http2Frame frame = inputQ.peek();
            if (frame instanceof ResetFrame) {
                inputQ.remove();
                handleReset((ResetFrame)frame, subscriber);
                return;
            }
            DataFrame df = (DataFrame)frame;
            boolean finished = df.getFlag(DataFrame.END_STREAM);

            List<ByteBuffer> buffers = df.getData();
            List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
            int size = Utils.remaining(dsts, Integer.MAX_VALUE);
            if (size == 0 && finished) {
                // empty END_STREAM frame: complete without consuming demand
                inputQ.remove();
                connection.ensureWindowUpdated(df); // must update connection window
                Log.logTrace("responseSubscriber.onComplete");
                if (debug.on()) debug.log("incoming: onComplete");
                sched.stop();
                connection.decrementStreamsCount(streamid);
                subscriber.onComplete();
                onCompleteCalled = true;
                setEndStreamReceived();
                return;
            } else if (userSubscription.tryDecrement()) {
                inputQ.remove();
                Log.logTrace("responseSubscriber.onNext {0}", size);
                if (debug.on()) debug.log("incoming: onNext(%d)", size);
                try {
                    subscriber.onNext(dsts);
                } catch (Throwable t) {
                    connection.dropDataFrame(df); // must update connection window
                    throw t;
                }
                if (consumed(df)) {
                    Log.logTrace("responseSubscriber.onComplete");
                    if (debug.on()) debug.log("incoming: onComplete");
                    sched.stop();
                    connection.decrementStreamsCount(streamid);
                    subscriber.onComplete();
                    onCompleteCalled = true;
                    setEndStreamReceived();
                    return;
                }
            } else {
                // no demand: leave the frame queued; fall through to the
                // error handling below only if a stop was requested
                if (stopRequested) break;
                return;
            }
        }
    } catch (Throwable throwable) {
        errorRef.compareAndSet(null, throwable);
    } finally {
        if (sched.isStopped()) drainInputQueue();
    }

    Throwable t = errorRef.get();
    if (t != null) {
        sched.stop();
        try {
            if (!onCompleteCalled) {
                if (debug.on())
                    debug.log("calling subscriber.onError: %s", (Object) t);
                subscriber.onError(t);
            } else {
                if (debug.on())
                    debug.log("already completed: dropping error %s", (Object) t);
            }
        } catch (Throwable x) {
            // Log the exception thrown by onError itself, not the error
            // that was being reported to the subscriber.
            Log.logError("Subscriber::onError threw exception: {0}", (Object) x);
        } finally {
            cancelImpl(t);
            drainInputQueue();
        }
    }
}
// Empties the input queue. Must only be called from the scheduler
// schedule() loop. Every DataFrame still queued is handed to
// connection.dropDataFrame so that all received data frames are accounted
// for in the connection window flow control if the scheduler is stopped
// before all the data is consumed. Other frame types are discarded.
private void drainInputQueue() {
    for (Http2Frame pending = inputQ.poll(); pending != null; pending = inputQ.poll()) {
        if (!(pending instanceof DataFrame)) {
            continue;
        }
        connection.dropDataFrame((DataFrame) pending);
    }
}
/**
 * Installs a subscriber that replaces the body with {@code null} and
 * triggers the scheduler. Neither argument is used by this implementation.
 */
@Override
void nullBody(HttpResponse<T> resp, Throwable t) {
    if (debug.on()) {
        debug.log("nullBody: streamid=%d", streamid);
    }
    // An END_STREAM data frame should be waiting in the inputQ; the
    // scheduler only processes it once a subscriber is available.
    final HttpResponse.BodySubscriber<T> discarding =
            HttpResponse.BodySubscribers.replacing(null);
    pendingResponseSubscriber = discarding;
    sched.runOrSchedule();
}
// Invoked once the Response BodySubscriber has consumed the buffers of a
// DataFrame. Credits the frame's payload length back to the connection
// window and, unless the frame carries END_STREAM, to the stream window.
// Returns true if END_STREAM is reached, false if more data is coming.
private boolean consumed(DataFrame df) {
    // RFC 7540 6.1:
    // The entire DATA frame payload is included in flow control,
    // including the Pad Length and Padding fields if present
    final int payload = df.payloadLength();
    final boolean lastFrame = df.getFlag(DataFrame.END_STREAM);
    if (payload != 0) {
        connection.windowUpdater.update(payload);
        // Don't send window update on a stream which is
        // closed or half closed.
        if (!lastFrame) {
            windowUpdater.update(payload);
        }
    }
    return lastFrame;
}
/**
 * Atomically flips the deRegistered flag from {@code false} to {@code true}.
 *
 * @return {@code true} if this call performed the transition,
 *         {@code false} if the flag was already set
 */
boolean deRegister() {
    final boolean transitioned = DEREGISTERED.compareAndSet(this, false, true);
    return transitioned;
}
/**
 * Obtains a BodySubscriber from the given handler and starts delivering the
 * response body to it.
 *
 * @param handler the handler that supplies the body subscriber
 * @param returnConnectionToPool not used by this implementation
 * @param executor the executor passed on to {@code receiveData}
 * @return a future for the response body; if anything in this method
 *         throws (for instance {@code handler.apply}), the stream is
 *         cancelled and a failed future is returned
 */
@Override
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                   boolean returnConnectionToPool,
                                   Executor executor)
{
    try {
        Log.logTrace("Reading body on stream {0}", streamid);
        // Guarded like every other debug statement in this class, so the
        // message is not concatenated when debug logging is off.
        if (debug.on()) debug.log("Getting BodySubscriber for: " + response);
        BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);

        PushGroup<?> pg = exchange.getPushGroup();
        if (pg != null) {
            // if an error occurs make sure it is recorded in the PushGroup
            cf = cf.whenComplete((t, e) -> pg.pushError(e));
        }
        return cf;
    } catch (Throwable t) {
        // may be thrown by handler.apply
        cancelImpl(t);
        return MinimalFuture.failedFuture(t);
    }
}
/** Returns {@code "streamid: "} followed by this stream's identifier. */
@Override
public String toString() {
    return "streamid: " + streamid;
}
/** Queues a DATA frame on the inputQ and triggers the scheduler. */
private void receiveDataFrame(DataFrame df) {
    inputQ.offer(df);
    sched.runOrSchedule();
}
/**
 * Handles a RESET frame by queuing it on the inputQ, behind any frames
 * already queued, and triggering the scheduler. RESET is always handled
 * inline in the queue.
 */
private void receiveResetFrame(ResetFrame frame) {
    inputQ.offer(frame);
    sched.runOrSchedule();
}
/**
 * Atomically records in the streamState the reason of the first
 * ResetFrame sent to the server, and returns the value the streamState
 * held before the call. Only the first non-zero code is retained.
 * Calling this method with a code of 0 leaves the state untouched and
 * simply returns the current value.
 * A typical use is to send a ResetFrame only if none was sent before:
 * <pre>{@code
 * if (markStream(ResetFrame.CANCEL) == 0) {
 * connection.sendResetFrame(streamId, ResetFrame.CANCEL);
 * }
 * }</pre>
 * @param code the reason code as per HTTP/2 protocol
 * @return the previous value of the stream state.
 */
int markStream(int code) {
    if (code != 0) {
        // taken under the send lock so that the state cannot change
        // while sendDataFrame is checking it
        synchronized (sendLock) {
            return (int) STREAM_STATE.compareAndExchange(this, 0, code);
        }
    }
    return streamState;
}
/**
 * Sends the given DATA frame on the connection, unless a reset has been
 * recorded in the streamState. The check and the send both happen under
 * the send lock.
 */
private void sendDataFrame(DataFrame frame) {
    synchronized (sendLock) {
        if (streamState != 0) {
            // must not send DataFrame after reset.
            return;
        }
        connection.sendDataFrame(frame);
    }
}
// Pushes the entire response body into the response subscriber,
// blocking when required by local or remote flow control.
// Returns the future that completes with the body. If the stream is
// already cancelled, that future is completed exceptionally with the
// cancel cause and the subscriber is never installed.
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
    // We want to allow the subscriber's getBody() method to block so it
    // can work with InputStreams. So, we offload execution.
    responseBodyCF = ResponseSubscribers.getBodyAsync(
            executor, bodySubscriber, new MinimalFuture<>(), this::cancelImpl);

    if (!isCanceled()) {
        pendingResponseSubscriber = bodySubscriber;
        // in case data waiting already to be processed
        sched.runOrSchedule();
    } else {
        final Throwable cause = getCancelCause();
        responseBodyCF.completeExceptionally(cause);
    }
    return responseBodyCF;
}
/**
 * Sends the request body via {@code sendBodyImpl()} and returns a future
 * that yields this stream once that operation has completed.
 */
@Override
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
    var bodySent = sendBodyImpl();
    return bodySent.thenApply(ignored -> this);
}
/**
 * Creates a stream bound to the given connection and exchange.
 * The request, its body publisher and the request pseudo headers are
 * all derived from the exchange's request.
 *
 * @param connection the HTTP/2 connection this stream belongs to
 * @param e the exchange whose request this stream carries
 * @param windowController the controller stored for send window accounting
 */
@SuppressWarnings("unchecked")
Stream(Http2Connection connection,
Exchange<T> e,
WindowController windowController)
{
super(e);
this.connection = connection;
this.windowController = windowController;
// request must be assigned before the fields below that read it
this.request = e.request();
this.requestPublisher = request.requestPublisher; // may be null
this.responseHeadersBuilder = new HttpHeadersBuilder();
this.rspHeadersConsumer = new HeadersConsumer();
this.requestPseudoHeaders = createPseudoHeaders(request);
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
/**
 * Checks whether the request was cancelled at the exchange level and, if
 * so, propagates the cancellation to this stream: the stream is cancelled
 * when no error has been recorded yet, otherwise only a cancel frame is
 * sent.
 *
 * @return {@code true} if the request has been cancelled
 */
private boolean checkRequestCancelled() {
    if (!exchange.multi.requestCancelled()) {
        return false;
    }
    if (errorRef.get() != null) {
        sendCancelStreamFrame();
    } else {
        cancel();
    }
    return true;
}
/**
 * Entry point from Http2Connection reader thread.
 *
 * Header frames are always processed. Data frames are queued for the
 * response body thread, which removes them, or dropped if the stream is
 * cancelled or closed. Any other frame is dispatched to
 * {@code otherFrame} unless the stream is cancelled or closed.
 */
void incoming(Http2Frame frame) throws IOException {
    if (debug.on()) {
        debug.log("incoming: %s", frame);
    }
    final boolean cancelled = checkRequestCancelled() || closed;

    if (frame instanceof HeaderFrame) {
        final HeaderFrame headers = (HeaderFrame) frame;
        if (headers.endHeaders()) {
            Log.logTrace("handling response (streamid={0})", streamid);
            handleResponse();
        }
        if (headers.getFlag(HeaderFrame.END_STREAM)) {
            if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
            // signal end of stream through an empty END_STREAM data frame
            receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
        }
        return;
    }

    if (frame instanceof DataFrame) {
        final DataFrame data = (DataFrame) frame;
        if (cancelled) {
            connection.dropDataFrame(data);
        } else {
            receiveDataFrame(data);
        }
        return;
    }

    if (!cancelled) {
        otherFrame(frame);
    }
}
/**
 * Dispatches a frame that is neither a header nor a data frame to the
 * handler matching its type.
 *
 * @throws IOException if the frame type is not WINDOW_UPDATE, RST_STREAM
 *         or PRIORITY
 */
void otherFrame(Http2Frame frame) throws IOException {
    switch (frame.type()) {
        case WindowUpdateFrame.TYPE:
            incoming_windowUpdate((WindowUpdateFrame) frame);
            break;
        case ResetFrame.TYPE:
            incoming_reset((ResetFrame) frame);
            break;
        case PriorityFrame.TYPE:
            incoming_priority((PriorityFrame) frame);
            break;
        default:
            throw new IOException("Unexpected frame: " + frame.toString());
    }
}
// Returns the consumer of name,value pairs into which the Hpack decoder
// decodes the response headers of this stream.
DecodingCallback rspHeadersConsumer() {
    final DecodingCallback callback = rspHeadersConsumer;
    return callback;
}
/**
 * Builds the Response from the headers accumulated so far, resets the
 * headers consumer and completes the response.
 *
 * @throws IOException if the headers contain no {@code :status} value
 */
protected void handleResponse() throws IOException {
    final HttpHeaders headers = responseHeadersBuilder.build();

    final long status = headers
            .firstValueAsLong(":status")
            .orElseThrow(() -> new IOException("no statuscode in response"));
    responseCode = (int) status;

    response = new Response(
            request, exchange, headers, connection(),
            responseCode, HttpClient.Version.HTTP_2);

    /* TODO: review if needs to be removed
       the value is not used, but in case `content-length` doesn't parse as
       long, there will be NumberFormatException. If left as is, make sure
       code up the stack handles NFE correctly. */
    headers.firstValueAsLong("content-length");

    if (Log.headers()) {
        final StringBuilder dump = new StringBuilder("RESPONSE HEADERS:\n");
        Log.dumpHeaders(dump, " ", headers);
        Log.logHeaders(dump.toString());
    }

    // this will clear the response headers
    rspHeadersConsumer.reset();

    completeResponse(response);
}
/**
 * Handles an incoming RST_STREAM frame. The frame is ignored if
 * END_STREAM has already been received or the stream is closed. It is
 * handled immediately if neither a response nor a subscriber exists yet;
 * otherwise it is queued behind the pending data frames.
 */
void incoming_reset(ResetFrame frame) {
    Log.logTrace("Received RST_STREAM on stream {0}", streamid);
    if (endStreamReceived()) {
        Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
        return;
    }
    if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        return;
    }

    Flow.Subscriber<?> subscriber =
            responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
    if (response != null || subscriber != null) {
        // put it in the input queue in order to read all
        // pending data frames first. Indeed, a server may send
        // RST_STREAM after sending END_STREAM, in which case we should
        // ignore it. However, we won't know if we have received END_STREAM
        // or not until all pending data frames are read.
        receiveResetFrame(frame);
        // RST_STREAM was pushed to the queue. It will be handled by
        // the scheduler after all pending data frames have been
        // processed.
        Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
        return;
    }

    // we haven't received the headers yet, and won't receive any!
    // handle reset now.
    handleReset(frame, subscriber);
}
/**
 * Closes this stream in response to a RST_STREAM frame. The first caller
 * to flip {@code closed} fails the subscriber (if any, and if no error was
 * recorded before), the response, the request body future and the
 * response body future, then removes the stream from the connection.
 * Later callers only log that the frame is ignored.
 *
 * @param frame the reset frame that was received
 * @param subscriber the subscriber to notify, may be {@code null}
 */
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
    Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
    if (closed) {
        Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
        return;
    }

    synchronized (this) {
        if (closed) {
            if (debug.on()) debug.log("Stream already closed: ignoring RESET");
            return;
        }
        closed = true;
    }

    try {
        final int code = frame.getErrorCode();
        final IOException failure = new IOException("Received RST_STREAM: "
                + ErrorFrame.stringForCode(code));
        final boolean firstError = errorRef.compareAndSet(null, failure);
        if (firstError && subscriber != null) {
            subscriber.onError(failure);
        }
        completeResponseExceptionally(failure);
        if (!requestBodyCF.isDone()) {
            // we may be sending the body..
            requestBodyCF.completeExceptionally(errorRef.get());
        }
        if (responseBodyCF != null) {
            responseBodyCF.completeExceptionally(errorRef.get());
        }
    } finally {
        connection.decrementStreamsCount(streamid);
        connection.closeStream(streamid);
    }
}
/**
 * Handler for PRIORITY frames; priority is not implemented.
 *
 * @throws UnsupportedOperationException always
 */
void incoming_priority(PriorityFrame frame) {
// TODO: implement priority
throw new UnsupportedOperationException("Not implemented");
}
/**
 * Applies a WINDOW_UPDATE frame to this stream's send window. The stream
 * is reset with FLOW_CONTROL_ERROR if the increment is not strictly
 * positive or if the window controller rejects it (overflow).
 */
private void incoming_windowUpdate(WindowUpdateFrame frame)
    throws IOException
{
    final int increment = frame.getUpdate();
    if (increment > 0) {
        assert streamid != 0;
        if (!windowController.increaseStreamWindow(increment, streamid)) {
            // overflow
            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
        }
        return;
    }
    Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
                 streamid, increment);
    connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
}
/**
 * Handles a push promise received on this stream.
 * The pushed stream is refused when this exchange has no push group or
 * the request was cancelled, and cancelled when the push group's acceptor
 * does not accept it (or accepting throws). Otherwise the pushed stream
 * is set up with the acceptor's body handler, and the outcome of its
 * response is relayed to the acceptor's future and to the push group.
 *
 * @param pushRequest the request promised by the server
 * @param pushStream the stream created for the promised response
 */
void incoming_pushPromise(HttpRequestImpl pushRequest,
PushedStream<T> pushStream)
throws IOException
{
if (Log.requests()) {
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
}
PushGroup<T> pushGroup = exchange.getPushGroup();
if (pushGroup == null || exchange.multi.requestCancelled()) {
// NOTE(review): the trace message carries this stream's id while the
// stream being reset is pushStream.streamid - confirm this is intended.
Log.logTrace("Rejecting push promise stream " + streamid);
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
pushStream.close();
return;
}
PushGroup.Acceptor<T> acceptor = null;
boolean accepted = false;
try {
acceptor = pushGroup.acceptPushRequest(pushRequest);
accepted = acceptor.accepted();
} catch (Throwable t) {
// a failure here is only logged; 'accepted' stays false and the push
// is cancelled below
if (debug.on())
debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
(Object)t);
}
if (!accepted) {
// cancel / reject
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
if (Log.trace()) {
Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
ex.getMessage());
}
pushStream.cancelImpl(ex);
return;
}
assert accepted && acceptor != null;
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
assert pushHandler != null;
pushStream.requestSent();
pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
// setup housekeeping for when the push is received
// TODO: deal with ignoring of CF anti-pattern
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
// replace t with the cause returned by Utils.getCompletionCause
t = Utils.getCompletionCause(t);
if (Log.trace()) {
Log.logTrace("Push completed on stream {0} for {1}{2}",
pushStream.streamid, resp,
((t==null) ? "": " with exception " + t));
}
if (t != null) {
pushGroup.pushError(t);
pushResponseCF.completeExceptionally(t);
} else {
pushResponseCF.complete(resp);
}
// reported to the push group on both success and failure
pushGroup.pushCompleted();
});
}
/**
 * Assembles the outgoing headers for the request.
 * A {@code content-length} system header is set when the length is
 * strictly positive, and END_STREAM is flagged on the headers when the
 * length is exactly zero.
 *
 * @param contentLength the request body length
 * @return the outgoing headers for this stream
 */
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
    final HttpHeadersBuilder systemBuilder = request.getSystemHeadersBuilder();
    if (contentLength > 0) {
        systemBuilder.setHeader("content-length", Long.toString(contentLength));
    }
    final HttpHeaders filteredSystem = filterHeaders(systemBuilder.build());
    final HttpHeaders filteredUser = filterHeaders(request.getUserHeaders());

    // Filter context restricted from userHeaders
    final HttpHeaders userHeaders =
            HttpHeaders.of(filteredUser.map(), Utils.CONTEXT_RESTRICTED(client()));

    // Filter any headers from systemHeaders that are set in userHeaders
    final HttpHeaders systemHeaders = HttpHeaders.of(
            filteredSystem.map(),
            (name, value) -> userHeaders.firstValue(name).isEmpty());

    final OutgoingHeaders<Stream<T>> frame =
            new OutgoingHeaders<>(systemHeaders, userHeaders, this);
    if (contentLength == 0) {
        frame.setFlag(HeadersFrame.END_STREAM);
        endStreamSent = true;
    }
    return frame;
}
/** Tells whether the given headers contain a {@code proxy-authorization} value. */
private boolean hasProxyAuthorization(HttpHeaders headers) {
    return !headers.firstValue("proxy-authorization").isEmpty();
}
// Determines whether we need to build a new HttpHeader object.
//
// Ideally the filter would be passed to OutgoingHeaders, and the code
// that creates the HeaderFrame would honor it. We're not there yet, so
// depending on the filter to apply and on the content of the headers,
// this method tries to determine whether anything might need to be
// filtered. If nothing needs filtering the original headers can be used.
private boolean needsFiltering(HttpHeaders headers,
                               BiPredicate<String, String> filter) {
    final boolean tunnelling = filter == Utils.PROXY_TUNNEL_FILTER;
    final boolean viaProxy = tunnelling || filter == Utils.PROXY_FILTER;

    if (!viaProxy) {
        // we're talking to a server, either directly or through
        // a tunnel.
        // Slight optimization: we only need to filter out
        // proxy authorization headers, so if there are none just
        // pass through.
        return hasProxyAuthorization(headers);
    }

    // we're either connecting or proxying
    // slight optimization: we only need to filter out
    // disabled schemes, so if there are none just
    // pass through.
    return Utils.proxyHasDisabledSchemes(tunnelling)
            && hasProxyAuthorization(headers);
}
/**
 * Applies the connection's header filter for this request to the given
 * headers. The original object is returned untouched when
 * {@code needsFiltering} reports that nothing has to be filtered.
 */
private HttpHeaders filterHeaders(HttpHeaders headers) {
    final BiPredicate<String, String> filter = connection().headerFilter(request);
    return needsFiltering(headers, filter)
            ? HttpHeaders.of(headers.map(), filter)
            : headers;
}
/**
 * Builds the {@code :method}, {@code :scheme}, {@code :authority} and
 * {@code :path} pseudo headers for the given request.
 * An absent or empty path becomes {@code "*"} for OPTIONS requests and
 * {@code "/"} otherwise; the raw query, if any, is appended after a
 * {@code '?'}.
 */
private static HttpHeaders createPseudoHeaders(HttpRequest request) {
    final String method = request.method();
    final URI uri = request.uri();

    final HttpHeadersBuilder pseudo = new HttpHeadersBuilder();
    pseudo.setHeader(":method", method);
    pseudo.setHeader(":scheme", uri.getScheme());
    // TODO: userinfo deprecated. Needs to be removed
    pseudo.setHeader(":authority", uri.getAuthority());
    // TODO: ensure header names beginning with : not in user headers

    final String rawQuery = uri.getRawQuery();
    final String rawPath = uri.getRawPath();
    String target;
    if (rawPath != null && !rawPath.isEmpty()) {
        target = rawPath;
    } else {
        target = method.equalsIgnoreCase("OPTIONS") ? "*" : "/";
    }
    if (rawQuery != null) {
        target = target + "?" + rawQuery;
    }
    pseudo.setHeader(":path", Utils.encode(target));
    return pseudo.build();
}
/** Returns the request pseudo headers computed when this stream was created. */
HttpHeaders getRequestPseudoHeaders() {
    final HttpHeaders pseudoHeaders = requestPseudoHeaders;
    return pseudoHeaders;
}
/**
 * Records that END_STREAM has been received and signals that the
 * response has been received. Should be called only once.
 */
void setEndStreamReceived() {
    if (debug.on()) {
        debug.log("setEndStreamReceived: streamid=%d", streamid);
    }
    assert !remotelyClosed : "Unexpected endStream already set";
    remotelyClosed = true;
    responseReceived();
}
/**
 * Tells whether the END_STREAM flag has been seen in any frame received
 * on this stream.
 */
private boolean endStreamReceived() {
    final boolean seen = remotelyClosed;
    return seen;
}
/**
 * Sends the request headers on the connection.
 * The request content length is taken from the body publisher, or is 0
 * when there is none. If the stream has already been cancelled nothing is
 * sent and a failed future is returned.
 *
 * @return a future already completed with this stream, or a failed
 *         future carrying the cancellation cause
 */
@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
    if (debug.on()) {
        debug.log("sendHeadersOnly()");
    }
    if (Log.requests() && request != null) {
        Log.logRequest(request.toString());
    }
    requestContentLen = (requestPublisher != null)
            ? requestPublisher.contentLength()
            : 0;

    // At this point the stream doesn't have a streamid yet.
    // It will be allocated if we send the request headers.
    final Throwable cancelCause = errorRef.get();
    if (cancelCause != null) {
        if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)cancelCause);
        return MinimalFuture.failedFuture(cancelCause);
    }

    // sending the headers will cause the allocation of the stream id
    OutgoingHeaders<Stream<T>> headers = headerFrame(requestContentLen);
    connection.sendFrame(headers);

    CompletableFuture<ExchangeImpl<T>> done = new MinimalFuture<>();
    done.complete(this); // #### good enough for now
    return done;
}
/**
 * Removes this stream from the Http2Connection, provided a stream id has
 * been assigned (id greater than zero).
 */
@Override
void released() {
    if (streamid <= 0) {
        if (debug.on()) debug.log("Can't release stream %d", streamid);
        return;
    }
    if (debug.on()) debug.log("Released stream %d", streamid);
    // remove this stream from the Http2Connection map.
    connection.decrementStreamsCount(streamid);
    connection.closeStream(streamid);
}
/** Intentionally empty: this implementation performs no work on completion. */
@Override
void completed() {
// There should be nothing to do here: the stream should have
// been already closed (or will be closed shortly after).
}
/**
 * Assigns the given id to this stream and registers it with the
 * connection. A stream that is closed, or whose request was cancelled,
 * is only registered when {@code registerIfCancelled} is true.
 *
 * @param id the stream identifier to assign
 * @param registerIfCancelled whether to register even if cancelled
 * @return {@code true} if the stream was not cancelled
 */
boolean registerStream(int id, boolean registerIfCancelled) {
    final boolean cancelled = closed || exchange.multi.requestCancelled();
    if (cancelled && !registerIfCancelled) {
        return false;
    }
    this.streamid = id;
    connection.putStream(this, streamid);
    if (debug.on()) {
        debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
                  streamid, cancelled, registerIfCancelled);
    }
    return !cancelled;
}
/**
 * Wakes up the request subscriber's send scheduler so that it attempts
 * to send again. A request subscriber must already exist.
 */
void signalWindowUpdate() {
    final RequestSubscriber bodySender = requestSubscriber;
    assert bodySender != null;
    if (debug.on()) {
        debug.log("Signalling window update");
    }
    bodySender.sendScheduler.runOrSchedule();
}
// Empty sentinel buffer queued by RequestSubscriber.onComplete to mark the
// end of the request body; trySend recognizes it by identity (==).
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
// can be < 0 if the actual length is not known.
private final long contentLength;
private volatile long remainingContentLength;
private volatile Subscription subscription;
// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
// 1) The data that was published by the request body Publisher, and
// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
// A scheduler used to honor window updates. Writing must be paused
// when the window is exhausted, and resumed when the window acquires
// some space. The sendScheduler makes it possible to implement this
// behaviour in an asynchronous non-blocking way.
// See RequestSubscriber::trySend below.
final SequentialScheduler sendScheduler;
/**
 * Creates a subscriber for a request body of the given length.
 *
 * @param contentLen the expected body length; can be < 0 if the actual
 *        length is not known
 */
RequestSubscriber(long contentLen) {
    this.remainingContentLength = contentLen;
    this.contentLength = contentLen;
    // trySend is always run through this scheduler
    this.sendScheduler = SequentialScheduler.synchronizedScheduler(this::trySend);
}
/**
 * Stores the subscription and requests the first item.
 *
 * @throws IllegalStateException if a subscription was already received
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final boolean alreadySubscribed = this.subscription != null;
    if (alreadySubscribed) {
        throw new IllegalStateException("already subscribed");
    }
    this.subscription = subscription;
    if (debug.on()) {
        debug.log("RequestSubscriber: onSubscribe, request 1");
    }
    subscription.request(1);
}
/**
 * Receives the next chunk of the request body. The outgoing queue is
 * expected to be empty at this point.
 */
@Override
public void onNext(ByteBuffer item) {
    if (debug.on()) {
        debug.log("RequestSubscriber: onNext(%d)", item.remaining());
    }
    final int queued = outgoing.size();
    assert queued == 0 : "non-zero size: " + queued;
    onNextImpl(item);
}
/**
 * Queues the given buffer for sending and triggers the send scheduler.
 * If the request body future is already done, the scheduler is stopped
 * and the subscription is cancelled instead.
 */
private void onNextImpl(ByteBuffer item) {
    if (!requestBodyCF.isDone()) {
        // Got some more request body bytes to send.
        outgoing.add(item);
        sendScheduler.runOrSchedule();
    } else {
        // stream already cancelled, probably in timeout
        sendScheduler.stop();
        subscription.cancel();
    }
}
/**
 * Records the first error reported by the publisher and triggers the
 * send scheduler so that the error is handled within the flow. Later
 * errors are ignored.
 */
@Override
public void onError(Throwable throwable) {
    if (debug.on()) {
        debug.log(() -> "RequestSubscriber: onError: " + throwable);
    }
    final boolean firstError = errorRef.compareAndSet(null, throwable);
    if (firstError) {
        sendScheduler.runOrSchedule();
    }
}
/**
 * Called when the last byte of the request body has been obtained.
 * Queues the COMPLETED sentinel so that completion is handled within the
 * flow, after any buffer still waiting to be sent.
 */
@Override
public void onComplete() {
    if (debug.on()) {
        debug.log("RequestSubscriber: onComplete");
    }
    final int queued = outgoing.size();
    assert queued == 0 || queued == 1 : "non-zero or one size: " + queued;
    onNextImpl(COMPLETED);
}
// Attempts to send the data, if any.
// Handles errors and completion state.
// Pause writing if the send window is exhausted, resume it if the
// send window has some bytes that can be acquired.
//
// Flow, in order:
// 1. an error recorded by onError stops the scheduler, cancels the
// subscription and fails the request body (once);
// 2. queued buffers are sent frame by frame until the queue is empty,
// getDataFrame returns null, COMPLETED is reached, or a reset has been
// recorded in the streamState;
// 3. when the queue has been fully sent, one more item is requested.
// Any throwable raised on the way cancels the stream.
void trySend() {
try {
// handle errors raised by onError;
// (errorRef here is RequestSubscriber's own field, which hides the
// field of the same name in the enclosing Stream)
Throwable t = errorRef.get();
if (t != null) {
sendScheduler.stop();
if (requestBodyCF.isDone()) return;
subscription.cancel();
requestBodyCF.completeExceptionally(t);
cancelImpl(t);
return;
}
int state = streamState;
do {
// handle COMPLETED;
ByteBuffer item = outgoing.peekFirst();
if (item == null) return;
else if (item == COMPLETED) {
sendScheduler.stop();
complete();
return;
}
// handle bytes to send downstream
while (item.hasRemaining() && state == 0) {
if (debug.on()) debug.log("trySend: %d", item.remaining());
DataFrame df = getDataFrame(item);
if (df == null) {
if (debug.on())
debug.log("trySend: can't send yet: %d", item.remaining());
return; // the send window is exhausted: come back later
}
// length accounting is only done when the content length is known
// and positive
if (contentLength > 0) {
remainingContentLength -= df.getDataLength();
if (remainingContentLength < 0) {
String msg = connection().getConnectionFlow()
+ " stream=" + streamid + " "
+ "[" + Thread.currentThread().getName() + "] "
+ "Too many bytes in request body. Expected: "
+ contentLength + ", got: "
+ (contentLength - remainingContentLength);
assert streamid > 0;
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
throw new IOException(msg);
} else if (remainingContentLength == 0) {
// last expected byte: flag END_STREAM on this frame
assert !endStreamSent : "internal error, send data after END_STREAM flag";
df.setFlag(DataFrame.END_STREAM);
endStreamSent = true;
}
} else {
assert !endStreamSent : "internal error, send data after END_STREAM flag";
}
// re-read the state: a reset may have been recorded meanwhile
if ((state = streamState) != 0) {
// NOTE(review): t is always null at this point (the method returned
// above otherwise), so this logs "null" - confirm what should be logged.
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
break;
}
if (debug.on())
debug.log("trySend: sending: %d", df.getDataLength());
sendDataFrame(df);
}
if (state != 0) break;
assert !item.hasRemaining();
ByteBuffer b = outgoing.removeFirst();
assert b == item;
} while (outgoing.peekFirst() != null);
if (state != 0) {
// NOTE(review): this reads RequestSubscriber.errorRef, not the enclosing
// Stream's errorRef - verify which one was intended.
t = errorRef.get();
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
throw t;
}
if (debug.on()) debug.log("trySend: request 1");
subscription.request(1);
} catch (Throwable ex) {
if (debug.on()) debug.log("trySend: ", ex);
sendScheduler.stop();
subscription.cancel();
requestBodyCF.completeExceptionally(ex);
// need to cancel the stream to 1. tell the server
// we don't want to receive any more data and
// 2. ensure that the operation ref count will be
// decremented on the HttpClient.
cancelImpl(ex);
}
}
private void complete() throws IOException {
long remaining = remainingContentLength;
long written = contentLength - remaining;
if (remaining > 0) {
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
// let trySend() handle the exception
throw new IOException(connection().getConnectionFlow()
+ " stream=" + streamid + " "
+ "[" + Thread.currentThread().getName() +"] "
+ "Too few bytes returned by the publisher ("