Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@

package org.apache.apisix.plugin.runner.codec;

import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersDecoder;
import org.apache.apisix.plugin.runner.codec.impl.FlatBuffersEncoder;
import com.google.common.cache.Cache;
import io.github.api7.A6.PrepareConf.Req;
import org.apache.apisix.plugin.runner.handler.DefaultPayloadHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PluginRunnerConfiguration {

@Bean
public PluginRunnerDecoder createDecoder() {
return new FlatBuffersDecoder();
}


@Bean
public PluginRunnerEncoder createEncoder() {
return new FlatBuffersEncoder();
public DefaultPayloadHandler createPayloadHandler(Cache<Long, Req> cache) {
return new DefaultPayloadHandler(cache);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.codec.frame;

import java.nio.ByteBuffer;

public class FrameCodec {

public static int getDataLength(ByteBuffer payload) {
byte[] bytes = new byte[3];
for (int i = 0; i < 3; i++) {
bytes[i] = payload.get();
}
return byte3ToInt(bytes);
}

public static ByteBuffer getBody(ByteBuffer payload) {
int length = getDataLength(payload);
ByteBuffer buffer = payload.slice();
byte[] dst = new byte[length];
buffer.get(dst, 0, length);
buffer.flip();
return buffer;
}

public static ByteBuffer setBody(ByteBuffer payload, FrameType frameType) {
byte[] data = new byte[payload.remaining()];
payload.get(data);
ByteBuffer buffer = ByteBuffer.allocate(data.length + 4);
buffer.put(frameType.getType());
// data length
byte[] length = intToByte3(data.length);
buffer.put(length);
// data
buffer.put(data);
buffer.flip();
return buffer;
}

private static byte[] intToByte3(int i) {
byte[] targets = new byte[3];
targets[2] = (byte) (i & 0xFF);
targets[1] = (byte) (i >> 8 & 0xFF);
targets[0] = (byte) ((i >> 16 & 0xFF));
return targets;
}

private static int byte3ToInt(byte[] bytes) {
return bytes[2] & 0xFF |
(bytes[1] & 0xFF << 8) |
(bytes[0] & 0xFF << 16);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,22 @@
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.server.config;
package org.apache.apisix.plugin.runner.codec.frame;

import org.apache.apisix.plugin.runner.handler.IOHandler;
public enum FrameType {
RPC_ERROR((byte) 0),

@FunctionalInterface
public interface IOHandlerCustomizer {

void customize(IOHandler handler);

RPC_PREPARE_CONF((byte) 1),

RPC_HTTP_REQ_CALL((byte) 2);

private final byte type;

FrameType(byte type) {
this.type = type;
}

public byte getType() {
return type;
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.handler;

import com.google.common.cache.Cache;
import com.google.flatbuffers.FlatBufferBuilder;
import io.github.api7.A6.Err.Code;
import io.github.api7.A6.PrepareConf.Req;
import lombok.RequiredArgsConstructor;
import org.apache.apisix.plugin.runner.codec.frame.FrameCodec;
import org.apache.apisix.plugin.runner.filter.FilterBean;
import org.apache.apisix.plugin.runner.filter.FilterChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;

@RequiredArgsConstructor
public class DefaultPayloadHandler implements PayloadHandler {

private final Logger logger = LoggerFactory.getLogger(DefaultPayloadHandler.class);

FlatBufferBuilder builder;

private RequestHandler handler;

private final Cache<Long, Req> cache;

public RequestHandler decode(ByteBuffer buffer) {
byte type = buffer.get();
ByteBuffer body = FrameCodec.getBody(buffer);

switch (type) {
case 1:
handler = new PrepareConfHandler(body, cache);
return handler;
case 2:
// FilterChain chain = createFilterChain(null);
handler = new HTTPReqCallHandler(body, cache, null);
return handler;
default:
break;
}

logger.error("receiving unsupport type: {}", type);
return error(Code.BAD_REQUEST);
}

private FilterChain createFilterChain(ObjectProvider<FilterBean> beanProvider) {
List<FilterBean> filterList = beanProvider.orderedStream().collect(Collectors.toList());
FilterChain chain = null;
if (!filterList.isEmpty()) {
for (int i = filterList.size() - 1; i >= 0; i--) {
chain = new FilterChain(filterList.get(i), chain);
}
}
return chain;
}

public ByteBuffer encode(RequestHandler handler) {
ByteBuffer buffer = this.handler.builder().dataBuffer();
return FrameCodec.setBody(buffer, this.handler.type());
}

@Override
public RequestHandler error() {
logger.error("process rpc call error");
return error(Code.SERVICE_UNAVAILABLE);
}

public RequestHandler dispatch(RequestHandler handler) {
builder = new FlatBufferBuilder();
builder = this.handler.handler(builder);
return handler;
}

private RequestHandler error(int code) {
return new ErrHandler(code);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,39 @@
* limitations under the License.
*/

package org.apache.apisix.plugin.runner;
package org.apache.apisix.plugin.runner.handler;

import io.github.api7.A6.PrepareConf.Req;
import com.google.flatbuffers.FlatBufferBuilder;
import io.github.api7.A6.Err.Resp;
import org.apache.apisix.plugin.runner.codec.frame.FrameType;

import java.nio.ByteBuffer;
public class ErrHandler implements RequestHandler {
private static final FrameType TYPE = FrameType.RPC_ERROR;

public class A6ConfigRequest implements A6Request {

private final int confToken;

public A6ConfigRequest(int confToken) {
this.confToken = confToken;
private int code = -1;
private FlatBufferBuilder builder;

public ErrHandler(int code) {
this.code = code;
}

@Override
public boolean isConfigRequest() {
return true;
public FlatBufferBuilder handler(FlatBufferBuilder builder) {
this.builder = builder;
Resp.startResp(builder);
Resp.addCode(builder, code == -1 ? 1 : code);
int orc = Resp.endResp(builder);
builder.finish(orc);
return builder;
}

@Override
public int getConfToken() {
return confToken;
public FlatBufferBuilder builder() {
return this.builder;
}

public static A6ConfigRequest from(ByteBuffer buffer) {
// TODO request id and confToken came from client?
Req req = Req.getRootAsReq(buffer);
return new A6ConfigRequest(0);

@Override
public FrameType type() {
return TYPE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
* limitations under the License.
*/

package org.apache.apisix.plugin.runner.codec;
package org.apache.apisix.plugin.runner.handler;

import io.netty.buffer.ByteBuf;
import org.apache.apisix.plugin.runner.A6Request;
import org.apache.apisix.plugin.runner.A6Response;

@FunctionalInterface
public interface PluginRunnerDecoder {

A6Request decode(ByteBuf buf);

public interface Filter {
A6Response filter(A6Request req);
}
Loading