Skip to content
Permalink
Browse files
feat: streamlined code, solid codecs (#4)
  • Loading branch information
tzssangglass committed May 21, 2021
1 parent 62c2e2c commit 17a690375652a545026a8ae4d2c474b99438d3a1
Showing 22 changed files with 400 additions and 472 deletions.
@@ -17,18 +17,22 @@

package org.apache.apisix.plugin.runner.codec;

import com.google.common.cache.Cache;
import io.github.api7.A6.PrepareConf.Req;
import org.apache.apisix.plugin.runner.handler.DefaultPayloadHandler;
import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersDecoder;
import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersEncoder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PluginRunnerConfiguration {

@Bean
public DefaultPayloadHandler createPayloadHandler(Cache<Long, Req> cache) {
return new DefaultPayloadHandler(cache);
public PluginRunnerDecoder createDecoder() {
return new FlatBuffersDecoder();
}

@Bean
public PluginRunnerEncoder createEncoder() {
return new FlatBuffersEncoder();
}

}
@@ -15,11 +15,14 @@
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.handler;
package org.apache.apisix.plugin.runner.codec;

import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;

public interface Filter {
A6Response filter(A6Request req);
import java.nio.ByteBuffer;

@FunctionalInterface
public interface PluginRunnerDecoder {

A6Request decode(ByteBuffer buffer);
}
@@ -15,16 +15,14 @@
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.handler;
package org.apache.apisix.plugin.runner.codec;

import java.nio.ByteBuffer;

public interface PayloadHandler {
RequestHandler decode(ByteBuffer buffer);
import org.apache.apisix.plugin.runner.A6Response;

RequestHandler dispatch(RequestHandler handler);

ByteBuffer encode(RequestHandler handler);
import java.nio.ByteBuffer;

RequestHandler error();
@FunctionalInterface
public interface PluginRunnerEncoder {

ByteBuffer encode(A6Response response);
}
@@ -38,11 +38,11 @@ public static ByteBuffer getBody(ByteBuffer payload) {
return buffer;
}

public static ByteBuffer setBody(ByteBuffer payload, FrameType frameType) {
public static ByteBuffer setBody(ByteBuffer payload, byte type) {
byte[] data = new byte[payload.remaining()];
payload.get(data);
ByteBuffer buffer = ByteBuffer.allocate(data.length + 4);
buffer.put(frameType.getType());
buffer.put(type);
// data length
byte[] length = intToByte3(data.length);
buffer.put(length);
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.codec.impl;

import io.github.api7.A6.Err.Code;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ErrRequest;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.codec.PluginRunnerDecoder;
import org.apache.apisix.plugin.runner.codec.frame.FrameCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;

public class FlatBuffersDecoder implements PluginRunnerDecoder {

private final Logger logger = LoggerFactory.getLogger(FlatBuffersDecoder.class);

@Override
public A6Request decode(ByteBuffer buffer) {
byte type = buffer.get();
ByteBuffer body = FrameCodec.getBody(buffer);
switch (type) {
case 1:
return A6ConfigRequest.from(body);
case 2:
return HttpRequest.from(body);
default:
break;
}

logger.error("receive unsupported type: {}", type);
return new A6ErrRequest(Code.BAD_REQUEST);
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.codec.impl;

import org.apache.apisix.plugin.runner.A6Response;
import org.apache.apisix.plugin.runner.codec.PluginRunnerEncoder;
import org.apache.apisix.plugin.runner.codec.frame.FrameCodec;

import java.nio.ByteBuffer;

public class FlatBuffersEncoder implements PluginRunnerEncoder {

@Override
public ByteBuffer encode(A6Response response) {
ByteBuffer buffer = response.encode();
if (null != response.getErrResponse()) {
return FrameCodec.setBody(buffer, (byte) 0);
}
return FrameCodec.setBody(buffer, response.getType());
}
}
@@ -17,16 +17,25 @@

package org.apache.apisix.plugin.runner.handler;

import com.google.common.cache.Cache;
import io.github.api7.A6.PrepareConf.Req;
import lombok.RequiredArgsConstructor;
import org.apache.apisix.plugin.runner.A6ConfigRequest;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;

/**
* Handle APISIX configuration request.
*/
@RequiredArgsConstructor
public class A6ConfigHandler implements Handler {

private final Cache<Long, Req> cache;

@Override
public void handle(A6Request request, A6Response response) {

Req req = ((A6ConfigRequest) request).getReq();
long token = ((A6ConfigResponse) response).getConfToken();
cache.put(token, req);
}
}
@@ -17,71 +17,84 @@

package org.apache.apisix.plugin.runner.handler;

import com.google.common.cache.Cache;
import io.github.api7.A6.Err.Code;
import io.github.api7.A6.PrepareConf.Req;
import org.apache.apisix.plugin.runner.A6ConfigResponse;
import org.apache.apisix.plugin.runner.A6ErrRequest;
import org.apache.apisix.plugin.runner.A6ErrResponse;
import org.apache.apisix.plugin.runner.A6Response;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
import org.apache.apisix.plugin.runner.filter.FilterBean;
import org.apache.apisix.plugin.runner.filter.FilterChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

@Configuration
public class A6HandlerConfiguration {

private final Logger logger = LoggerFactory.getLogger(A6HandlerConfiguration.class);

@Bean
public A6ConfigHandler createConfigHandler() {
return new A6ConfigHandler();
public A6ConfigHandler createConfigHandler(Cache<Long, Req> cache) {
return new A6ConfigHandler(cache);
}

@Bean
public A6HttpCallHandler createHttpHandler(ObjectProvider<FilterBean> beanProvider) {
public A6HttpCallHandler createHttpHandler(ObjectProvider<FilterBean> beanProvider, Cache<Long, Req> cache) {
List<FilterBean> filterList = beanProvider.orderedStream().collect(Collectors.toList());
FilterChain chain = null;
if (!filterList.isEmpty()) {
for (int i = filterList.size() - 1; i >= 0; i--) {
chain = new FilterChain(filterList.get(i), chain);
}
}
return new A6HttpCallHandler(chain);
return new A6HttpCallHandler(cache, chain);
}

@Bean
public FilterBean testFilter() {
return new FilterBean() {
@Override
public void doFilter(HttpRequest request, HttpResponse response, FilterChain chain) {

}

@Override
public int getOrder() {
return 0;
}
};
}

@Bean
public Dispatcher createDispatcher(A6ConfigHandler configHandler, A6HttpCallHandler httpCallHandler) {
return request -> {
A6Response response;
switch (request.getType()) {
case 0:
return null;
response = new A6ErrResponse(((A6ErrRequest) request).getCode());
return response;
case 1:
response = new A6ConfigResponse(1);
long confToken = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
response = new A6ConfigResponse(confToken);
configHandler.handle(request, response);
return response;
case 2:
response = new HttpResponse(1);
response = new HttpResponse(((HttpRequest) request).getRequestId());
httpCallHandler.handle(request, response);
return response;
default:
return null;
logger.error("can not dispatch type: {}", request.getType());
response = new A6ErrResponse(Code.SERVICE_UNAVAILABLE);
return response;
}
};
}
@@ -14,27 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.handler;

import com.google.common.cache.Cache;
import io.github.api7.A6.Err.Code;
import io.github.api7.A6.PrepareConf.Req;
import lombok.RequiredArgsConstructor;
import org.apache.apisix.plugin.runner.A6Config;
import org.apache.apisix.plugin.runner.A6ErrResponse;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;
import org.apache.apisix.plugin.runner.HttpRequest;
import org.apache.apisix.plugin.runner.HttpResponse;
import org.apache.apisix.plugin.runner.filter.FilterChain;

@RequiredArgsConstructor
public class A6HttpCallHandler implements Handler {

private final Cache<Long, Req> cache;

private final FilterChain chain;

public A6HttpCallHandler(FilterChain chain) {
this.chain = chain;
}


@Override
public void handle(A6Request request, A6Response response) {
HttpRequest req = null;
HttpResponse rsp = null;
HttpRequest req = (HttpRequest) request;
HttpResponse rsp = (HttpResponse) response;

long confToken = ((HttpRequest) request).getConfToken();
io.github.api7.A6.PrepareConf.Req conf = cache.getIfPresent(confToken);
if (null == conf) {
A6ErrResponse errResponse = new A6ErrResponse(Code.CONF_TOKEN_NOT_FOUND);
rsp.setErrResponse(errResponse);
return;
}

A6Config config = new A6Config(conf);
req.setConfig(config);
chain.doFilter(req, rsp);

}
}

0 comments on commit 17a6903

Please sign in to comment.