Need help with a load balancer server translating REST requests to RPC servers #306
-
I’m trying to create a server that acts as a load balancer. It should receive requests in the REST pattern and translate them to two other RPC servers, using the round-robin strategy. However, I’m having difficulties getting it to work. It might be something I’m doing wrong. I’ve made two different attempts to make the call with RPCClient:
All instances start to work, and the client can connecto to both servers. But, I can't send any request. The RPC servers are instances of the same code running on different ports. Server code: public class MyRpcServer extends Launcher {
private static int PORT = 8080;
public static final String PROPERTIES_FILE = "rpc-server.properties";
@Inject RpcServer rpcServer;
@SerializeRecord
public static record FooRequest(int id) {}
@SerializeRecord
public static record FooResponse(int status, String body) {}
@Provides
NioReactor reactor() {
return Eventloop.create();
}
@Provides
Config config() {
return Config.create()
.overrideWith(ofClassPathProperties(PROPERTIES_FILE, true))
.overrideWith(ofSystemProperties("config"));
}
@Provides
RpcServer rpcServer(NioReactor reactor, Config config) throws Exception {
PORT = config.get(ConfigConverters.ofInteger(), "http.port");
return RpcServer.builder(reactor)
.withMessageTypes(FooRequest.class, FooResponse.class)
.withHandler(
FooRequest.class, request -> Promise.of(new FooResponse(200, "it is working!!")))
.withListenPort(PORT)
.build();
}
@Override
protected void run() throws Exception {
rpcServer.listen();
System.out.println("Listening to: " + PORT);
awaitShutdown();
}
public static void main(String[] args) throws Exception {
MyRpcServer server = new MyRpcServer();
server.launch(args);
}
} Load balancer code: public class MyLoadBalancer extends HttpServerLauncher {
private static final Pattern PATH_REGEX =
Pattern.compile("/([1-5])/(fooPath|notImplementedPath)");
private static final Promise<HttpResponse> NOT_FOUND = HttpResponse.notFound404().toPromise();
private static final InetSocketAddress ADDRESS_01 = new InetSocketAddress("localhost", 8081);
private static final InetSocketAddress ADDRESS_02 = new InetSocketAddress("localhost", 8082);
@SerializeRecord
public static record FooRequest(int id) {}
@SerializeRecord
public static record FooResponse(int status, String body) {}
@Provides
RpcClient rpcClient(NioReactor reactor) {
var server1 = RpcStrategies.server(ADDRESS_01);
var server2 = RpcStrategies.server(ADDRESS_02);
var roundRobin = RoundRobin.create(server1, server2); // also have tried
// RpcStrategies#roundRobin
return RpcClient.builder(reactor)
.withMessageTypes(FooRequest.class, FooResponse.class)
.withStrategy(roundRobin)
.build();
}
@Provides
AsyncServlet servlet(NioReactor reactor, RpcClient rpcClient) {
return request -> {
Matcher matcher = PATH_REGEX.matcher(request.getPath());
boolean isValidPath = matcher.matches();
if (!isValidPath) return NOT_FOUND;
int id = Integer.parseInt(matcher.group(1));
String path = matcher.group(2);
if (path.equals("fooPath")) {
return rpcClient
.sendRequest(new FooRequest(id)) // the request will come up to here but it
// will wait until timeout
.then(
response -> {
FooResponse foo = (FooResponse) response;
return HttpResponse.ofCode(foo.status()).withBody(foo.body()).toPromise();
});
/*
* also tried: return rpcClient .start() .then(() -> rpcClient .sendRequest(new
* ExtratoRequest(id))) .then( response -> { FooResponse foo = (FooResponse) response;
* return HttpResponse.ofCode(foo.status()) .withBody(foo.body()) .toPromise(); });
* .whenComplete(rpcClient::stop);
*
* but it will give me a null pointer since "rpcClient.start()" will call a "changeStrategy"
* method.
*/
} else {
return NOT_FOUND;
}
};
}
@Override
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
MyLoadBalancer launcher = new MyLoadBalancer();
launcher.launch(args);
}
} Req Example that will not responds untill timeout: Any help would be greatly appreciated. Thank you! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Hi, @ThusSpokeBieu First and foremost, it's essential to determine the threading model of the application. ActiveJ offers great flexibility in this regard, but it's crucial to understand how the application operates and what outcomes you aim to achieve. Most components in ActiveJ belong to one event loop or another, each running in its own separate thread. Additionally, there's the main application thread. We have several options:
In the In the To enable the @Override
protected Module getModule() {
return ServiceGraphModule.create();
} If you do so, you should also remove the I also noticed that you use regex to route HTTP requests. You could achieve a similar result with the |
Beta Was this translation helpful? Give feedback.
Hi, @ThusSpokeBieu
First and foremost, it's essential to determine the threading model of the application. ActiveJ offers great flexibility in this regard, but it's crucial to understand how the application operates and what outcomes you aim to achieve. Most components in ActiveJ belong to one event loop or another, each running in its own separate thread. Additionally, there's the main application thread. We have several options: