Skip to content

Commit

Permalink
[FLINK-29812][rest] Remove deprecated Netty API usages
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 2, 2022
1 parent 8c28fd3 commit 03c0f15
Show file tree
Hide file tree
Showing 20 changed files with 121 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public CompletableFuture<Void> shutdownServer() {

final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
EventLoopGroup group = bootstrap.config().group();
if (group != null && !group.isShutdown()) {
group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
.addListener(
Expand Down Expand Up @@ -400,6 +400,6 @@ protected void initChannel(SocketChannel channel) throws Exception {

@VisibleForTesting
public boolean isEventGroupShutdown() {
return bootstrap == null || bootstrap.group().isTerminated();
return bootstrap == null || bootstrap.config().group().isTerminated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public CompletableFuture<Void> shutdown() {
}

if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
EventLoopGroup group = bootstrap.config().group();
if (group != null && !group.isShutdown()) {
group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
.addListener(
Expand Down Expand Up @@ -229,6 +229,6 @@ public CompletableFuture<Void> shutdown() {

@VisibleForTesting
public boolean isEventGroupShutdown() {
return bootstrap == null || bootstrap.group().isTerminated();
return bootstrap == null || bootstrap.config().group().isTerminated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
}

if (bootstrap != null) {
EventLoopGroup group = bootstrap.group();
EventLoopGroup group = bootstrap.config().group();
if (group != null) {
// note: no "quiet period" to not trigger Netty#4357
group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -107,20 +107,20 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
currentDecoder = null;
}

if (currentRequest.getMethod() == HttpMethod.GET
|| currentRequest.getMethod() == HttpMethod.DELETE) {
if (currentRequest.method() == HttpMethod.GET
|| currentRequest.method() == HttpMethod.DELETE) {
// directly delegate to the router
ctx.fireChannelRead(currentRequest);
} else if (currentRequest.getMethod() == HttpMethod.POST) {
} else if (currentRequest.method() == HttpMethod.POST) {
// POST comes in multiple objects. First the request, then the contents
// keep the request and path for the remaining objects of the POST request
currentRequestPath =
new QueryStringDecoder(currentRequest.getUri(), ENCODING).path();
new QueryStringDecoder(currentRequest.uri(), ENCODING).path();
currentDecoder =
new HttpPostRequestDecoder(DATA_FACTORY, currentRequest, ENCODING);
} else {
throw new IOException(
"Unsupported HTTP method: " + currentRequest.getMethod().name());
"Unsupported HTTP method: " + currentRequest.method().name());
}
} else if (currentDecoder != null && msg instanceof HttpContent) {
// received new chunk, give it to the current decoder
Expand Down Expand Up @@ -191,9 +191,9 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
HttpResponseStatus.INTERNAL_SERVER_ERROR,
Unpooled.wrappedBuffer(bytes));

response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers()
.set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
.set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

ctx.writeAndFlush(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;

Expand Down Expand Up @@ -68,9 +68,9 @@ private void sendError(ChannelHandlerContext ctx, String error) {
Unpooled.wrappedBuffer(
error.getBytes(ConfigConstants.DEFAULT_CHARSET)));

response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers()
.set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
.set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

ctx.writeAndFlush(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpUtil;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
Expand All @@ -65,8 +66,8 @@
import java.util.Date;
import java.util.Locale;

import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
Expand Down Expand Up @@ -232,10 +233,10 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
if (!requestPath.equals("/joboverview.json")) {
StaticFileServerHandler.setDateAndCacheHeaders(response, file);
}
if (HttpHeaders.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
if (HttpUtil.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
HttpHeaders.setContentLength(response, fileLength);
HttpUtil.setContentLength(response, fileLength);

// write the initial line and the header.
ctx.write(response);
Expand All @@ -256,7 +257,7 @@ private void respondWithFile(ChannelHandlerContext ctx, HttpRequest request, Str
}

// close the connection, if no keep-alive is needed
if (!HttpHeaders.isKeepAlive(request)) {
if (!HttpUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ public void shutdown() {
this.serverChannel.close().awaitUninterruptibly();
}
if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
if (bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
}
if (bootstrap.childGroup() != null) {
bootstrap.childGroup().shutdownGracefully();
if (bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContentDecompressor;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpUtil;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
Expand Down Expand Up @@ -124,8 +126,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
*
* <pre>
* HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/overview");
* request.headers().set(HttpHeaders.Names.HOST, host);
* request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
* request.headers().set(HttpHeaderNames.HOST, host);
* request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
*
* sendRequest(request);
* </pre>
Expand Down Expand Up @@ -163,8 +165,8 @@ public void sendGetRequest(String path, Duration timeout)

HttpRequest getRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
getRequest.headers().set(HttpHeaders.Names.HOST, host);
getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
getRequest.headers().set(HttpHeaderNames.HOST, host);
getRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

sendRequest(getRequest, timeout);
}
Expand All @@ -183,8 +185,8 @@ public void sendDeleteRequest(String path, Duration timeout)

HttpRequest getRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, path);
getRequest.headers().set(HttpHeaders.Names.HOST, host);
getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
getRequest.headers().set(HttpHeaderNames.HOST, host);
getRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

sendRequest(getRequest, timeout);
}
Expand All @@ -203,8 +205,8 @@ public void sendPatchRequest(String path, Duration timeout)

HttpRequest getRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PATCH, path);
getRequest.headers().set(HttpHeaders.Names.HOST, host);
getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
getRequest.headers().set(HttpHeaderNames.HOST, host);
getRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

sendRequest(getRequest, timeout);
}
Expand Down Expand Up @@ -323,11 +325,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;

currentStatus = response.getStatus();
currentType = response.headers().get(HttpHeaders.Names.CONTENT_TYPE);
currentLocation = response.headers().get(HttpHeaders.Names.LOCATION);
currentStatus = response.status();
currentType = response.headers().get(HttpHeaderNames.CONTENT_TYPE);
currentLocation = response.headers().get(HttpHeaderNames.LOCATION);

if (HttpHeaders.isTransferEncodingChunked(response)) {
if (HttpUtil.isTransferEncodingChunked(response)) {
LOG.debug("Content is chunked");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ void shutdown() {
final long start = System.nanoTime();

if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
if (bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
}
bootstrap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void shutdown() {
}

if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
if (bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
}
bootstrap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -113,9 +113,9 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
final HttpRequest httpRequest = (HttpRequest) msg;
LOG.trace(
"Received request. URL:{} Method:{}",
httpRequest.getUri(),
httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
httpRequest.uri(),
httpRequest.method());
if (httpRequest.method().equals(HttpMethod.POST)) {
if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
LOG.trace("Initializing multipart file upload.");
checkState(currentHttpPostRequestDecoder == null);
Expand Down Expand Up @@ -189,18 +189,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
if (currentJsonPayload != null) {
currentHttpRequest
.headers()
.set(HttpHeaders.Names.CONTENT_LENGTH, currentJsonPayload.length);
.set(HttpHeaderNames.CONTENT_LENGTH, currentJsonPayload.length);
currentHttpRequest
.headers()
.set(
HttpHeaders.Names.CONTENT_TYPE,
RestConstants.REST_CONTENT_TYPE);
.set(HttpHeaderNames.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE);
ctx.fireChannelRead(currentHttpRequest);
ctx.fireChannelRead(
httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload)));
} else {
currentHttpRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
currentHttpRequest.headers().remove(HttpHeaders.Names.CONTENT_TYPE);
currentHttpRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
currentHttpRequest.headers().remove(HttpHeaderNames.CONTENT_TYPE);
ctx.fireChannelRead(currentHttpRequest);
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
}
Expand Down Expand Up @@ -260,7 +258,7 @@ private void reset() {
}

public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).getAndRemove())
return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).getAndSet(null))
.orElse(FileUploads.EMPTY);
}
}

0 comments on commit 03c0f15

Please sign in to comment.