Skip to content

Commit

Permalink
Merge pull request #142 from WeBankFinTech/feature/endpoint-weauth
Browse files Browse the repository at this point in the history
Endpoint feature enhancements
  • Loading branch information
junqizhang-dev committed Mar 19, 2020
2 parents 39f1ff9 + 4cbdbb2 commit 9c0e9ef
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
public interface EndpointFunctor {

/**
* Execute the function via its String-typed responseBody (argument). First, use ``` to separate
* the arguments, and convert them to your ideal POJO type. Then, call the functions. Lastly,
* use any serialization function to return.
* Execute the callback function via its String-typed responseBody (argument). First, use ```
* to separate arguments and convert them to your ideal POJO type. Then, call the functions.
* Lastly, use any serialization function to return.
*
* @param arg argument String, separated by ```
* @return any serialized object String
*/
String execute(String arg);
String callback(String arg);

/**
* Return the basic description of this endpoint.
Expand Down
147 changes: 0 additions & 147 deletions src/main/java/com/webank/weid/suite/endpoint/EndpointSample.java

This file was deleted.

65 changes: 45 additions & 20 deletions src/main/java/com/webank/weid/suite/endpoint/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
package com.webank.weid.suite.endpoint;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,8 +57,6 @@ public class RpcServer {
private static final Integer DEFAULT_BOSS_THREAD_NUM = 10;
private static final Integer DEFAULT_WORKER_THREAD_NUM = 20;
private static final Integer UUID_LENGTH = 36;
private static Integer LISTENER_PORT =
Integer.valueOf(PropertyUtils.getProperty("rpc.listener.port"));

/**
* Map structure to store requestName and its registered EndpointFunctor Impl.
Expand All @@ -64,33 +66,55 @@ public class RpcServer {
/**
* The main entrance for RPC server process.
*
* @param args String args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
public static void run() throws Exception {
if (implMap.size() == 0) {
logger.error("Initialization failed, exiting..");
System.exit(1);
}
EndpointDataUtil.loadAllEndpointInfoFromProps();
System.out.println("Trying to receive incoming traffic at Port: " + LISTENER_PORT);
logger.info("Trying to receive incoming traffic at Port: " + LISTENER_PORT);
ExecutorService pool = Executors.newCachedThreadPool();
AioQuickServer<String> server = new AioQuickServer<String>(LISTENER_PORT,
Integer listenerPort;
listenerPort =
Integer.valueOf(PropertyUtils.getProperty("endpoint.listener.port"));
System.out.println("Trying to receive incoming traffic at Port: " + listenerPort);
logger.info("Trying to receive incoming traffic at Port: " + listenerPort);
ExecutorService pool = new ThreadPoolExecutor(10, 200, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.AbortPolicy());
AioQuickServer<String> server = new AioQuickServer<String>(listenerPort,
new FixedLengthProtocol(),
new MessageProcessor<String>() {
@Override
public void process(AioSession<String> session, String msg) {
pool.execute(() -> {
String uuid = msg.substring(msg.length() - UUID_LENGTH);
logger.debug("RpcServer thread: " + Thread.currentThread().getId() + Thread
.currentThread().getName() + ", received msg: " + msg
+ ", extracted UUID: "
+ uuid + ", session ID: " + session.getSessionID());
System.out.println(
"RpcServer thread: " + Thread.currentThread().getId() + Thread
.currentThread().getName() + ", received msg: " + msg
+ ", extracted UUID: " + uuid + ", session ID: " + session
.getSessionID());
try {
InetSocketAddress remoteAddress = session.getRemoteAddress();
logger.debug("Remote request: " + remoteAddress.getHostString()
+ ", received msg: " + msg + ", extracted UUID: " + uuid
+ ", session ID: " + session.getSessionID());
System.out.println("Remote request: " + remoteAddress.getHostString()
+ ", received msg: " + msg + ", extracted UUID: " + uuid
+ ", session ID: " + session.getSessionID());
String whitelistedServerStr = PropertyUtils
.getProperty("endpoint.whitelisted.server");
List<String> whitelistedServers;
if (StringUtils.isEmpty(whitelistedServerStr)) {
whitelistedServers = new ArrayList<>();
} else {
whitelistedServers = Arrays.asList(whitelistedServerStr
.split(","));
}
if (!DataToolUtils.isLocalAddress(remoteAddress.getHostName())
&& !whitelistedServers.contains(remoteAddress.getHostName())) {
logger.error("Request from invalid host, ignored.");
System.out.println("Request from invalid host, ignored.");
return;
}
} catch (IOException e) {
logger.error("Failed to track remote address for session ID: " +
session.getSessionID());
}
String bizResult = StringUtils.EMPTY;
try {
bizResult = processClientMessage(msg);
Expand All @@ -114,6 +138,7 @@ public void process(AioSession<String> session, String msg) {
});
}

@Override
public void stateEvent(AioSession<String> session,
StateMachineEnum stateMachineEnum, Throwable throwable) {
}
Expand Down Expand Up @@ -199,6 +224,6 @@ public static String execute(String requestName, String requestBody) {
if (functorImpl == null) {
return StringUtils.EMPTY;
}
return functorImpl.execute(requestBody);
return functorImpl.callback(requestBody);
}
}
31 changes: 31 additions & 0 deletions src/main/java/com/webank/weid/util/DataToolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidAlgorithmParameterException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -1750,5 +1754,32 @@ public static boolean isUtf8String(String string) {
}
return false;
}

/**
* Check whether the address is local address.
*
* @param host host string
* @return true if yes, false otherwise
*/
public static boolean isLocalAddress(String host) {
InetAddress addr;
try {
addr = InetAddress.getByName(host);
} catch (UnknownHostException e) {
logger.error("Unkown host: " + host);
return false;
}
// Check if the address is a valid special local or loop back
if (addr.isSiteLocalAddress() || addr.isAnyLocalAddress() || addr.isLoopbackAddress()
|| addr.isLinkLocalAddress()) {
return true;
}
// Check if the address is defined on any interface
try {
return NetworkInterface.getByInetAddress(addr) != null;
} catch (SocketException e) {
return false;
}
}
}

2 changes: 1 addition & 1 deletion src/main/resources/weidentity.properties.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ zkp.cpt.array.length=5
#######################################################################################################
# Endpoint Service Integration-side parameters
# Listener port required to be opened for RPC Server, default: 6010
rpc.listener.port=6010
endpoint.listener.port=6010


#######################################################################################################
Expand Down

0 comments on commit 9c0e9ef

Please sign in to comment.