Skip to content
Permalink
Browse files
feat: catching exceptions thrown during the writeAndFlush (#107)
  • Loading branch information
tzssangglass committed Jan 4, 2022
1 parent 6197a8e commit d9b92146affea4ddb19e43d1db9a94fc4aa4b4eb
Showing 4 changed files with 65 additions and 22 deletions.
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.handler;

import io.github.api7.A6.Err.Code;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.apisix.plugin.runner.A6ErrResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(ExceptionCaughtHandler.class);

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("handle request error: ", cause);
A6ErrResponse errResponse = new A6ErrResponse(Code.SERVICE_UNAVAILABLE);
ctx.writeAndFlush(errResponse);
}
}
@@ -28,6 +28,8 @@

import com.google.common.cache.Cache;
import io.github.api7.A6.Err.Code;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
@@ -118,7 +120,9 @@ private void doFilter(ChannelHandlerContext ctx) {
PluginFilterChain chain = conf.getChain();
chain.filter(currReq, currResp);

ctx.writeAndFlush(currResp);
ChannelFuture future = ctx.writeAndFlush(currResp);
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);

}

private void handleHttpReqCall(ChannelHandlerContext ctx, HttpRequest request) {
@@ -140,7 +144,8 @@ private void handleHttpReqCall(ChannelHandlerContext ctx, HttpRequest request) {

// if the filter chain is empty, then return the response directly
if (Objects.isNull(chain) || 0 == chain.getFilters().size()) {
ctx.writeAndFlush(currResp);
ChannelFuture future = ctx.writeAndFlush(currResp);
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
return;
}

@@ -170,15 +175,17 @@ private void handleHttpReqCall(ChannelHandlerContext ctx, HttpRequest request) {
return;
}
ExtraInfoRequest extraInfoRequest = new ExtraInfoRequest(varKey, null);
ctx.writeAndFlush(extraInfoRequest);
ChannelFuture future = ctx.writeAndFlush(extraInfoRequest);
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}
}

// fetch the request body
if (requiredBody) {
queue.offer(EXTRA_INFO_REQ_BODY_KEY);
ExtraInfoRequest extraInfoRequest = new ExtraInfoRequest(null, true);
ctx.writeAndFlush(extraInfoRequest);
ChannelFuture future = ctx.writeAndFlush(extraInfoRequest);
future.addListeners(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
}

// no need to fetch the nginx variables or request body, just do filter
@@ -17,6 +17,21 @@

package org.apache.apisix.plugin.runner.server;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import com.google.common.cache.Cache;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -39,21 +54,7 @@
import org.apache.apisix.plugin.runner.handler.PayloadDecoder;
import org.apache.apisix.plugin.runner.handler.BinaryProtocolDecoder;
import org.apache.apisix.plugin.runner.handler.PayloadEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.apisix.plugin.runner.handler.ExceptionCaughtHandler;

@Component
@RequiredArgsConstructor
@@ -123,7 +124,8 @@ protected void initChannel(DomainSocketChannel channel) {
.addAfter("payloadEncoder", "delayedDecoder", new BinaryProtocolDecoder())
.addLast("payloadDecoder", new PayloadDecoder())
.addAfter("payloadDecoder", "prepareConfHandler", createConfigReqHandler(cache, beanProvider))
.addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache));
.addAfter("prepareConfHandler", "hTTPReqCallHandler", createA6HttpHandler(cache))
.addLast("exceptionCaughtHandler", new ExceptionCaughtHandler());

}
});
@@ -289,6 +289,4 @@ void testDoFilter3() {
io.github.api7.A6.HTTPReqCall.Resp.getRootAsResp(response.encode());
Assertions.assertEquals(resp.actionType(), Action.Stop);
}

//TODO add test cases about fetch nginx vars and request body
}

0 comments on commit d9b9214

Please sign in to comment.