Skip to content

Commit

Permalink
Merge pull request #24 from congcoi123/feature/CR-7_Allows_multiple_U…
Browse files Browse the repository at this point in the history
…DP_channels

CR-7 Allows Multiple UDP Channels
  • Loading branch information
congcoi123 committed Oct 9, 2022
2 parents 16b0482 + 5c10d4a commit dcf9546
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 51 deletions.
2 changes: 1 addition & 1 deletion configuration.example.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<Sockets>
<Port name="socket" type="tcp">8032</Port>
<Port name="websocket" type="websocket">8033</Port>
<Port name="datagram" type="udp">8034</Port>
</Sockets>
<Properties>
<Property name="websocket-using-ssl">false</Property>
Expand Down Expand Up @@ -62,6 +61,7 @@
<Worker name="websocket-producer">1</Worker>
<Worker name="websocket-consumer">2</Worker>
<Worker name="internal-processor">2</Worker>
<Worker name="udp-worker">3</Worker>
</Workers>
<Schedules>
<!-- Get the period checking in seconds which server can keep the empty
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/tenio/core/api/ServerApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ default void switchParticipantToSpectator(Player player, Room room) {
* @param player the checking spectator ({@link Player} instance)
* @param room the current spectator's {@link Room}
* @param targetSlot a new position ({@code integer} value) of transformed "participant" in
* its room
* its room
* @throws UnsupportedOperationException this method is not supported at the moment
*/
default void switchSpectatorToParticipant(Player player, Room room, int targetSlot) {
Expand Down Expand Up @@ -287,4 +287,12 @@ default void sendPublicMessage(Player sender, Room room, ServerMessage message)
default void sendPrivateMessage(Player sender, Player recipient, ServerMessage message) {
throw new UnsupportedOperationException("Unsupported at the moment");
}

/**
* Retrieves the current available UDP port.
*
* @return an {@code integer} value of UDP port
*
*/
int getCurrentAvailableUdpPort();
}
5 changes: 5 additions & 0 deletions src/main/java/com/tenio/core/api/implement/ServerApiImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public void removeRoom(Room room, RoomRemoveMode removeRoomMode) {
getRoomManager().removeRoomById(room.getId());
}

@Override
public int getCurrentAvailableUdpPort() {
return server.getUdpChannelManager().getCurrentAvailableUdpPort();
}

private EventManager getEventManager() {
return server.getEventManager();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void load(String file) throws Exception {

socketConfigs.add(port);
}
push(CoreConfigurationType.SOCKET_CONFIGS, socketConfigs);
push(CoreConfigurationType.NETWORK_SOCKET_CONFIGS, socketConfigs);
// Network HTTPs
var attrNetworkHttps = XmlUtility.getNodeList(root, "//Server/Network/Http/Port");
for (int i = 0; i < attrNetworkHttps.getLength(); i++) {
Expand All @@ -128,7 +128,7 @@ public void load(String file) throws Exception {

httpConfigs.add(port);
}
push(CoreConfigurationType.HTTP_CONFIGS, httpConfigs);
push(CoreConfigurationType.NETWORK_HTTP_CONFIGS, httpConfigs);

// Implemented Classes
var attrImplementedClasses = XmlUtility.getNodeList(root, "//Server/Implements/Class");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,32 @@ public enum CoreConfigurationType implements ConfigurationType {
/**
* The number of threads using for handlers to accept new incoming client socket on the server.
*/
THREADS_SOCKET_ACCEPTOR("socket-acceptor"),
WORKER_SOCKET_ACCEPTOR("socket-acceptor"),
/**
* The number of threads using for handlers to read new messages from client sockets on the
* server.
*/
THREADS_SOCKET_READER("socket-reader"),
WORKER_SOCKET_READER("socket-reader"),
/**
* The number of threads using for handlers to write new messages to client sockets on the server.
*/
THREADS_SOCKET_WRITER("socket-writer"),
WORKER_SOCKET_WRITER("socket-writer"),
/**
* The number of threads using for handlers of WebSocket producers on the server.
*/
THREADS_WEBSOCKET_PRODUCER("websocket-producer"),
WORKER_WEBSOCKET_PRODUCER("websocket-producer"),
/**
* The number of threads using for handlers of WebSocket consumers on the server.
*/
THREADS_WEBSOCKET_CONSUMER("websocket-consumer"),
WORKER_WEBSOCKET_CONSUMER("websocket-consumer"),
/**
* The number of threads using for handlers to manage internal processes on the server.
*/
THREADS_INTERNAL_PROCESSOR("internal-processor"),
WORKER_INTERNAL_PROCESSOR("internal-processor"),
/**
* The number of UDP channel will be opened on the server.
*/
WORKER_UDP_WORKER("udp-worker"),
/**
* Sets an interval to frequently check removable rooms for removing them.
*
Expand Down Expand Up @@ -235,11 +239,12 @@ public enum CoreConfigurationType implements ConfigurationType {
/**
* The list of socket configurations in the server configuration.
*/
SOCKET_CONFIGS("socket-configs"),
NETWORK_SOCKET_CONFIGS("socket-configs"),
/**
* The list of HTTP configurations in the server configuration.
*/
HTTP_CONFIGS("http-configs");
NETWORK_HTTP_CONFIGS("http-configs");

// Reverse-lookup map for getting a type from a value
private static final Map<String, CoreConfigurationType> lookup =
new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
The MIT License
Copyright (c) 2016-2022 kong <congcoi123@gmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

package com.tenio.core.exception;

import com.tenio.core.configuration.define.CoreConfigurationType;

/**
* When an available Udp channel port is requested, but the list is empty. It might be caused by
* * {@link CoreConfigurationType#WORKER_UDP_WORKER} equals
* * to {@code 0}, or there was some exception occurred while establishing Udp channels.
*/
public final class EmptyUdpChannelsException extends RuntimeException {

private static final long serialVersionUID = 6979513728417343122L;

/**
* Initialization.
*/
public EmptyUdpChannelsException() {
super("The list is empty, please check in configuration.xml file if value of udp-channel is " +
"greater than 0, or make sure there is no exception while establishing udp channels");
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/tenio/core/network/NetworkService.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ void setConnectionFilterClass(Class<? extends ConnectionFilter> clazz, int maxCo
*/
void setSocketAcceptorServerAddress(String serverAddress);

/**
* Declares the number of Udp channel will be opened on the server.
*
* @param amountUdpWorkers the number of opening Udp channels
*/
void setSocketAcceptorAmountUdpWorkers(int amountUdpWorkers);

/**
* Sets the number of acceptor workers for the socket (TCP) which are using to accept new coming
* clients.
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/tenio/core/network/NetworkServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ public void setSocketAcceptorServerAddress(String serverAddress) {
socketService.setAcceptorServerAddress(serverAddress);
}

@Override
public void setSocketAcceptorAmountUdpWorkers(int amountUdpWorkers) {
socketService.setAcceptorAmountUdpWorkers(amountUdpWorkers);
}

@Override
public void setSocketAcceptorWorkers(int workerSize) {
socketService.setAcceptorWorkerSize(workerSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface ZeroSocketService extends Service {
*/
void setAcceptorServerAddress(String serverAddress);

/**
* Declares the number of Udp channel will be opened on the server.
*
* @param amountUdpWorkers the number of opening Udp channels
*/
void setAcceptorAmountUdpWorkers(int amountUdpWorkers);

/**
* Sets size of {@link ByteBuffer} using for an acceptor worker to read/write binaries data
* from/down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public void setAcceptorServerAddress(String serverAddress) {
acceptorEngine.setServerAddress(serverAddress);
}

@Override
public void setAcceptorAmountUdpWorkers(int amountUdpWorkers) {
acceptorEngine.setAmountUdpWorkers(amountUdpWorkers);
}

@Override
public void setAcceptorBufferSize(int bufferSize) {
acceptorEngine.setMaxBufferSize(bufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public interface ZeroAcceptor extends ZeroEngine {
*/
void setServerAddress(String serverAddress);

/**
* Declares the number of Udp channel will be opened on the server.
*
* @param amountUdpWorkers the number of opening Udp channels
*/
void setAmountUdpWorkers(int amountUdpWorkers);

/**
* Declares a list of socket configurations for the network.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ of this software and associated documentation files (the "Software"), to deal
import com.tenio.core.network.zero.engine.ZeroAcceptor;
import com.tenio.core.network.zero.engine.listener.ZeroAcceptorListener;
import com.tenio.core.network.zero.engine.listener.ZeroReaderListener;
import com.tenio.core.server.ServerImpl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
Expand Down Expand Up @@ -61,6 +62,7 @@ public final class ZeroAcceptorImpl extends AbstractZeroEngine
private ConnectionFilter connectionFilter;
private ZeroReaderListener zeroReaderListener;
private String serverAddress;
private int amountUdpWorkers;
private List<SocketConfig> socketConfigs;

private ZeroAcceptorImpl(EventManager eventManager) {
Expand Down Expand Up @@ -93,8 +95,9 @@ private void initializeSockets() throws ServiceRuntimeException {
private void bindSocket(SocketConfig socketConfig) throws ServiceRuntimeException {
if (socketConfig.getType() == TransportType.TCP) {
bindTcpSocket(socketConfig.getPort());
} else if (socketConfig.getType() == TransportType.UDP) {
bindUdpSocket(socketConfig.getPort());
}
if (amountUdpWorkers > 0) {
bindUdpSocket();
}
}

Expand All @@ -106,36 +109,40 @@ private void bindTcpSocket(int port) throws ServiceRuntimeException {
serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEPORT, true);
}
serverSocketChannel.socket().bind(new InetSocketAddress(serverAddress, port));
info("TCP SOCKET", buildgen("Started at address: ", serverAddress, ", port: ",
serverSocketChannel.socket().getLocalPort()));
// only server socket should interest in this key OP_ACCEPT
serverSocketChannel.register(acceptableSelector, SelectionKey.OP_ACCEPT);
synchronized (boundSockets) {
boundSockets.add(serverSocketChannel);
}

info("TCP SOCKET", buildgen("Started at address: ", serverAddress, ", port: ", port));
} catch (IOException e) {
throw new ServiceRuntimeException(e.getMessage());
}
}

private void bindUdpSocket(int port) throws ServiceRuntimeException {
private void bindUdpSocket() throws ServiceRuntimeException {
try {
var datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
if (OsUtility.getOperatingSystemType() != OsUtility.OsType.WINDOWS) {
datagramChannel.setOption(StandardSocketOptions.SO_REUSEPORT, true);
}
datagramChannel.setOption(StandardSocketOptions.SO_BROADCAST, true);
datagramChannel.socket().bind(new InetSocketAddress(serverAddress, port));
// udp datagram is a connectionless protocol, we don't need to create
// bi-direction connection, that why it's not necessary to register it to
// acceptable selector. Just leave it to the reader selector later
zeroReaderListener.acceptDatagramChannel(datagramChannel);
synchronized (boundSockets) {
boundSockets.add(datagramChannel);
for (int i = 0; i < amountUdpWorkers; i++) {
var datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
if (OsUtility.getOperatingSystemType() != OsUtility.OsType.WINDOWS) {
datagramChannel.setOption(StandardSocketOptions.SO_REUSEPORT, true);
}
datagramChannel.setOption(StandardSocketOptions.SO_BROADCAST, true);
datagramChannel.socket().bind(new InetSocketAddress(serverAddress, 0));
// udp datagram is a connectionless protocol, we don't need to create
// bi-direction connection, that why it's not necessary to register it to
// acceptable selector. Just leave it to the reader selector later
zeroReaderListener.acceptDatagramChannel(datagramChannel);
int boundPort = datagramChannel.socket().getLocalPort();
ServerImpl.getInstance().getUdpChannelManager().appendUdpPort(boundPort);
info("UDP SOCKET",
buildgen("Started at address: ", serverAddress, ", port: ", boundPort));
boundSockets.add(datagramChannel);
}
}

info("UDP SOCKET", buildgen("Started at address: ", serverAddress, ", port: ", port));
} catch (IOException e) {
throw new ServiceRuntimeException(e.getMessage());
}
Expand Down Expand Up @@ -316,6 +323,11 @@ public void setServerAddress(String serverAddress) {
this.serverAddress = serverAddress;
}

@Override
public void setAmountUdpWorkers(int amountUdpWorkers) {
this.amountUdpWorkers = amountUdpWorkers;
}

@Override
public void setSocketConfigs(List<SocketConfig> socketConfigs) {
this.socketConfigs = socketConfigs;
Expand Down

0 comments on commit dcf9546

Please sign in to comment.