Skip to content

Commit

Permalink
Client push promise
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 1ec74c2 commit d6673cb
Show file tree
Hide file tree
Showing 13 changed files with 319 additions and 32 deletions.
24 changes: 24 additions & 0 deletions src/main/java/io/vertx/core/http/HttpClientRequest.java
Expand Up @@ -251,4 +251,28 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
@Fluent @Fluent
HttpClientRequest setTimeout(long timeoutMs); HttpClientRequest setTimeout(long timeoutMs);


/**
* Set a push promise handler for this request.<p/>
*
* The handler is called when the client receives a <i>push promise</i> from the server. The handler can be called
* multiple times, for each push promise.<p/>
*
* The handler is called with a <i>read-only</i> {@link HttpClientRequest}, the following methods can be called:<p/>
*
* <ul>
* <li>{@link HttpClientRequest#method()}</li>
* <li>{@link HttpClientRequest#uri()}</li>
* <li>{@link HttpClientRequest#headers()}</li>
* <li>{@link HttpClientRequest#getHost()}</li>
* </ul>
*
* In addition the handler should call the {@link HttpClientRequest#handler} method to set an handler for
* processing the response.<p/>
*
* @param handler the handler
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpClientRequest pushPromiseHandler(Handler<HttpClientRequest> handler);

} }
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/HttpServerResponse.java
Expand Up @@ -398,7 +398,7 @@ default HttpServerResponse sendFile(String filename, long offset, Handler<AsyncR
* @return a reference to this, so the API can be used fluently * @return a reference to this, so the API can be used fluently
*/ */
@Fluent @Fluent
HttpServerResponse promisePush(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler); HttpServerResponse pushPromise(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler);


