Skip to content

Commit

Permalink
Send priority frame if priority changed after HEADERS frame sent. Add…
Browse files Browse the repository at this point in the history
… handler for priority change. Allo exlclusive stream dependency. Add client and server test against netty.

Signed-off-by: Michal Michalowski <michal.michalowski@openet.com>
  • Loading branch information
mmopenet committed Nov 9, 2018
1 parent 8d4d078 commit 70ba3c6
Show file tree
Hide file tree
Showing 23 changed files with 879 additions and 162 deletions.
17 changes: 17 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Expand Up @@ -1826,6 +1826,23 @@ Set the request relative URI
+++
|===

[[StreamPriority]]
== StreamPriority

++++
This class represents HTTP/2 stream priority defined in RFC 7540 clause 5.3
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[exclusive]]`@exclusive`|`Boolean`|-
|[[streamDependency]]`@streamDependency`|`Number (int)`|-
|[[weight]]`@weight`|`Number (short)`|-
|===

[[TCPSSLOptions]]
== TCPSSLOptions

Expand Down
18 changes: 12 additions & 6 deletions src/main/java/io/vertx/core/http/HttpClientRequest.java
Expand Up @@ -389,13 +389,19 @@ default HttpClientRequest writeCustomFrame(HttpFrame frame) {
}

/**
* Sets the priority weight of the associated stream
* @param weight The weight priority weight or this requet's stream
* Sets the priority of the associated stream
* @param streamPriority The priority of this requet's stream
*/
void setWeight(short weight);
@Fluent
default HttpClientRequest setStreamPriority(StreamPriority streamPriority) {
return this;
}

/**
* Sets the dependecy of associated stream
* @param The identifier of the HTTP/2 stream this request's stream depends on
* Gets the priority of the associated stream
* @return The priority of this requet's stream
*/
void setStreamDependency(int streamDependency);
default StreamPriority getStreamPriority() {
return StreamPriority.DEFAULT;
}
}
12 changes: 7 additions & 5 deletions src/main/java/io/vertx/core/http/HttpClientResponse.java
Expand Up @@ -155,13 +155,15 @@ public interface HttpClientResponse extends ReadStream<Buffer> {
HttpClientRequest request();

/**
* @return The identifier of the HTTP/2 stream this request's stream depends on
* @return The priority of associated HTTP/2 stream
*/
int getStreamDependency();
StreamPriority getStreamPriority();

/**
* @return The weight priority weight or this requet's stream
* Registers handler for stream priority changes.
*
* @param handler Handler to be called when stream priority changes.
*/
short getWeight();

@Fluent
HttpClientResponse streamPriorityHandler(Handler<StreamPriority> handler);
}
18 changes: 9 additions & 9 deletions src/main/java/io/vertx/core/http/HttpServerRequest.java
Expand Up @@ -330,17 +330,17 @@ default HttpServerRequest bodyHandler(@Nullable Handler<Buffer> bodyHandler) {
HttpConnection connection();

/**
* @return The identifier of the HTTP/2 stream this request's stream depends on
* @return The priority of associated stream
*/
default int getStreamDependency() {
return 0;
default StreamPriority getStreamPriority() {
return StreamPriority.DEFAULT;
}

/**
* @return The weight priority weight or this requet's stream
* Registers handler for stream priority changes.
*
* @param handler handler to be called when stream priority changes,
*/
default short getWeight() {
return Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
}

@Fluent
HttpServerRequest streamPriorityHandler(Handler<StreamPriority> handler);
}
14 changes: 6 additions & 8 deletions src/main/java/io/vertx/core/http/HttpServerResponse.java
Expand Up @@ -474,13 +474,11 @@ default HttpServerResponse writeCustomFrame(HttpFrame frame) {
}

/**
* Sets the priority weight of the associated stream
* @param weight The weight priority weight or this requet's stream
* Sets the priority of the associated stream
* @param streamPriority The priority for this requet's stream
*/
void setWeight(short weight);
/**
* Sets the dependecy of associated stream
* @param The identifier of the HTTP/2 stream this request's stream depends on
*/
void setStreamDependency(int streamDependency);
@Fluent
default HttpServerResponse setStreamPriority(StreamPriority streamPriority) {
return this;
}
}
122 changes: 122 additions & 0 deletions src/main/java/io/vertx/core/http/StreamPriority.java
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2011-2018 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.core.http;

import io.netty.handler.codec.http2.Http2CodecUtil;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;

/**
* This class represents HTTP/2 stream priority defined in RFC 7540 clause 5.3
*/
@DataObject
public class StreamPriority {

public static final StreamPriority DEFAULT = new StreamPriority();
/**
*
*/
private short weight;
private int streamDependency;
private boolean exclusive;

private StreamPriority() {
this(0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT, false);
}

public StreamPriority(JsonObject json) {
this.weight = json.getInteger("weight", (int)Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT).shortValue();
this.streamDependency = json.getInteger("streamDependency", 0);
this.exclusive = json.getBoolean("exclusive", false);
}



public StreamPriority(int streamDependency, short weight, boolean exclusive) {
super();
this.weight = weight;
this.streamDependency = streamDependency;
this.exclusive = exclusive;
}


public StreamPriority(int streamDependency, short weight) {
this(streamDependency, weight, false);
}

public StreamPriority(short weight) {
this(0, weight, false);
}


public StreamPriority(StreamPriority other) {
this.weight = other.weight;
this.streamDependency = other.streamDependency;
this.exclusive = other.exclusive;
}

public short getWeight() {
return weight;
}

public int getStreamDependency() {
return streamDependency;
}

public boolean isExclusive() {
return exclusive;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (exclusive ? 1231 : 1237);
result = prime * result + streamDependency;
result = prime * result + weight;
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StreamPriority other = (StreamPriority) obj;
if (exclusive != other.exclusive)
return false;
if (streamDependency != other.streamDependency)
return false;
if (weight != other.weight)
return false;
return true;
}

public JsonObject toJson() {
JsonObject json = new JsonObject();
json.put("weight", weight);
json.put("streamDependency", streamDependency);
json.put("exclusive", exclusive);
return json;
}

@Override
public String toString() {
return "StreamPriority [weight=" + weight + ", streamDependency=" + streamDependency + ", exclusive=" + exclusive + "]";
}



}
43 changes: 13 additions & 30 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -134,10 +134,9 @@ private void recycle() {
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
stream.setStreamDependency(streamDependency);
stream.setWeight(weight);
stream.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive));
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}

