Skip to content

Commit

Permalink
引入注册表,将反射调用与线程池解耦
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzfrjf committed May 19, 2022
1 parent 43a5918 commit 2e1351e
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 14 deletions.
4 changes: 2 additions & 2 deletions rpc-common/src/main/java/cn/fzzfrjf/entity/RpcResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ public class RpcResponse<T> implements Serializable {

private T data;

public <T> RpcResponse<T> success(T data,String requestId){
public static <T> RpcResponse<T> success(T data,String requestId){
RpcResponse<T> rpcResponse = new RpcResponse<>();
rpcResponse.setCode(ResponseCode.SUCCESS.getCode());
rpcResponse.setRequestId(requestId);
rpcResponse.setData(data);
return rpcResponse;
}

public <T>RpcResponse<T> fail(String requestId){
public static <T>RpcResponse<T> fail(String requestId){
RpcResponse<T> rpcResponse = new RpcResponse();
rpcResponse.setRequestId(requestId);
rpcResponse.setCode(ResponseCode.FAILURE.getCode());
Expand Down
15 changes: 15 additions & 0 deletions rpc-common/src/main/java/cn/fzzfrjf/enumeration/RpcError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cn.fzzfrjf.enumeration;


import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum RpcError {

SERVICE_NOT_REGISTERED("服务未能成功注册"),
SERVICE_NOT_FOUND("未能找到服务");

private String message;
}
18 changes: 18 additions & 0 deletions rpc-common/src/main/java/cn/fzzfrjf/exception/RpcException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cn.fzzfrjf.exception;

import cn.fzzfrjf.enumeration.RpcError;

public class RpcException extends RuntimeException{

public RpcException(RpcError rpcError,String detail){
super(rpcError.getMessage() + detail);
}

public RpcException(RpcError rpcError){
super(rpcError.getMessage());
}

public RpcException(RpcError rpcError,Throwable cause){
super(rpcError.getMessage(),cause);
}
}
2 changes: 1 addition & 1 deletion rpc-server/src/main/java/cn/fzzfrjf/core/CommonServer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package cn.fzzfrjf.core;

public interface CommonServer {
public void register(Object service ,int port);
public void start(int port);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cn.fzzfrjf.core;

import cn.fzzfrjf.enumeration.RpcError;
import cn.fzzfrjf.exception.RpcException;
import cn.fzzfrjf.service.ServerPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DefaultServerPublisher implements ServerPublisher {

private static final Logger logger = LoggerFactory.getLogger(DefaultServerPublisher.class);
private final ConcurrentHashMap<String,Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
@Override
public synchronized <T> void publishService(T service) {
String serviceName = service.getClass().getCanonicalName();
if(serviceMap.containsKey(serviceName)) return;
registeredService.add(serviceName);
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0){
throw new RpcException(RpcError.SERVICE_NOT_REGISTERED);
}
for (Class<?> anInterface : interfaces) {
serviceMap.put(anInterface.getCanonicalName(),service);
}
logger.info("向接口:{}注册服务:{}",interfaces,serviceName);
}

@Override
public synchronized Object getService(String serviceName) {
Object o = serviceMap.get(serviceName);
if(null == o){
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return o;
}
}
35 changes: 35 additions & 0 deletions rpc-server/src/main/java/cn/fzzfrjf/core/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cn.fzzfrjf.core;

import cn.fzzfrjf.entity.RpcRequest;
import cn.fzzfrjf.entity.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class RequestHandler {

private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

public Object handle(RpcRequest rpcRequest,Object service){
Object result = null;
try{
result = doMethod(rpcRequest,service);
}catch (InvocationTargetException|IllegalAccessException e){
logger.error("反射调用时发生错误:",e);
}
return result;
}

private Object doMethod(RpcRequest rpcRequest , Object service) throws InvocationTargetException, IllegalAccessException {
Method method;
try{
method = service.getClass().getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
}catch (NoSuchMethodException e){
logger.error("调用方法时有错误发生:",e);
return RpcResponse.fail(rpcRequest.getRequestId());
}
return method.invoke(service,rpcRequest.getParameters());
}
}
41 changes: 41 additions & 0 deletions rpc-server/src/main/java/cn/fzzfrjf/core/RequestHandlerThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cn.fzzfrjf.core;

import cn.fzzfrjf.entity.RpcRequest;
import cn.fzzfrjf.entity.RpcResponse;
import cn.fzzfrjf.service.ServerPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class RequestHandlerThread implements Runnable{

private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

private final RequestHandler handler;
private final ServerPublisher serverPublisher;
private final Socket socket;

public RequestHandlerThread(RequestHandler handler,ServerPublisher serverPublisher,Socket socket){
this.handler = handler;
this.serverPublisher = serverPublisher;
this.socket = socket;
}

@Override
public void run() {
try(ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream())){
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
Object service = serverPublisher.getService(rpcRequest.getInterfaceName());
Object result = handler.handle(rpcRequest, service);
oos.writeObject(RpcResponse.success(result,rpcRequest.getRequestId()));
oos.flush();
}catch (IOException | ClassNotFoundException e){
logger.error("调用或发送时发生错误:",e);
}
}
}
20 changes: 13 additions & 7 deletions rpc-server/src/main/java/cn/fzzfrjf/core/SocketServer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.fzzfrjf.core;


import cn.fzzfrjf.service.ServerPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,26 +14,31 @@ public class SocketServer implements CommonServer{

private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
private final ServerPublisher serverPublisher;

public SocketServer(){

public SocketServer(ServerPublisher serverPublisher){
this.serverPublisher = serverPublisher;
int corePoolSize = 5;
int maximumPoolSize = 50;
int keepAliveTime = 60;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
threadPool = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,workQueue,Executors.defaultThreadFactory());
}


@Override
public void register(Object service, int port) {
public void start(int port){
try(ServerSocket serverSocket = new ServerSocket(port)){
logger.info("服务器正在启动中.....");
Socket socket ;
while( (socket = serverSocket.accept()) != null){
logger.info("服务器成功启动。。。。");
Socket socket;
while((socket = serverSocket.accept()) != null){
logger.info("连接成功,客户端ip为:" + socket.getInetAddress());
threadPool.execute(new WorkThread(service,socket));
threadPool.execute(new RequestHandlerThread(new RequestHandler(),serverPublisher,socket));
}
threadPool.shutdown();
}catch (IOException e){
logger.error("连接时有错误发生:{}",e);
logger.error("服务器启动时发生错误:",e);
}
}
}
2 changes: 1 addition & 1 deletion rpc-server/src/main/java/cn/fzzfrjf/core/WorkThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void run() {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Method method = service.getClass().getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
Object result = method.invoke(service,rpcRequest.getParameters());
objectOutputStream.writeObject(new RpcResponse().success(result,rpcRequest.getRequestId()));
objectOutputStream.writeObject(RpcResponse.success(result,rpcRequest.getRequestId()));
objectOutputStream.flush();
}catch (IOException | ClassNotFoundException | InvocationTargetException |NoSuchMethodException | IllegalAccessException e){
logger.error("调用或发送时有错误发生:",e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package cn.fzzfrjf.service;

public interface ServerPublisher {

<T> void publishService(T service);

Object getService(String serviceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static void main(String[] args) {
SocketClient socketClient = new SocketClient();
ClientProxy proxy = new ClientProxy(socketClient,"127.0.0.1",9000);
HelloService service = (HelloService)proxy.getProxy(HelloService.class);
RpcObject rpcObject = new RpcObject(1,"This is SocketClient!");
RpcObject rpcObject = new RpcObject(2,"This is SocketClient!");
String s = service.sayHello(rpcObject);
System.out.println(s);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package cn.fzzfrjf.test;

import cn.fzzfrjf.core.DefaultServerPublisher;
import cn.fzzfrjf.core.SocketServer;
import cn.fzzfrjf.entity.HelloService;
import cn.fzzfrjf.service.HelloServiceImpl;
import cn.fzzfrjf.service.ServerPublisher;

public class SocketServerTest {
public static void main(String[] args) {
SocketServer socketServer = new SocketServer();
ServerPublisher serverPublisher = new DefaultServerPublisher();
HelloService helloService = new HelloServiceImpl();
socketServer.register(helloService,9000);
serverPublisher.publishService(helloService);
SocketServer socketServer = new SocketServer(serverPublisher);
socketServer.start(9000);
}
}

0 comments on commit 2e1351e

Please sign in to comment.