/** /**
* Reset this stream. * Reset this stream.
Expand Down
Expand Up @@ -175,7 +175,7 @@ public class ConnQueue {
this.address = address; this.address = address;


if (options.getProtocolVersion() == HttpVersion.HTTP_2) { if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
pool = new Http2Pool(this); pool = new Http2Pool(this, client);
} else { } else {
pool = new Http1xPool(this); pool = new Http1xPool(this);
} }
Expand Down
22 changes: 19 additions & 3 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -47,6 +47,7 @@
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.net.NetSocket; import io.vertx.core.net.NetSocket;


import java.util.ArrayDeque;
import java.util.Map; import java.util.Map;


/** /**
Expand All @@ -55,9 +56,11 @@
class Http2Pool extends ConnectionManager.Pool { class Http2Pool extends ConnectionManager.Pool {


private VertxClientHandler clientHandler; private VertxClientHandler clientHandler;
private HttpClientImpl client;


public Http2Pool(ConnectionManager.ConnQueue queue) { public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client) {
super(queue, 1); super(queue, 1);
this.client = client;
} }


public boolean getConnection(HttpClientRequestImpl req, Handler<HttpClientStream> handler, ContextImpl context) { public boolean getConnection(HttpClientRequestImpl req, Handler<HttpClientStream> handler, ContextImpl context) {
Expand Down Expand Up @@ -112,7 +115,7 @@ class Http2ClientStream implements HttpClientStream {
private boolean paused; private boolean paused;
private int numBytes; private int numBytes;


public Http2ClientStream(HttpClientRequestImpl req, public Http2ClientStream(HttpClientRequestBase req,
ContextImpl context, ContextImpl context,
ChannelHandlerContext handlerCtx, ChannelHandlerContext handlerCtx,
Http2Connection conn, Http2Connection conn,
Expand Down Expand Up @@ -177,6 +180,10 @@ private void handleEnd() {
resp.handleEnd(new CaseInsensitiveHeaders()); resp.handleEnd(new CaseInsensitiveHeaders());
} }


void handlePushPromise(HttpClientRequestBase promised) throws Http2Exception {
((HttpClientRequestImpl)req).handlePush(promised);
}

@Override @Override
public void writeHead(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked) { public void writeHead(HttpMethod method, String uri, MultiMap headers, String hostHeader, boolean chunked) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
Expand Down Expand Up @@ -298,7 +305,7 @@ void handle(Handler<HttpClientStream> handler, HttpClientRequestImpl req) {
handler.handle(stream); handler.handle(stream);
} }


Http2ClientStream createStream(HttpClientRequestImpl req) { Http2ClientStream createStream(HttpClientRequestBase req) {
try { try {
Http2ClientStream stream = new Http2ClientStream(req, context, handlerCtx, connection(), encoder()); Http2ClientStream stream = new Http2ClientStream(req, context, handlerCtx, connection(), encoder());
streams.put(stream.stream.id(), stream); streams.put(stream.stream.id(), stream);
Expand Down Expand Up @@ -353,6 +360,15 @@ public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2E


@Override @Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
Http2ClientStream stream = streams.get(streamId);
HttpMethod method = UriUtils.toVertxMethod(headers.method().toString());
String uri = headers.path().toString();
String host = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers);
HttpClientRequestPushPromise req = new HttpClientRequestPushPromise(client, method, uri, host, headersMap);
Http2ClientStream newStream = new Http2ClientStream(req, context, handlerCtx, stream.conn, stream.encoder);
streams.put(promisedStreamId, newStream);
stream.handlePushPromise(req);
} }


@Override @Override
Expand Down
Expand Up @@ -285,11 +285,7 @@ public HttpVersion version() {
public HttpMethod method() { public HttpMethod method() {
if (method == null) { if (method == null) {
String sMethod = headers.method().toString(); String sMethod = headers.method().toString();
try { method = UriUtils.toVertxMethod(sMethod);
method = io.vertx.core.http.HttpMethod.valueOf(sMethod);
} catch (IllegalArgumentException e) {
method = HttpMethod.UNKNOWN;
}
} }
return method; return method;
} }
Expand Down
Expand Up @@ -479,7 +479,7 @@ public void reset(long code) {
ctx.flush(); ctx.flush();
} }


public HttpServerResponse promisePush(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) { public HttpServerResponse pushPromise(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
if (push) { if (push) {
throw new IllegalStateException("A push response cannot promise another push"); throw new IllegalStateException("A push response cannot promise another push");
} }
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -59,6 +59,7 @@ public class HttpClientRequestImpl extends HttpClientRequestBase implements Http
private Handler<Void> continueHandler; private Handler<Void> continueHandler;
private volatile HttpClientStream conn; private volatile HttpClientStream conn;
private Handler<Void> drainHandler; private Handler<Void> drainHandler;
private Handler<HttpClientRequest> pushHandler;
private boolean headWritten; private boolean headWritten;
private boolean completed; private boolean completed;
private ByteBuf pendingChunks; private ByteBuf pendingChunks;
Expand Down Expand Up @@ -333,6 +334,14 @@ public HttpClientRequest putHeader(CharSequence name, Iterable<CharSequence> val
} }
} }


@Override
public HttpClientRequest pushPromiseHandler(Handler<HttpClientRequest> handler) {
synchronized (getLock()) {
pushHandler = handler;
}
return this;
}

void handleDrained() { void handleDrained() {
synchronized (getLock()) { synchronized (getLock()) {
if (!completed && drainHandler != null) { if (!completed && drainHandler != null) {
Expand Down Expand Up @@ -668,4 +677,12 @@ private void checkResponseHandler() {
throw new IllegalStateException("You must set an handler for the HttpClientResponse before connecting"); throw new IllegalStateException("You must set an handler for the HttpClientResponse before connecting");
} }
} }

void handlePush(HttpClientRequest pushedRequest) {
synchronized (getLock()) {
if (pushHandler != null) {
pushHandler.handle(pushedRequest);
}
}
}
} }
@@ -0,0 +1,206 @@
/*
* Copyright (c) 2011-2013 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class HttpClientRequestPushPromise extends HttpClientRequestBase {

private final HttpMethod method;
private final String uri;
private final String host;
private final MultiMap headers;
private Handler<HttpClientResponse> respHandler;

public HttpClientRequestPushPromise(HttpClientImpl client, HttpMethod method, String uri, String host, MultiMap headers) {
super(client);
this.method = method;
this.uri = uri;
this.host = host;
this.headers = headers;
}

@Override
protected Object getLock() {
return this; //
}

@Override
protected void doHandleResponse(HttpClientResponseImpl resp) {
synchronized (getLock()) {
if (respHandler != null) {
respHandler.handle(resp);
}
}
}

@Override
protected void checkComplete() {
}

@Override
public HttpClientRequest handler(Handler<HttpClientResponse> handler) {
synchronized (getLock()) {
respHandler = handler;
return this;
}
}

@Override
public boolean isChunked() {
return false;
}

@Override
public HttpMethod method() {
return method;
}

@Override
public String uri() {
return uri;
}

@Override
public String getHost() {
return host;
}

@Override
public MultiMap headers() {
return headers;
}

@Override
public HttpClientRequest write(Buffer data) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest setWriteQueueMaxSize(int maxSize) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest drainHandler(Handler<Void> handler) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest pause() {
throw new IllegalStateException();
}

@Override
public HttpClientRequest resume() {
throw new IllegalStateException();
}

@Override
public HttpClientRequest endHandler(Handler<Void> endHandler) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest setChunked(boolean chunked) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest setHost(String host) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest putHeader(String name, String value) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest putHeader(CharSequence name, CharSequence value) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest putHeader(String name, Iterable<String> values) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest putHeader(CharSequence name, Iterable<CharSequence> values) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest write(String chunk) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest write(String chunk, String enc) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest continueHandler(@Nullable Handler<Void> handler) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest sendHead() {
throw new IllegalStateException();
}

@Override
public void end(String chunk) {
throw new IllegalStateException();
}

@Override
public void end(String chunk, String enc) {
throw new IllegalStateException();
}

@Override
public void end(Buffer chunk) {
throw new IllegalStateException();
}

@Override
public HttpClientRequest pushPromiseHandler(Handler<HttpClientRequest> handler) {
throw new IllegalStateException();
}

@Override
public void end() {
throw new IllegalStateException();
}

@Override
public boolean writeQueueFull() {
throw new IllegalStateException();
}
}
Expand Up @@ -20,6 +20,7 @@
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse; import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders; import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
Expand Down
Expand Up @@ -609,7 +609,7 @@ public void reset(long code) {
} }


@Override @Override
public HttpServerResponse promisePush(io.vertx.core.http.HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) { public HttpServerResponse pushPromise(io.vertx.core.http.HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
handler.handle(Future.failedFuture("Push promise is only supported with HTTP2")); handler.handle(Future.failedFuture("Push promise is only supported with HTTP2"));
return this; return this;
} }
Expand Down

0 comments on commit d6673cb

Please sign in to comment.