Expand Down Expand Up @@ -184,8 +183,6 @@ static class Http2ClientStream extends VertxHttp2Stream<Http2ClientConnection> i
private HttpClientResponseImpl response;
private boolean requestEnded;
private boolean responseEnded;
private int streamDependency = 0;
private short weight = Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;

Http2ClientStream(Http2ClientConnection conn, Http2Stream stream, boolean writable) {
super(conn, stream, writable);
Expand All @@ -206,24 +203,6 @@ public int id() {
return super.id();
}

@Override
public int getStreamDependency() {
return streamDependency;
}

public void setStreamDependency(int streamDependency) {
this.streamDependency = streamDependency;
}

@Override
public short getWeight() {
return weight;
}

public void setWeight(short weight) {
this.weight = weight;
}

@Override
void handleEnd(MultiMap trailers) {
if (conn.metrics != null) {
Expand Down Expand Up @@ -292,6 +271,15 @@ void handleCustomFrame(int type, int flags, Buffer buff) {
response.handleUnknownFrame(new HttpFrameImpl(type, flags, buff));
}


@Override
void handlePriorityChange(StreamPriority streamPriority) {
if(streamPriority != null && !streamPriority.equals(getStreamPriority())) {
setStreamPriority(streamPriority);
response.handlePriorityChange(streamPriority);
}
}

void handleHeaders(Http2Headers headers, boolean end) {
if (response == null || response.statusCode() == 100) {
int status;
Expand Down Expand Up @@ -344,7 +332,7 @@ Handler<HttpClientRequest> pushHandler() {
}

@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end, int streamDependency, short weight) {
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method != HttpMethod.OTHER ? method.name() : rawMethod);
if (method == HttpMethod.CONNECT) {
Expand All @@ -370,19 +358,14 @@ public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap
if (conn.metrics != null) {
request.metric(conn.metrics.requestBegin(conn.queueMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request));
}
writeHeaders(h, end && content == null, streamDependency, weight);
writeHeaders(h, end && content == null);
if (content != null) {
writeBuffer(content, end);
} else {
handlerContext.flush();
}
}

@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf content, boolean end) {
writeHead(method, rawMethod, uri, headers, hostHeader, chunked, content, end, 0, Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
}

@Override
public void writeBuffer(ByteBuf buf, boolean end) {
if (buf == null && end) {
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -31,6 +31,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.GoAway;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.NetSocket;
Expand Down Expand Up @@ -193,8 +194,17 @@ void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
// Http2FrameListener

@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) {
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) {
VertxHttp2Stream stream;
synchronized (this) {
stream = streams.get(streamId);
}
System.out.println(getClass().getName() + ".onPriorityRead(" + new StreamPriority(streamDependency, weight, exclusive) + ") : " + stream);

if (stream != null) {
StreamPriority streamPriority = new StreamPriority(streamDependency, weight, exclusive);
context.executeFromIO(v -> stream.handlePriorityChange(streamPriority));
}
}

@Override
Expand Down
Expand Up @@ -116,8 +116,8 @@ public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
return;
}
Http2ServerRequestImpl req = createRequest(streamId, headers);
req.setStreamDependency(streamDependency);
req.setWeight(weight);
req.setStreamPriority(new StreamPriority(streamDependency, weight, exclusive));

stream = req;
CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (options.isHandle100ContinueAutomatically() &&
Expand Down Expand Up @@ -159,7 +159,7 @@ public synchronized void onSettingsRead(ChannelHandlerContext ctx, Http2Settings
super.onSettingsRead(ctx, settings);
}

synchronized void sendPush(int streamId, String host, HttpMethod method, MultiMap headers, String path, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
synchronized void sendPush(int streamId, String host, HttpMethod method, MultiMap headers, String path, StreamPriority streamPriority, Handler<AsyncResult<HttpServerResponse>> completionHandler) {
Http2Headers headers_ = new DefaultHttp2Headers();
if (method == HttpMethod.OTHER) {
throw new IllegalArgumentException("Cannot push HttpMethod.OTHER");
Expand All @@ -184,6 +184,7 @@ public void handle(AsyncResult<Integer> ar) {
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
boolean writable = handler.encoder().flowController().isWritable(promisedStream);
Push push = new Push(promisedStream, contentEncoding, method, path, writable, completionHandler);
push.setStreamPriority(streamPriority);
streams.put(promisedStreamId, push);
if (maxConcurrentStreams == null || concurrentStreams < maxConcurrentStreams) {
concurrentStreams++;
Expand Down

0 comments on commit 70ba3c6

Please sign in to comment.