diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessage.java b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessage.java index bf6cb98aa8..74e1fc74c0 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessage.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessage.java @@ -17,38 +17,40 @@ package eu.cloudnetservice.driver.channel; import com.google.common.base.Preconditions; -import eu.cloudnetservice.driver.DriverEnvironment; import eu.cloudnetservice.driver.event.events.channel.ChannelMessageReceiveEvent; import eu.cloudnetservice.driver.inject.InjectionLayer; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.provider.CloudMessenger; -import eu.cloudnetservice.driver.service.ServiceEnvironmentType; -import java.util.ArrayList; +import java.io.Closeable; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.NonNull; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.Nullable; /** - * Represents a message object which can be sent over the network with specific targets in mind. Unlike direct packet - * communication, channel messages are not bound to a specific messaging channels but can rather get sent to all - * components which are somewhere connected in the network. This means that it is possible to send a channel message to - * a service which is running on another node than the current service which is sending the channel message. + * Represents a message object that can be sent over the network with specific targets in mind. Unlike direct packet + * communication, channel messages are not bound to specific messaging channels but can rather get sent to all + * components that are somewhere connected in the network. This means that it is possible to send a channel message to a + * service which is running on another node than the service which is sending the channel message. *

* A channel message has two main identification points. One is the channel to which the message gets sent. The channel - * is a string object which is generally used to collect multiple types of channel message to a collection of message - * types. On the other hand a channel message contains a message object which should be unique within in the network and - * is used to identify a channel message in a group messages sent to the same channel. + * is a string generally used to group channel messages together. This is, for example, useful to identify all channel + * messages that are sent by a specific module. Further narrowing of the message type is done by using the message key, + * which should uniquely identify the specific message. Each channel message must be composed of a unique channel and + * message to distinguish it from other messages being sent in the cluster. *

* The message contains a {@link DataBuf} containing the actual content of the message. There is no real way to identify - * which types are in the buffer or not, therefore it is crucial that a channel message gets identified via its channel - * and/or message. + * which types are in the buffer or not, therefore, it is crucial that a channel message gets identified via its channel + * and message keys. *

- * If targets were given that are not locatable in the network they will get ignored silently. On the other hand this - * means that if you try to send a channel message to a non-existing target, the send method will block until the future - * wait timeout (30 seconds by default) expired before returning. + * If targets were given that are not locatable in the network, they will get ignored silently. *

* Note: there is no guarantee that the sender of a channel message is the actual component sending the message, as the * message can be modified on its way to the receiver. @@ -75,7 +77,7 @@ public record ChannelMessage( @NonNull DataBuf content, @NonNull ChannelMessageSender sender, @NonNull Collection targets -) { +) implements AutoCloseable { /** * Constructs a new, empty builder for a ChannelMessage. @@ -88,7 +90,7 @@ public record ChannelMessage( } /** - * Constructs a new builder which contains all needed data to respond to a channel message. As the channel message + * Constructs a new builder which contains all necessary data to respond to a channel message. As the channel message * will get directly handled by the waiting future, there is no need to actually set the channel and message of the * returned builder. The new builder will target the sender of the given input and has no data set. * @@ -98,13 +100,16 @@ public record ChannelMessage( */ @Contract("_ -> new") public static @NonNull Builder buildResponseFor(@NonNull ChannelMessage input) { - return builder().channel("").message("").target(input.sender.type(), input.sender.name()); + return builder().channel("").message("").target(input.sender.toTarget()); } /** - * Sends this channel message using the current messenger of the environment. This is in fact just a shortcut method - * for {@link CloudMessenger#sendChannelMessage(ChannelMessage)}. This method will not wait for the target component - * to respond (it doesn't even expect a response) but for the handling component to send the message. + * Sends this channel message using the current messenger of the environment. This is a shortcut method for + * {@link CloudMessenger#sendChannelMessage(ChannelMessage)}. This method will not wait for the target component to + * respond (it doesn't even expect a response) but for the handling component to send the message. + *

+ * Note: once the channel message was sent, the backing buffer gets released. Therefore, the caller must acquire the + * content buffer if this channel message is sent multiple times. */ public void send() { this.messenger().sendChannelMessage(this); @@ -113,9 +118,24 @@ public void send() { /** * Sends this channel message as a query and returns a future which waits for target component(s) to respond. This * method is a shortcut for {@link CloudMessenger#sendChannelMessageQueryAsync(ChannelMessage)}. The future will be - * completed when the target component responds or the query future times out (after 30 seconds). + * completed when the target component responds or the query future times out. + *

+ * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the responses to prevent memory leaks. Example: + *

+   * {@code
+   * ChannelMessage message = ...;
+   * message.sendQueryAsync().thenAccept(responses -> {
+   *   for (var response : responses) {
+   *     try (response) {
+   *       // do something with the response
+   *     }
+   *   }
+   * }
+   * }
+   * 
* - * @return a future completed with all responses of all target components of this channel message. + * @return a future completed with all responses of all components targeted by this channel message. */ public @NonNull CompletableFuture> sendQueryAsync() { return this.messenger().sendChannelMessageQueryAsync(this); @@ -124,34 +144,79 @@ public void send() { /** * Sends this channel message as a query and returns a future which waits for target component(s) to respond. Only the * first response of any target will get sent back to this component. This is in particular useful if there is only - * one target, or you are only expecting one component of the target components to respond. This is in fact just a - * shortcut method for {@link CloudMessenger#sendSingleChannelMessageQueryAsync(ChannelMessage)}. The future will be - * completed when one target component responds or the query future times out (after 30 seconds). + * one target, or you are only expecting one of the target components to respond. This is a shortcut method for + * {@link CloudMessenger#sendSingleChannelMessageQueryAsync(ChannelMessage)}. The future will be completed with the + * first received response of any target component (possibly null if no target responded). + *

+ * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the response to prevent memory leaks. Example: + *

+   * {@code
+   * ChannelMessage message = ...;
+   * message.sendSingleQueryAsync().thenAccept(response -> {
+   *   if (response != null) {
+   *     try (response) {
+   *       // do something with the response
+   *     }
+   *   }
+   * }
+   * }
+   * 
* - * @return a future completed with the first response of any target of this channel message. + * @return a future completed with the first received response of any target component or null if no target responded. */ public @NonNull CompletableFuture sendSingleQueryAsync() { return this.messenger().sendSingleChannelMessageQueryAsync(this); } /** - * Sends this channel message as a query and suspends the calling thread until all responses is available or the query - * timeout of 30 seconds is exceeded. This method is a shortcut for + * Sends this channel message as a query and blocks until all target components have responded to the query or the + * query timeout is exceeded.This method is a shortcut for * {@link CloudMessenger#sendChannelMessageQuery(ChannelMessage)}. + *

+ * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the responses to prevent memory leaks. Example: + *

+   * {@code
+   * ChannelMessage message = ...;
+   * Collection responses = message.sendQuery();
+   * for (var response : responses) {
+   *   try (response) {
+   *     // do something with the response
+   *   }
+   * }
+   * }
+   * 
* * @return all responses of all components this channel message is targeting. + * @throws CompletionException if an exception occurred while waiting for the query responses. */ public @NonNull Collection sendQuery() { return this.messenger().sendChannelMessageQuery(this); } /** - * Sends this channel message as a query and returns and blocks until one of the target component responded to this - * message or the query timeout of 30 seconds is exceeded. This is in particular useful if there is only one target, - * or you are only expecting one component of the target components to respond. This is in fact just a shortcut method - * for {@link CloudMessenger#sendSingleChannelMessageQueryAsync(ChannelMessage)}. + * Sends this channel message as a query and blocks until one of the target component responded to this message or the + * query timeout is exceeded. This is in particular useful if there is only one target, or you are only expecting one + * of the target components to respond. This is a shortcut method for + * {@link CloudMessenger#sendSingleChannelMessageQuery(ChannelMessage)}. + *

+ * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the response to prevent memory leaks. Example: + *

+   * {@code
+   * ChannelMessage message = ...;
+   * ChannelMessage response = message.sendSingleQuery();
+   * if (response != null) {
+   *   try (response) {
+   *     // do something with the response
+   *   }
+   * }
+   * }
+   * 
* - * @return the first response of any component this message is targeting. + * @return the first response of any component this message is targeting, null if no target responded. + * @throws CompletionException if an exception occurred while waiting for the query response. */ public @Nullable ChannelMessage sendSingleQuery() { return this.messenger().sendSingleChannelMessageQuery(this); @@ -162,10 +227,19 @@ public void send() { * * @return the current messenger of the environment. */ + @ApiStatus.Internal private @NonNull CloudMessenger messenger() { return InjectionLayer.boot().instance(CloudMessenger.class); } + /** + * {@inheritDoc} + */ + @Override + public void close() { + this.content.close(); + } + /** * A builder for a channel message. This class should be used over direct constructor access as allows better * customization and validation of a channel message which gets created. Required properties are: @@ -175,11 +249,25 @@ public void send() { *
  • at least one target for the message * *

    - * If no sender of the message is given the current network component will be used as the sender of the message. + * If no sender for the message is given, the current network component will be used as the sender of the message. + * The {@link #build()} method can only be called once for a channel message builder instance. + * + * @since 4.0 */ - public static final class Builder { + public static final class Builder implements Closeable { + + private static final VarHandle BUILD_CALLED; + + static { + try { + var lookup = MethodHandles.lookup(); + BUILD_CALLED = lookup.findVarHandle(Builder.class, "buildCalled", boolean.class); + } catch (NoSuchFieldException | IllegalAccessException exception) { + throw new ExceptionInInitializerError(exception); + } + } - private final Collection targets = new ArrayList<>(); + private final Collection targets = new HashSet<>(); private String channel; private String message; @@ -190,41 +278,61 @@ public static final class Builder { private DataBuf content; private ChannelMessageSender sender; + // internal marker to ensure that build() is only called once per builder instance + // this is due to the content data buf, it cannot be shared between multiple channel message instances + @SuppressWarnings("FieldMayBeFinal") // modified by BUILD_CALLED in build() + private volatile boolean buildCalled = false; + /** - * Sets the sender of this message. If no sender is given the current component will be used as the sender. Note - * that if you change the sender and try to receive a query response it will send the response to the given sender, - * not this component. + * Constructs a new builder instance. Use {@link ChannelMessage#builder()} instead. + */ + private Builder() { + } + + /** + * Sets the sender of this message. If no sender is given, the current component will be used as the sender. * * @param sender the sender of this message. * @return the same builder as used to call the method, for chaining. - * @throws NullPointerException if the given sender is null. - * @see ChannelMessageSender#self() + * @throws NullPointerException if the given sender is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder sender(@NonNull ChannelMessageSender sender) { + this.assertValidState(); this.sender = sender; return this; } /** - * Sets the channel of this message. Might be empty but should be unique to identify within the network. + * Sets the channel of this message. The channel is primarily intended to group channel message of, for example, the + * same module. This makes it much easier for receivers to assess whether a message should get handled by them. It + * can be empty but should be unique to identify for the receiver. * - * @param channel the channel. + * @param channel the channel of this message. * @return the same builder as used to call the method, for chaining. - * @throws NullPointerException if the given channel is null. + * @throws NullPointerException if the given channel is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder channel(@NonNull String channel) { + this.assertValidState(); this.channel = channel; return this; } /** - * Sets the message key of this message. Might be empty but should be unique to identify within the network. + * Sets the message key of this message. The key is primarily intended to uniquely identify one specific message to + * the receiver. It can be empty but should be unique to identify for the receiver. * * @param message the message key. * @return the same builder as used to call the method, for chaining. - * @throws NullPointerException if the given message is null. + * @throws NullPointerException if the given message is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder message(@NonNull String message) { + this.assertValidState(); this.message = message; return this; } @@ -235,172 +343,182 @@ public static final class Builder { * * @param sync if the message should get send sync. * @return the same builder as used to call the method, for chaining. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder sendSync(boolean sync) { + this.assertValidState(); this.sendSync = sync; return this; } /** - * Sets if the channel message should be prioritized over other channel messages. + * Sets if the channel message should get prioritized processing on the receiving components. *

    - * USE WITH CAUTION! This can cause other packet to get read and handled delayed. Use this option - * only if you know what you're doing and are absolutely sure that the packet is urgent for CloudNet to work for as - * expected. Otherwise, don't touch this method. + * USE WITH CAUTION! This can cause other lags and delays in the network handling of the + * receivers. Use this option only if you know what you're doing and are sure that the packet is urgent for CloudNet + * to work for as expected. Otherwise, don't touch this method. * - * @param prioritized if the channel message is prioritized + * @param prioritized if the channel message should get prioritized processing on the receiving components. * @return the same builder as used to call the method, for chaining. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") @ApiStatus.Experimental public @NonNull Builder prioritized(boolean prioritized) { + this.assertValidState(); this.prioritized = prioritized; return this; } /** - * Sets the content of this message. If no content was given an empty buffer will be used. + * Sets the content of this message. If no content was given, an empty buffer will be used. Note that any previously + * supplied content buffer won't be released when this method is called multiple times on the same builder. * - * @param dataBuf the content. + * @param dataBuf the content of the message. * @return the same builder as used to call the method, for chaining. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder buffer(@Nullable DataBuf dataBuf) { + this.assertValidState(); this.content = dataBuf; return this; } /** - * Adds the given target as a target of this message. + * Adds the given channel message target as a target of this message. * * @param target the target to add. * @return the same builder as used to call the method, for chaining. - * @throws NullPointerException if the given target is null. + * @throws NullPointerException if the given target is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ + @Contract("_ -> this") public @NonNull Builder target(@NonNull ChannelMessageTarget target) { + this.assertValidState(); this.targets.add(target); return this; } /** - * Adds a channel message target to this message. You may not target an environment using this method. - * - * @param type the type of the receiver. - * @param name the name of the receiver. - * @return the same builder as used to call the method, for chaining. - * @throws IllegalArgumentException if type is {@link ChannelMessageTarget.Type#ENVIRONMENT} - * @throws NullPointerException if the given target type is null. - * @see ChannelMessageTarget#of(ChannelMessageTarget.Type, String) - */ - public @NonNull Builder target(@NonNull ChannelMessageTarget.Type type, @Nullable String name) { - return this.target(ChannelMessageTarget.of(type, name)); - } - - /** - * Adds a channel message target to this message. You may not target an environment using this method. + * Targets all components within the network. * - * @param environment the driver environment to target. - * @param name the name of the target, might be null to target all components with the given environment. * @return the same builder as used to call the method, for chaining. - * @throws IllegalArgumentException if type is {@link ChannelMessageTarget.Type#ENVIRONMENT} - * @throws NullPointerException if the given environment is null. - * @see ChannelMessageTarget#of(ChannelMessageTarget.Type, String) + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder target(@NonNull DriverEnvironment environment, @Nullable String name) { - return this.target(environment == DriverEnvironment.NODE - ? ChannelMessageTarget.Type.NODE - : ChannelMessageTarget.Type.SERVICE, name); + @Contract(" -> this") + public @NonNull Builder targetAll() { + return this.target(ChannelMessageTarget.all()); } /** - * Targets all components with the given type. You may not target an environment using this method. + * Targets all nodes within the network. * - * @param type the type of the receivers to target. * @return the same builder as used to call the method, for chaining. - * @throws IllegalArgumentException if type is {@link ChannelMessageTarget.Type#ENVIRONMENT} - * @throws NullPointerException if the given target type is null. - * @see ChannelMessageTarget#of(ChannelMessageTarget.Type, String) + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetAll(@NonNull ChannelMessageTarget.Type type) { - return this.target(type, null); + @Contract(" -> this") + public @NonNull Builder targetNodes() { + return this.target(ChannelMessageTarget.allNodes()); } /** - * Targets all components within the network. + * Targets all services within the network. * * @return the same builder as used to call the method, for chaining. - * @throws IllegalArgumentException if type is {@link ChannelMessageTarget.Type#ENVIRONMENT} - * @see ChannelMessageTarget#of(ChannelMessageTarget.Type, String) + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetAll() { - return this.target(ChannelMessageTarget.Type.ALL, null); + @Contract(" -> this") + public @NonNull Builder targetServices() { + return this.target(ChannelMessageTarget.allServices()); } /** - * Targets all services within the network. + * Targets a specific node within the network. * + * @param nodeId the id of the node to target. * @return the same builder as used to call the method, for chaining. - * @throws IllegalArgumentException if type is {@link ChannelMessageTarget.Type#ENVIRONMENT} - * @see ChannelMessageTarget#of(ChannelMessageTarget.Type, String) + * @throws NullPointerException if the given node id is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetServices() { - return this.targetAll(ChannelMessageTarget.Type.SERVICE); + @Contract("_ -> this") + public @NonNull Builder targetNode(@NonNull String nodeId) { + return this.target(ChannelMessageTarget.node(nodeId)); } /** * Targets a specific service in the network. * - * @param name the name of the service to target. + * @param serviceName the name of the service to target. * @return the same builder as used to call the method, for chaining. + * @throws NullPointerException if the given service name is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetService(@Nullable String name) { - return this.target(ChannelMessageTarget.Type.SERVICE, name); + @Contract("_ -> this") + public @NonNull Builder targetService(@NonNull String serviceName) { + return this.target(ChannelMessageTarget.service(serviceName)); } /** * Targets all services of the given task within the network. * - * @param name the name of the task to target. + * @param taskName the name of the task to target. * @return the same builder as used to call the method, for chaining. + * @throws NullPointerException if the given task name is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetTask(@Nullable String name) { - return this.target(ChannelMessageTarget.Type.TASK, name); + @Contract("_ -> this") + public @NonNull Builder targetServicesOfTask(@NonNull String taskName) { + return this.target(ChannelMessageTarget.servicesByTask(taskName)); } /** - * Targets a specific node within the network. + * Targets all services of the given group within the network. * - * @param name the name of the node to target. + * @param groupName the name of the group to target. * @return the same builder as used to call the method, for chaining. + * @throws NullPointerException if the given group name is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetNode(@Nullable String name) { - return this.target(ChannelMessageTarget.Type.NODE, name); + @Contract("_ -> this") + public @NonNull Builder targetServicesOfGroup(@NonNull String groupName) { + return this.target(ChannelMessageTarget.servicesByGroup(groupName)); } /** - * Targets all nodes within the network. + * Targets all services with the given environment within the network. * + * @param environmentName the name of the environment to target. * @return the same builder as used to call the method, for chaining. + * @throws NullPointerException if the given environment name is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetNodes() { - return this.targetAll(ChannelMessageTarget.Type.NODE); + @Contract("_ -> this") + public @NonNull Builder targetServicesOfEnvironment(@NonNull String environmentName) { + return this.target(ChannelMessageTarget.servicesByEnvironment(environmentName)); } /** - * Targets all services with the given environment within the network. + * Targets all services that have the given property key associated with any value within the network. * - * @param environment the environment to target. + * @param propertyKey the key of the property that must be associated on target services. * @return the same builder as used to call the method, for chaining. - * @throws NullPointerException if the given environment is null. + * @throws NullPointerException if the given property key is null. + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. */ - public @NonNull Builder targetEnvironment(@NonNull ServiceEnvironmentType environment) { - return this.target(ChannelMessageTarget.environment(environment)); + @Contract("_ -> this") + public @NonNull Builder targetServicesWithProperty(@NonNull String propertyKey) { + return this.target(ChannelMessageTarget.servicesWithProperty(propertyKey)); } /** - * Builds a channel message from this builder within the contract given in the Builder class java docs. + * Builds a channel message from this builder. * * @return the created channel message from this builder. * @throws NullPointerException if no message or channel is provided. * @throws IllegalArgumentException if no target was specified. + * @throws IllegalStateException if this method was called previously. */ @Contract(" -> new") public @NonNull ChannelMessage build() { @@ -408,14 +526,52 @@ public static final class Builder { Preconditions.checkNotNull(this.message, "No message provided"); Preconditions.checkArgument(!this.targets.isEmpty(), "No targets provided"); + // ensure a valid state *after* the other preconditions, the caller + // might recover or close this builder if any previous condition fails + if (!BUILD_CALLED.compareAndSet(this, false, true)) { + throw new IllegalStateException("ChannelMessage already built by this builder"); + } + + var content = Objects.requireNonNullElseGet(this.content, DataBuf::empty); + var sender = Objects.requireNonNullElseGet(this.sender, ChannelMessageSender::self); return new ChannelMessage( this.sendSync, this.prioritized, this.channel, this.message, - this.content == null ? DataBuf.empty() : this.content, - this.sender == null ? ChannelMessageSender.self() : this.sender, + content, + sender, this.targets); } + + /** + * Closes the content buffer held by this builder. No further actions can be performed on this builder after this + * message has been called. This method should be called in special cases when a channel message (and therefore the + * content buffer) is no longer needed. + * + * @throws IllegalStateException if the {@link #build()} method was already called on this builder. + */ + @Override + public void close() { + if (!BUILD_CALLED.compareAndSet(this, false, true)) { + throw new IllegalStateException("ChannelMessage already built by this builder"); + } + + if (this.content != null) { + this.content.release(); + this.content = null; + } + } + + /** + * Ensures that the {@link #build()} method was not yet called on this builder. + * + * @throws IllegalStateException if the {@link #build()} method was already called. + */ + private void assertValidState() { + if (this.buildCalled) { + throw new IllegalStateException("ChannelMessage already built by this builder"); + } + } } } diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageSender.java b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageSender.java index 5c10225f29..26145d7dea 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageSender.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageSender.java @@ -25,11 +25,11 @@ /** * Represents a sender of a channel message. A channel message sender is not required to be the actual component sending - * the message nor is there a requirement for the name to match the driver environment when creating a new target. But + * the message, nor is there a requirement for the name to match the driver environment when creating a new target. But * it is strongly recommended to not mismatch information as it may lead to confusing results on the receiver site. *

    - * Note: It is not recommended using the constructor directly. Consider using either {@link #self()} if you want a jvm - * static sender representing the current network component or {@link #of(String, DriverEnvironment)} if you want to + * Note: It is not recommended using the constructor directly. Consider using either {@link #self()} if you want a + * sender instance representing the current network component or {@link #of(String, DriverEnvironment)} if you want to * create a sender for another network component. * * @param name the name of the new sender. @@ -68,7 +68,7 @@ public record ChannelMessageSender(@NonNull String name, @NonNull DriverEnvironm * * @param serviceInfoSnapshot the service to check. * @return true if this sender represents the given service, false otherwise. - * @throws NullPointerException if the given snapshot is null. + * @throws NullPointerException if the given service snapshot to compare to is null. */ public boolean is(@NonNull ServiceInfoSnapshot serviceInfoSnapshot) { return this.type.equals(DriverEnvironment.WRAPPER) && this.name.equals(serviceInfoSnapshot.name()); @@ -79,7 +79,7 @@ public boolean is(@NonNull ServiceInfoSnapshot serviceInfoSnapshot) { * * @param node the node to check. * @return true if this sender represents the given node, false otherwise. - * @throws NullPointerException if the given input is null. + * @throws NullPointerException if the given node to compare to is null. */ public boolean is(@NonNull NetworkClusterNode node) { return this.type.equals(DriverEnvironment.NODE) && this.name.equals(node.uniqueId()); @@ -93,9 +93,7 @@ public boolean is(@NonNull NetworkClusterNode node) { * @return a new {@link ChannelMessageTarget} based on the information of this sender. */ public @NonNull ChannelMessageTarget toTarget() { - var type = this.type.equals(DriverEnvironment.NODE) - ? ChannelMessageTarget.Type.NODE - : ChannelMessageTarget.Type.SERVICE; - return new ChannelMessageTarget(type, this.name); + var isNode = this.type.equals(DriverEnvironment.NODE); + return isNode ? ChannelMessageTarget.node(this.name) : ChannelMessageTarget.service(this.name); } } diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageTarget.java b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageTarget.java index dca143207d..2e0efdd3d9 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageTarget.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/channel/ChannelMessageTarget.java @@ -16,42 +16,25 @@ package eu.cloudnetservice.driver.channel; -import com.google.common.base.Preconditions; +import eu.cloudnetservice.driver.cluster.NetworkClusterNode; +import eu.cloudnetservice.driver.document.property.DocProperty; +import eu.cloudnetservice.driver.service.GroupConfiguration; import eu.cloudnetservice.driver.service.ServiceEnvironmentType; -import lombok.EqualsAndHashCode; +import eu.cloudnetservice.driver.service.ServiceId; +import eu.cloudnetservice.driver.service.ServiceTask; +import java.util.Objects; import lombok.NonNull; -import lombok.ToString; +import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.UnknownNullability; /** - * Represents a receiver of a channel message. A channel message can be sent to a variety of receiver types including: - *

      - *
    • ALL: includes all network components which are within the network - *
    • NODE: includes all nodes which are within the network - *
    • SERVICE: includes all services which are within the network - *
    • TASK: includes all services of the given task - *
    • GROUP: includes all services which are in the given group - *
    • ENVIRONMENT: includes all services which are using the given environment. See {@link #environment(ServiceEnvironmentType)} - *
    - *

    - * A channel message target can optionally take a name of the target. If no name is provided its interpreted as - * "all components of the given type". It is illegal to create a target with no name given for the types - * {@link Type#TASK} and {@link Type#GROUP}. However, a creation of these types in combination with null as the - * name is a valid operation and will not fail. - *

    - * Note: Types are not overriding meaning that setting a target for all services and (for example) specifically the - * service Abc-1 is valid and will cause the service to receive the channel message twice. - *

    - * Do not extend this class to get access to it's constructors, use {@link #of(Type, String)} or for targeting a specific - * environment {@link #environment(ServiceEnvironmentType)} instead. + * Identifies one or more targets of a channel message, optionally with a {@link #name()} for further specification of a + * target. A channel message target can, for example, represent all components in the network or only a single node or + * service. For services, there are special targets for filtering, for example, based on the task they are based on. * - * @see ChannelMessage - * @see ChannelMessageSender * @since 4.0 */ -@ToString -@EqualsAndHashCode public final class ChannelMessageTarget { private static final ChannelMessageTarget ALL = new ChannelMessageTarget(Type.ALL, null); @@ -60,150 +43,271 @@ public final class ChannelMessageTarget { private final Type type; private final String name; - private final ServiceEnvironmentType environment; /** - * Constructs a new ChannelMessageTarget with no environment as its target. + * Constructs a new channel message target using the given type and optional name. For internal use only. * - * @param type the type of the target. - * @param name the optional name of the target. + * @param type the type of component targeted by this channel message target. + * @param name an optional name to further narrow the target. The meaning of the name is derived from the type. * @throws NullPointerException if the given type is null. */ - protected ChannelMessageTarget(@NonNull Type type, @Nullable String name) { + private ChannelMessageTarget(@NonNull Type type, @Nullable String name) { this.type = type; this.name = name; - this.environment = null; } /** - * Constructs a new ChannelMessageTarget targeting a specific service environment. No name will be set. + * Get a channel message target that targets all components in the network. * - * @param environment the environment of the target. + * @return a channel message target that targets all components in the network. + */ + public static @NonNull ChannelMessageTarget all() { + return ALL; + } + + /** + * Get a channel message target that targets all node components in the network. + * + * @return a channel message target that targets all node components in the network. + */ + public static @NonNull ChannelMessageTarget allNodes() { + return ALL_NODES; + } + + /** + * Get a channel message target that targets all service (wrapper) components in the network. + * + * @return a channel message target that targets all service components in the network. + */ + public static @NonNull ChannelMessageTarget allServices() { + return ALL_SERVICES; + } + + /** + * Get a channel message target that targets the given node in the network. + * + * @param node the node to target. + * @return a channel message target that targets the given node in the network. + * @throws NullPointerException if the given node to target is null. + */ + public static @NonNull ChannelMessageTarget node(@NonNull NetworkClusterNode node) { + return node(node.uniqueId()); + } + + /** + * Get a channel message target that targets the node with the given name in the network. + * + * @param nodeId the id of the node to target. + * @return a channel message target that targets the node with the given name in the network. + * @throws NullPointerException if the given node id is null. + */ + public static @NonNull ChannelMessageTarget node(@NonNull String nodeId) { + return new ChannelMessageTarget(Type.NODE, nodeId); + } + + /** + * Get a channel message target that targets the given service. Note that services are identified by their name and + * not by their unique id, therefore a channel message might be sent to a different service if the actual target + * restarts. + * + * @param serviceId the service id of the service to target. + * @return a channel message target that targets the given service. + * @throws NullPointerException if the given service id is null. + */ + public static @NonNull ChannelMessageTarget service(@NonNull ServiceId serviceId) { + return service(serviceId.name()); + } + + /** + * Get a channel message target that targets the service identified by the given name. Note that services are + * identified by their name and not by their unique id, therefore a channel message might be sent to a different + * service if the actual target restarts. + * + * @param serviceName the name of the service to target. + * @return a channel message target that targets the service identified by the given name. + * @throws NullPointerException if the given service name is null. + */ + public static @NonNull ChannelMessageTarget service(@NonNull String serviceName) { + return new ChannelMessageTarget(Type.SERVICE, serviceName); + } + + /** + * Get a channel message target that targets all services that are based on the given service task. + * + * @param task the service task to target. + * @return a channel message target that targets all services that are based on the given service task. + * @throws NullPointerException if the given service task is null. + */ + public static @NonNull ChannelMessageTarget servicesByTask(@NonNull ServiceTask task) { + return servicesByTask(task.name()); + } + + /** + * Get a channel message target that targets all services that are based on the service task identified by the given + * name. + * + * @param taskName the name of the task to target. + * @return a channel message target that targets all services that are based on the given service task. + * @throws NullPointerException if the given service task name is null. + */ + public static @NonNull ChannelMessageTarget servicesByTask(@NonNull String taskName) { + return new ChannelMessageTarget(Type.SERVICES_BY_TASK, taskName); + } + + /** + * Get a channel message target that targets all services that are in the given group. + * + * @param group the group to target. + * @return a channel message target that targets all services that are in the given group. + * @throws NullPointerException if the given group is null. + */ + public static @NonNull ChannelMessageTarget servicesByGroup(@NonNull GroupConfiguration group) { + return servicesByGroup(group.name()); + } + + /** + * Get a channel message target that targets all services that are in the group identified by the given name. + * + * @param groupName the name of the group to target. + * @return a channel message target that targets all services that are in the given group. + * @throws NullPointerException if the given group name is null. + */ + public static @NonNull ChannelMessageTarget servicesByGroup(@NonNull String groupName) { + return new ChannelMessageTarget(Type.SERVICES_BY_GROUP, groupName); + } + + /** + * Get a channel message target that targets all services that are using the given service environment. + * + * @param environment the environment to target. + * @return a channel message target that targets all services that are using the given service environment. * @throws NullPointerException if the given environment is null. */ - protected ChannelMessageTarget(@NonNull ServiceEnvironmentType environment) { - this.type = Type.ENVIRONMENT; - this.name = null; - this.environment = environment; + public static @NonNull ChannelMessageTarget servicesByEnvironment(@NonNull ServiceEnvironmentType environment) { + return servicesByEnvironment(environment.name()); } /** - * Internal method used for automated instantiation of this class. + * Get a channel message target that targets all services that are using the service environment identified by the + * given name. * - * @param type the type of the target. - * @param name the optional name of the target. - * @param environment the optional environment of the target. - * @throws NullPointerException if the given type is null. + * @param environmentName the name of the environment to target. + * @return a channel message target that targets all services that are using the given service environment. + * @throws NullPointerException if the given environment name is null. */ - protected ChannelMessageTarget( - @NonNull Type type, - @Nullable String name, - @Nullable ServiceEnvironmentType environment - ) { - this.type = type; - this.name = name; - this.environment = environment; + public static @NonNull ChannelMessageTarget servicesByEnvironment(@NonNull String environmentName) { + return new ChannelMessageTarget(Type.SERVICES_BY_ENV, environmentName); } /** - * Constructs a new ChannelMessageTarget targeting a specific service environment. + * Get a channel message target that targets all services that have the given property associated with any value. * - * @param type the environment to target. - * @return the created ChannelMessageTarget. - * @throws NullPointerException if the given type is null. + * @param property the property that must be associated on target services. + * @return a channel message target that targets all services that have the given property associated with any value. + * @throws NullPointerException if the given property is null. */ - public static @NonNull ChannelMessageTarget environment(@NonNull ServiceEnvironmentType type) { - return new ChannelMessageTarget(type); + public static @NonNull ChannelMessageTarget servicesWithProperty(@NonNull DocProperty property) { + return servicesWithProperty(property.key()); } /** - * Constructs a new ChannelMessageTarget of the given type with the optional name set. + * Get a channel message target that targets all services that have the property identified by the given key + * associated with any value. * - * @param type the type of the new channel message target. - * @param name optional name of the component with the given type to target. - * @return the created ChannelMessageTarget. - * @throws NullPointerException if given type is null. - * @throws IllegalArgumentException if type is ENVIRONMENT. For that purpose use {@link #environment(ServiceEnvironmentType)} - * instead. + * @param propertyKey the key of the property that must be associated on target services. + * @return a channel message target that targets all services that have the given property associated with any value. + * @throws NullPointerException if the given property key is null. */ - public static @NonNull ChannelMessageTarget of(@NonNull Type type, @Nullable String name) { - Preconditions.checkArgument(type != Type.ENVIRONMENT, "Unable to target environment using name"); - // check if we have a constant value for the type - if (name == null) { - switch (type) { - case ALL: - return ALL; - case NODE: - return ALL_NODES; - case SERVICE: - return ALL_SERVICES; - default: - break; - } - } - // create a new target for the type - return new ChannelMessageTarget(type, name); + public static @NonNull ChannelMessageTarget servicesWithProperty(@NonNull String propertyKey) { + return new ChannelMessageTarget(Type.SERVICES_WITH_PROPERTY, propertyKey); } /** - * Returns the type of network component this ChannelMessageTarget targets. Can never be null. + * Get the type of component that is targeted. For internal differentiation only. * - * @return the type of network component this ChannelMessageTarget targets. + * @return the type of component that is targeted. */ + @ApiStatus.Internal public @NonNull Type type() { return this.type; } /** - * Returns the name of the network component this ChannelMessageTarget targets. This may not be null if the target - * type is TASK or GROUP, but will always be null if the type is ENVIRONMENT. If a name is given when this channel - * message target has its type set to ALL it will silently be ignored. + * Get an optional name of the target component. Can be null if the target type does not demand for a name (for + * example, when targeting all services). For internal differentiation only. * - * @return the name of the network component this ChannelMessageTarget targets. + * @return an optional name of the target component. */ - public @UnknownNullability String name() { + @ApiStatus.Internal + @UnknownNullability("only null when the associated target type does not demand a name") + public String name() { return this.name; } /** - * Returns the service environment this ChannelMessageTarget targets. This may not be null if the target type is - * ENVIRONMENT. In all other cases the environment can be non-null but will silently get ignored. - * - * @return the service environment this ChannelMessageTarget targets. + * {@inheritDoc} + */ + @Override + public boolean equals(@Nullable Object other) { + return other instanceof ChannelMessageTarget otherTarget + && this.type == otherTarget.type + && Objects.equals(this.name, otherTarget.name); + } + + /** + * {@inheritDoc} */ - public @UnknownNullability ServiceEnvironmentType environment() { - return this.environment; + @Override + public int hashCode() { + return Objects.hash(this.type, this.name); } /** - * Represents a type of network component this channel message targets. + * {@inheritDoc} */ + @Override + public @NonNull String toString() { + return this.name == null + ? "ChannelMessageTarget[type=" + this.type + ']' + : "ChannelMessageTarget[type=" + this.type + ", name=" + this.name + ']'; + } + + /** + * The possible types of targets a channel message can have. Intended for internal differentiation only. + * + * @since 4.0 + */ + @ApiStatus.Internal public enum Type { /** - * Represents all network components in the network. A set name or environment will always get ignored. + * All available components. No name is associated with this target. */ ALL, /** - * Represents a node within the network. A name can optionally be present to target a specific node. + * One or multiple nodes in the network. A name can be supplied to select one particular target node. */ NODE, /** - * Represents a service within the network. A name can optionally be present to target a specific node. + * One or multiple services in the network. A name can be supplied to select one particular target service. */ SERVICE, + + /** + * All services in the network that are based on the provided task (identified by the required name). + */ + SERVICES_BY_TASK, /** - * Represents all services within the network which are using the given task as its base. A name must be present. + * All services in the network that are in on the provided group (identified by the required name). */ - TASK, + SERVICES_BY_GROUP, /** - * Represents all services within the network which are in the given group. A name must be present. + * All services in the network that are using the provided environment (identified by the required name). */ - GROUP, + SERVICES_BY_ENV, /** - * Represents all services within the network which are using the given environment. An environment must be - * present. + * All services in the network that have the given property key with any value (identified by the required name). */ - ENVIRONMENT + SERVICES_WITH_PROPERTY, } } diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/network/buffer/DataBuf.java b/driver/api/src/main/java/eu/cloudnetservice/driver/network/buffer/DataBuf.java index d807c7b09a..0077c52ca9 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/network/buffer/DataBuf.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/network/buffer/DataBuf.java @@ -25,34 +25,29 @@ import org.jetbrains.annotations.Nullable; /** - * Represents an immutable buffer which is essentially a wrapper around some kind of readable buffer. By default, - * CloudNet wraps the netty ByteBuf meaning that read operations are deferred to an underlying byte array of nio byte - * buffer. + * Represents an immutable buffer, which is essentially a wrapper around some kind of readable buffer. By default, + * CloudNet wraps netty buffer instances and delegates each method call to them. *

    - * However, a data buf does not allow (in comparison to other wrappers) the random access to bytes at specific positions - * (for example using a netty ByteBuf {@code buf.getByte(index)} would be possible, but isn't in this buffer). But it - * must be possible for a reader to store the current position of the buffer and return to it (for example after - * reading). This is done by starting a transaction using {@link #startTransaction()}, reading or writing to the buffer - * and restoring the previous position by using {@link #redoTransaction()}. Note: This will not remove bytes written to - * the buffer, other write operations will however start from the original index and override the written bytes. + * However, a data buf does not allow (in comparison to other wrappers) the random access to bytes at specific + * positions. But it must be possible for a reader to store the current position of the buffer and return to it (for + * example, after reading). This is done by starting a transaction using {@link #startTransaction()}, reading or writing + * to the buffer and restoring the previous position by using {@link #redoTransaction()}. Note: This will not remove + * bytes written to the buffer, other writes will, however, start from the original index and override the written + * bytes. *

    * Other operations should work as expected on a buffer, reading should always start from the head of the buffer, * reflecting the operation over all other readers. If one reader reads a byte from the buffer, the next one will start * at the second byte in the buffer, not the first one. *

    - * Buffers are not required to be thread safe, they can but should be treated specially in these cases. Concurrent read - * and/or write operations will therefore produce (by default) different results when spread over threads. - *

    - * Buffers should avoid memory leaks by ensuring to release their content after the last byte of the buffer was read. - * The behaviour can be influenced by acquiring them using {@link #acquire()}. The buffer will only be released if every - * place that acquired the buffer has released it using {@link #release()} or {@link #close()}. In rare cases it might - * be necessary to release a buffer even if it's acquired, use {@link #forceRelease()} in that case. + * Buffers are not required to be thread safe, they should be treated specially in these cases. Concurrent read and/or + * write operations might therefore produce (by default) unspecified results when data buffers are accessed + * concurrently. *

    * To prevent exceptions during reading, it's worth noting that using {@code readableBytes() > 0} it is possible to * verify that there are still bytes left in the buffer to read. *

    - * It is not recommended using any constructor to create an instance of a data buf - you should obtain a factory for - * them and create your instance using the given factory methods. + * It is not recommended using any constructor to create an instance of a data buf - you should get a factory instance + * for them and create your instance using the given factory methods. * * @see DataBufFactory * @see Mutable @@ -282,22 +277,22 @@ public interface DataBuf extends AutoCloseable { * index will go back to 0. * * @return the same instance as used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. * @throws IndexOutOfBoundsException if an illegal action was made to buffer moving the reader or writer index. */ @NonNull DataBuf redoTransaction(); /** - * Converts this immutable buffer to a mutable one. There is no need to copy the underlying byte tracker, meaning that - * all writes will be reflected into this buffer and vise-versa. + * Converts this immutable buffer to a mutable one. The underlying memory is not shared between this buffer and the + * newly constructed mutable one. The returned buffer range starts at the current reader position of this buffer. * * @return a mutable variant of this buffer. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable asMutable(); - // direct memory access - /** * Get if the current buffer is still accessible or if it was released already. * @@ -306,9 +301,11 @@ public interface DataBuf extends AutoCloseable { boolean accessible(); /** - * Get the amount of acquires that this data buf has. Initially a data buf is acquired once. + * Get the amount of times this buffer was acquired. A number greater than zero indicates that this buffer is + * accessible and not released, a number equal or less than zero indicates that this buffer was released and is + * inaccessible. * - * @return the amount of acquires. A value smaller or equal to zero means that the buffer was released. + * @return the amount of times this buffer was acquired. */ int acquires(); @@ -317,25 +314,28 @@ public interface DataBuf extends AutoCloseable { * but only release the buffer if there were more release than acquire calls. * * @return the same instance as used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released or was acquired too many times. */ @NonNull DataBuf acquire(); /** - * Explicitly releases all data associated with this buffer making it unavailable for further reads. This method only - * decreases the acquire count of the buffer in case it was acquired at least once. + * Closes this buffer. In case the acquire count is exactly {@code 1}, the buffer content will be released and this + * buffer becomes inaccessible. If the acquire count is greater than {@code 1}, the acquire count is decreased by one + * and the buffer stays accessible. If the buffer was already released, this method does nothing. */ void release(); /** - * Explicitly releases all data associated with this buffer making it unavailable for further reads. This method does - * not check if anyone acquired the buffer, it will be released in any case. + * Forcibly closes the buffer, ignoring the current acquire count. The buffer will always be inaccessible after this + * method was invoked. */ void forceRelease(); /** - * Explicitly releases all data associated with this buffer making it unavailable for further reads. This method does - * nothing if releasing was disables before calling this method. + * Closes this buffer. In case the acquire count is exactly {@code 1}, the buffer content will be released and this + * buffer becomes inaccessible. If the acquire count is greater than {@code 1}, the acquire count is decreased by one + * and the buffer stays accessible. If the buffer was already released, this method does nothing. */ @Override void close(); @@ -352,6 +352,7 @@ interface Mutable extends DataBuf { * * @param b the boolean to write. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeBoolean(boolean b); @@ -361,6 +362,7 @@ interface Mutable extends DataBuf { * * @param integer the integer to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeInt(int integer); @@ -370,6 +372,7 @@ interface Mutable extends DataBuf { * * @param b the byte to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeByte(byte b); @@ -379,6 +382,7 @@ interface Mutable extends DataBuf { * * @param s the short to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeShort(short s); @@ -388,6 +392,7 @@ interface Mutable extends DataBuf { * * @param l the long to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeLong(long l); @@ -397,6 +402,7 @@ interface Mutable extends DataBuf { * * @param f the float to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeFloat(float f); @@ -406,6 +412,7 @@ interface Mutable extends DataBuf { * * @param d the double to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeDouble(double d); @@ -415,62 +422,67 @@ interface Mutable extends DataBuf { * * @param c the char to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeChar(char c); /** - * Writes the given byte array into the buffer, prefixed by an integer containing the amount of bytes following in + * Writes the given byte array into the buffer, prefixed by an integer containing the number of bytes following in * the array. *

    * This method call is equivalent to {@code writeByteArray(b, b.length)}. * * @param b the byte array to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeByteArray(byte[] b); /** - * Writes the given byte array into the buffer, prefixed by an integer containing the amount of bytes following in + * Writes the given byte array into the buffer, prefixed by an integer containing the number of bytes following in * the array. * * @param b the byte array to write into the buffer. - * @param amount the amount of bytes of the array to write into the buffer. + * @param amount the number of bytes to copy from the given byte array into this buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeByteArray(byte[] b, int amount); /** - * Writes the unique id into the buffer by first writing the most significant bits of the id followed by the last - * significant bits of the id. + * Writes the unique id into the buffer. * * @param uuid the id to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeUniqueId(@NonNull UUID uuid); /** - * Writes the string into the buffer. This method does the same thing as {@link #writeByteArray(byte[])}. The string - * gets converted into it's byte array representation and then written into the buffer like that. + * Writes the string into the buffer. * * @param string the string to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeString(@NonNull String string); /** * Writes all data of the given data buffer into this data buffer starting at the current reader index of the given - * buffer. + * buffer. The reader and writer index of the given buffer are not modified by this method. The given buffer is + * released after being written into this buffer. *

    * Buffers are not expected to be cross-implementation-compatible. For instance, a netty buffer can only be written * to netty buffers. * * @param buf the buffer to write into this buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if either this or the given buffer were released. */ @NonNull DataBuf.Mutable writeDataBuf(@NonNull DataBuf buf); @@ -480,21 +492,23 @@ interface Mutable extends DataBuf { * * @param obj the object to write into the buffer. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. * @see ObjectMapper#writeObject(Mutable, Object) */ @NonNull DataBuf.Mutable writeObject(@Nullable Object obj); /** - * Writes the given object null-safe into this buffer. It appends a boolean before the actual object data (if the - * object is present) whether the data is present. The writer consumer is only called when the data is present and - * can then safely proceed to write all the required data into the buffer. The supplied buffer is the same buffer - * used for calling the method. + * Writes the given object null-safe into this buffer. It appends a boolean before the actual object data to + * indicate if the object is non-null. The writer consumer is only called when the data is present and can then + * safely proceed to write all the required data into the buffer. The supplied buffer is the same buffer used for + * calling the method. * * @param object the object which should be safely written into this buffer. * @param handlerWhenNonNull the writer of the object when it's non-null. * @param the generic type of the object being written. * @return the same buffer used to call the method, for chaining. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable writeNullable( @@ -502,23 +516,22 @@ DataBuf.Mutable writeNullable( @NonNull BiConsumer handlerWhenNonNull); /** - * Ensures that this buffer has at least the given amount of bytes unused for writing data. If the buffer already - * has the amount of bytes present, this method returns immediately. + * Ensures that this buffer has at least the given number of bytes available for writing data. If the buffer already + * has the number of bytes present, this method returns immediately. * - * @param bytes the bytes that must be available in the buffer. + * @param bytes the number of bytes that should be available for writing. * @return this buffer, for chaining. * @throws IllegalArgumentException if the given byte count is negative. + * @throws IllegalStateException if this buffer was released. */ @NonNull DataBuf.Mutable ensureWriteable(int bytes); - // utility for reading - /** - * Converts this buffer into an immutable version of it. The underlying buffer is not expected to be clones, that - * means that writes to this buffer are still reflected into the immutable version of it and vise-versa. + * Wraps the underlying buffer into a read-only variant. The underlying memory, lifetime and reader/writer positions + * are shared between this buffer and the read-only variant. * - * @return an immutable version of this buffer. + * @return an immutable wrap of this buffer. */ @NonNull DataBuf asImmutable(); diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkSessionInformation.java b/driver/api/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkSessionInformation.java index 2d91280c80..dbed6d7b6c 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkSessionInformation.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/network/chunk/ChunkSessionInformation.java @@ -19,6 +19,7 @@ import com.google.common.base.Utf8; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.network.buffer.DataBufable; +import java.io.Closeable; import java.util.UUID; import lombok.NonNull; import org.jetbrains.annotations.ApiStatus; @@ -31,7 +32,7 @@ * * @since 4.0 */ -public final class ChunkSessionInformation implements DataBufable { +public final class ChunkSessionInformation implements DataBufable, Closeable { private int chunkSize; private UUID sessionUniqueId; @@ -142,6 +143,14 @@ public int chunkSize() { return this.transferInformation; } + /** + * {@inheritDoc} + */ + @Override + public void close() { + this.transferInformation.close(); + } + /** * {@inheritDoc} */ diff --git a/driver/api/src/main/java/eu/cloudnetservice/driver/provider/CloudMessenger.java b/driver/api/src/main/java/eu/cloudnetservice/driver/provider/CloudMessenger.java index 019fee5de9..2a3df993b7 100644 --- a/driver/api/src/main/java/eu/cloudnetservice/driver/provider/CloudMessenger.java +++ b/driver/api/src/main/java/eu/cloudnetservice/driver/provider/CloudMessenger.java @@ -19,33 +19,27 @@ import eu.cloudnetservice.driver.channel.ChannelMessage; import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import lombok.NonNull; import org.jetbrains.annotations.Nullable; /** * The main messaging api for communication in any form between components in the CloudNet cluster aside from sending - * raw packets. The main difference between the raw packet api (network component based) and this api is, that this - * network point will search the route to the target component rather than only accepting direct writes to a specific - * target component. + * raw packets. The main difference between the raw packet api (network-component-based) and this api is that this api + * searches the route to the target component rather than only accepting direct writes to a specific target component. *

    * The target component search is only one layer deep, meaning that you can only send a channel message to another * component in the network known to the handling node, or its parent component (for services). Any other communication - * form would break the normal CloudNet cluster structure. Channel messages can get send to: + * form would break the normal CloudNet cluster structure. Channel messages can be sent to: *

      - *
    1. Services: in this case the handling node tries either to send the message directly to the service (if it is running - * on the local node) or via the parent node of the service (which must be connected!). - *
    2. Nodes: in this case the handling node sends the channel message directly to the connected node. There is no - * second layer check, all nodes must be connected to all nodes (as per the CloudNet cluster contract). This means - * that if (for example) Node-3 is only connected to Node-2 (which is connected to Node-1), and Node-1 receives a - * channel message for Node-3 this will not work. This will work: - *
        - *
      1. Node-1 gets a message for Node-2 (or the other way around). - *
      2. Node-2 gets a message for Node-3 (or the other way around). - *
      + *
    3. Services: in this case, the handling node tries either to send the message directly to the service (if it is + * running on the local node) or via the node that is handling the service (which is connected to the handling node as + * required by the CloudNet cluster structure). + *
    4. Nodes: in this case, the handling node sends the channel message directly to the connected node. This is + * possible as all nodes must be connected to all other nodes (as per the CloudNet cluster contract). This means that + * if (for example) Node-3 is only connected to Node-2 (which is connected to Node-1), and Node-1 receives a + * channel message for Node-3, the message cannot be routed to the target node. *
    - *

    - * A channel message received by a network component should always be acknowledged by the handling participant if it is - * a query message to prevent possible deadlocks on the sender side. * * @see ChannelMessage * @since 4.0 @@ -53,7 +47,11 @@ public interface CloudMessenger { /** - * Sends the given channel message to all of its targets without waiting for a response from them. + * Sends the given channel message to all of its targets. This method will not wait for the target component to + * respond (it doesn't even expect a response) but for the handling component to send the message. + *

    + * Note: once the channel message was sent, the backing buffer gets released. Therefore, the caller must acquire the + * content buffer if the given channel message is sent multiple times. * * @param channelMessage the channel message to send. * @throws NullPointerException if the given channel message is null. @@ -61,56 +59,121 @@ public interface CloudMessenger { void sendChannelMessage(@NonNull ChannelMessage channelMessage); /** - * Sends the given channel message to all of its targets and waits for all responses to be present or the query to - * time out. + * Sends the given channel message as a query and blocks until all target components have responded to the query or + * the query timeout is exceeded. + *

    + * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the responses to prevent memory leaks. Example: + *

    +   * {@code
    +   * ChannelMessage message = ...;
    +   * Collection responses = messenger.sendChannelMessageQuery(message);
    +   * for (var response : responses) {
    +   *   try (response) {
    +   *     // do something with the response
    +   *   }
    +   * }
    +   * }
    +   * 
    * * @param channelMessage the channel message to send. - * @return all responses from all network components which responded in time. + * @return all responses of all components the given channel message is targeting. * @throws NullPointerException if the given channel message is null. + * @throws CompletionException if an exception occurred while waiting for the query responses. */ @NonNull Collection sendChannelMessageQuery(@NonNull ChannelMessage channelMessage); /** - * Sends the given channel message to all of its targets and waits for all responses to be present or the query to - * time out. This method will then peek the first response out of the returned array, or return null if no components - * answered to the request. + * Sends the given channel message as a query and blocks until one of the target component responded to the message or + * the query timeout is exceeded. This is in particular useful if there is only one target, or you are only expecting + * one of the target components to respond. + *

    + * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the response to prevent memory leaks. Example: + *

    +   * {@code
    +   * ChannelMessage message = ...;
    +   * ChannelMessage response = messenger.sendSingleChannelMessageQuery(message);
    +   * if (response != null) {
    +   *   try (response) {
    +   *     // do something with the response
    +   *   }
    +   * }
    +   * }
    +   * 
    * * @param channelMessage the channel message to send. - * @return the first response to the given channel message, can be null if no target responded. + * @return the first response of any component the given message is targeting, null if no target responded. * @throws NullPointerException if the given channel message is null. + * @throws CompletionException if an exception occurred while waiting for the query response. */ @Nullable ChannelMessage sendSingleChannelMessageQuery(@NonNull ChannelMessage channelMessage); /** - * Sends the given channel message to all of its targets without waiting for a response from them. + * Sends the given channel message to all of its targets. This method will not wait for the target component to + * respond (it doesn't even expect a response) but for the handling component to send the message. + *

    + * Note: once the channel message was sent, the backing buffer gets released. Therefore, the caller must acquire the + * content buffer if the given channel message is sent multiple times. * * @param channelMessage the channel message to send. - * @return a task completed when all channel messages were sent. + * @return a future completed when the given channel message was sent. * @throws NullPointerException if the given channel message is null. */ @NonNull CompletableFuture sendChannelMessageAsync(@NonNull ChannelMessage channelMessage); /** - * Sends the given channel message to all of its targets and waits for all responses to be present or the query to - * time out. + * Sends the given channel message as a query and returns a future which waits for target component(s) to respond. The + * future will be completed when the target component responds or the query future times out. + *

    + * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the responses to prevent memory leaks. Example: + *

    +   * {@code
    +   * ChannelMessage message = ...;
    +   * messenger.sendChannelMessageQueryAsync(message).thenAccept(responses -> {
    +   *   for (var response : responses) {
    +   *     try (response) {
    +   *       // do something with the response
    +   *     }
    +   *   }
    +   * }
    +   * }
    +   * 
    * * @param message the channel message to send. - * @return a task completed with all responses from all network components which responded in time. + * @return a future completed with all responses from all target network components. * @throws NullPointerException if the given channel message is null. */ @NonNull CompletableFuture> sendChannelMessageQueryAsync(@NonNull ChannelMessage message); /** - * Sends the given channel message to all of its targets and waits for all responses to be present or the query to - * time out. This method will then peek the first response out of the returned array, or return null if no components - * answered to the request. + * Sends the given channel message as a query and returns a future which waits for target component(s) to respond. + * Only the first response of any target will get sent back to this component. This is in particular useful if there + * is only one target, or you are only expecting one of the target components to respond. The future will be completed + * with the first received response of any target component (possibly null if no target responded). + *

    + * Note: it is not possible for CloudNet to detect when a channel message query response was consumed. Therefore, it + * is crucial that the caller closes the response to prevent memory leaks. Example: + *

    +   * {@code
    +   * ChannelMessage message = ...;
    +   * messenger.sendSingleChannelMessageQueryAsync(message).thenAccept(response -> {
    +   *   if (response != null) {
    +   *     try (response) {
    +   *       // do something with the response
    +   *     }
    +   *   }
    +   * }
    +   * }
    +   * 
    * * @param channelMessage the channel message to send. - * @return a task completed with the first response to the given channel message, can be null if no target responded. + * @return a future completed with the first received response of any target component or null if no target responded. * @throws NullPointerException if the given channel message is null. */ @NonNull diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/ChunkedSessionRegistry.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/ChunkedSessionRegistry.java index e20501d8c4..73b97cc72c 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/ChunkedSessionRegistry.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/ChunkedSessionRegistry.java @@ -16,13 +16,12 @@ package eu.cloudnetservice.driver.impl.network.chunk; -import eu.cloudnetservice.driver.network.chunk.ChunkSessionInformation; import eu.cloudnetservice.driver.network.chunk.ChunkedPacketHandler; import jakarta.inject.Singleton; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; +import java.util.function.Supplier; import lombok.NonNull; /** @@ -59,20 +58,18 @@ public void registerSession(@NonNull UUID sessionId, @NonNull ChunkedPacketHandl /** * Gets the currently active session or creates a new session using the given instance factory. Sessions are unique by - * their session id. Concurrently accessing this method to create a new session will return the same session instance + * their session id. Accessing this method concurrently to create a new session will return the same session instance * to all callers. * - * @param sessionInformation the information of the session to get or create the session of. - * @param sessionFactory the factory to call if no session is currently associated with the session id. + * @param sessionId the id of the session to get the current handler of. + * @param sessionFactory the factory to call if no session is currently associated with the session id. * @return the existing registered or newly created transfer session for the provided session id. - * @throws NullPointerException if the given session information or session factory is null. + * @throws NullPointerException if the given session id or session factory is null. */ public @NonNull ChunkedPacketHandler getOrCreateSession( - @NonNull ChunkSessionInformation sessionInformation, - @NonNull Function sessionFactory + @NonNull UUID sessionId, + @NonNull Supplier sessionFactory ) { - return this.runningSessions.computeIfAbsent( - sessionInformation.sessionUniqueId(), - _ -> sessionFactory.apply(sessionInformation)); + return this.runningSessions.computeIfAbsent(sessionId, _ -> sessionFactory.get()); } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkPacketSender.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkPacketSender.java index 401d50f552..d8da07e644 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkPacketSender.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkPacketSender.java @@ -71,7 +71,6 @@ public DefaultFileChunkPacketSender( if (bytesRead == backingArray.length) { // if the bytes read is the same size as the backing array, then a full chunk of data has been read from the // backing file. this usually indicates that the chunk is not the last chunk in the transfer - this.chunkSessionInformation.transferInformation().acquire(); var chunkPacket = ChunkedPacket.createFullChunk(chunkIndex++, backingArray, this.chunkSessionInformation); this.packetSplitter.accept(chunkPacket); } else { @@ -85,7 +84,7 @@ public DefaultFileChunkPacketSender( // close all allocated resources used for the transfer this.source.close(); - this.chunkSessionInformation.transferInformation().release(); + this.chunkSessionInformation.close(); return TransferStatus.SUCCESS; } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkedPacketHandler.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkedPacketHandler.java index 876c7d4c56..d9acfe58ed 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkedPacketHandler.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/DefaultFileChunkedPacketHandler.java @@ -146,6 +146,7 @@ public boolean handleChunkPart(int chunkPosition, @NonNull DataBuf dataBuf) { stream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE); closeStream = this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, stream); } finally { + this.chunkSessionInformation.close(); if (closeStream) { stream.close(); } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/builder/DefaultChunkedFileQueryBuilder.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/builder/DefaultChunkedFileQueryBuilder.java index a1adfd196f..9759ca0565 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/builder/DefaultChunkedFileQueryBuilder.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/builder/DefaultChunkedFileQueryBuilder.java @@ -84,7 +84,7 @@ public class DefaultChunkedFileQueryBuilder implements ChunkedFileQueryBuilder { */ @Override public @NonNull ChunkedFileQueryBuilder requestFromNode(@NonNull String nodeId) { - this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.NODE, nodeId); + this.dataSource = ChannelMessageTarget.node(nodeId); return this; } @@ -93,7 +93,7 @@ public class DefaultChunkedFileQueryBuilder implements ChunkedFileQueryBuilder { */ @Override public @NonNull ChunkedFileQueryBuilder requestFromService(@NonNull String serviceName) { - this.dataSource = ChannelMessageTarget.of(ChannelMessageTarget.Type.SERVICE, serviceName); + this.dataSource = ChannelMessageTarget.service(serviceName); return this; } @@ -141,14 +141,16 @@ public class DefaultChunkedFileQueryBuilder implements ChunkedFileQueryBuilder { return channelMessage .sendSingleQueryAsync() .thenCompose(response -> { - var responseData = response.content(); - if (responseData.readBoolean()) { - // transfer started successfully + try (response) { + var transferStarted = response.content().readBoolean(); + Preconditions.checkState(transferStarted, "chunked data transfer wasn't initiated"); return responseFuture; - } else { - // transfer couldn't be started for some reason + } + }) + .whenComplete((_, thrown) -> { + if (thrown != null) { sessionRegistry.completeSession(sessionId); - throw new IllegalStateException("unable to start chunked data transfer"); + sessionInfo.close(); } }); } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/network/ChunkedPacketListener.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/network/ChunkedPacketListener.java index 2f4a389bc8..ba6db71636 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/network/ChunkedPacketListener.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/chunk/network/ChunkedPacketListener.java @@ -59,10 +59,21 @@ public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) thro var chunkIndex = packetContent.readInt(); // get or create a new local session for the transfer - var sessionHandler = this.sessionRegistry.getOrCreateSession(sessionInfo, this.handlerFactory); + var holder = new Object() { + boolean factoryCalled = false; + }; + var sessionId = sessionInfo.sessionUniqueId(); + var sessionHandler = this.sessionRegistry.getOrCreateSession(sessionId, () -> { + holder.factoryCalled = true; + return this.handlerFactory.apply(sessionInfo); + }); + if (!holder.factoryCalled) { + sessionInfo.close(); + } + var transferComplete = sessionHandler.handleChunkPart(chunkIndex, packetContent); if (transferComplete) { - this.sessionRegistry.completeSession(sessionInfo.sessionUniqueId()); + this.sessionRegistry.completeSession(sessionId); } } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java index cda71edd48..cf1660e381 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyNetworkHandler.java @@ -76,9 +76,25 @@ public void channelReadComplete(@NonNull ChannelHandlerContext ctx) { protected void messageReceived(@NonNull ChannelHandlerContext ctx, @NonNull BasePacket msg) { // post directly if the packet has a high priority if (msg.prioritized()) { - this.doHandlePacket(msg); + this.handlePacket(msg); } else { - this.packetDispatcher().execute(() -> this.doHandlePacket(msg)); + this.packetDispatcher().execute(() -> this.handlePacket(msg)); + } + } + + /** + * Handles the incoming packet and posts it either to the associated waiting query handler or directly into the packet + * registry, calling all associated handlers. This method applies exception handling which is not done by + * {@link #doHandlePacket(BasePacket)}. + * + * @param packet the packet hto handle. + * @throws NullPointerException if the given packet is null. + */ + private void handlePacket(@NonNull BasePacket packet) { + try { + this.doHandlePacket(packet); + } catch (Exception exception) { + LOGGER.error("Exception whilst handling packet {}", packet, exception); } } @@ -88,32 +104,34 @@ protected void messageReceived(@NonNull ChannelHandlerContext ctx, @NonNull Base * * @param packet the packet to handle. * @throws NullPointerException if the given packet is null. + * @throws Exception if an exception occurs while handling the given packet. */ - protected void doHandlePacket(@NonNull BasePacket packet) { - try { - var uuid = packet.uniqueId(); - if (uuid != null) { - var task = this.channel.queryPacketManager().waitingHandler(uuid); - if (task != null) { - // complete the waiting task - task.complete(packet); - - // don't post a query response packet to another handler at all - // the packet might be inbound - we might be expected to respond - return; + protected void doHandlePacket(@NonNull BasePacket packet) throws Exception { + var queryId = packet.uniqueId(); + if (queryId != null) { + // the received packet is a query packet, either a response or a request. this only + // handles if the received query message is a response. the packet content should + // not be released here, as the content might be processed async by the handler + var queryFuture = this.channel.queryPacketManager().waitingHandler(queryId); + if (queryFuture != null) { + var didComplete = queryFuture.complete(packet); + if (!didComplete) { + packet.content().release(); } - } - // check if any handler can handle the incoming packet - if (this.channel.handler().handlePacketReceive(this.channel, packet) - && this.channel.packetRegistry().handlePacket(this.channel, packet)) { return; } + } - // release the packet content now, there are no handlers that are accepting the message - packet.content().forceRelease(); - } catch (Exception exception) { - LOGGER.error("Exception whilst handling packet {}", packet, exception); + // post the packet to a packet handler and release the packet content after. a handler + // must acquire the packet content if async processing is being done + try { + var packetHandlingAllowed = this.channel.handler().handlePacketReceive(this.channel, packet); + if (packetHandlingAllowed) { + this.channel.packetRegistry().handlePacket(this.channel, packet); + } + } finally { + packet.content().release(); } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java index 8247052152..02fe1aa2ec 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/NettyUtil.java @@ -60,13 +60,16 @@ public final class NettyUtil { static { // check if resource leak detection should be enabled for debugging purposes // if that is not the case leak detection will be disabled completely - var enableLeakDetection = Boolean.getBoolean("cloudnet.net.leak-detection-enabled"); - if (enableLeakDetection) { + var devMode = Boolean.getBoolean("cloudnet.dev"); + var leakDetectionEnabledValue = System.getProperty("cloudnet.net.leak-detection-enabled"); + if ("true".equals(leakDetectionEnabledValue) || (devMode && leakDetectionEnabledValue == null)) { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); System.setProperty("io.netty5.buffer.leakDetectionEnabled", "true"); + System.setProperty("io.netty5.buffer.lifecycleTracingEnabled", "true"); } else { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); System.setProperty("io.netty5.buffer.leakDetectionEnabled", "false"); + System.setProperty("io.netty5.buffer.lifecycleTracingEnabled", "false"); } // select the ssl provider to use for netty. this uses the jdk provider in case it was explicitly selected diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java index c3167eb279..3ec199bd2b 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyImmutableDataBuf.java @@ -16,10 +16,13 @@ package eu.cloudnetservice.driver.impl.network.netty.buffer; +import com.google.common.base.Preconditions; import eu.cloudnetservice.driver.impl.network.netty.NettyUtil; import eu.cloudnetservice.driver.impl.network.object.DefaultObjectMapper; import eu.cloudnetservice.driver.network.buffer.DataBuf; import io.netty5.buffer.Buffer; +import io.netty5.buffer.internal.InternalBufferUtils; +import io.netty5.buffer.internal.ResourceSupport; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.UUID; @@ -32,13 +35,10 @@ * * @since 4.0 */ -public class NettyImmutableDataBuf implements DataBuf { +public sealed class NettyImmutableDataBuf implements DataBuf permits NettyMutableDataBuf { protected final Buffer buffer; - // the amount of times this buffer was acquired - protected int acquires = 1; - // transaction offset data protected int readOffset; protected int writeOffset; @@ -47,9 +47,11 @@ public class NettyImmutableDataBuf implements DataBuf { * Constructs a new netty immutable data buf instance. * * @param buffer the netty buffer to wrap. - * @throws NullPointerException if the given buffer is null. + * @throws NullPointerException if the given buffer is null. + * @throws IllegalArgumentException if the given buffer does not extend {@code ResourceSupport}. */ public NettyImmutableDataBuf(@NonNull Buffer buffer) { + Preconditions.checkArgument(buffer instanceof ResourceSupport, "buffer must extend ResourceSupport"); this.buffer = buffer; } @@ -58,7 +60,7 @@ public NettyImmutableDataBuf(@NonNull Buffer buffer) { */ @Override public boolean readBoolean() { - return this.hotRead(Buffer::readBoolean); + return this.buffer.readBoolean(); } /** @@ -66,7 +68,7 @@ public boolean readBoolean() { */ @Override public byte readByte() { - return this.hotRead(Buffer::readByte); + return this.buffer.readByte(); } /** @@ -74,7 +76,7 @@ public byte readByte() { */ @Override public int readInt() { - return this.hotRead(Buffer::readInt); + return this.buffer.readInt(); } /** @@ -82,7 +84,7 @@ public int readInt() { */ @Override public short readShort() { - return this.hotRead(Buffer::readShort); + return this.buffer.readShort(); } /** @@ -90,7 +92,7 @@ public short readShort() { */ @Override public long readLong() { - return this.hotRead(Buffer::readLong); + return this.buffer.readLong(); } /** @@ -98,7 +100,7 @@ public long readLong() { */ @Override public float readFloat() { - return this.hotRead(Buffer::readFloat); + return this.buffer.readFloat(); } /** @@ -106,7 +108,7 @@ public float readFloat() { */ @Override public double readDouble() { - return this.hotRead(Buffer::readDouble); + return this.buffer.readDouble(); } /** @@ -114,7 +116,7 @@ public double readDouble() { */ @Override public char readChar() { - return this.hotRead(Buffer::readChar); + return this.buffer.readChar(); } /** @@ -122,11 +124,10 @@ public char readChar() { */ @Override public byte[] readByteArray() { - return this.hotRead(buf -> { - var bytes = new byte[NettyUtil.readVarInt(buf)]; - buf.readBytes(bytes, 0, bytes.length); - return bytes; - }); + var buf = this.buffer; + var bytes = new byte[NettyUtil.readVarInt(buf)]; + buf.readBytes(bytes, 0, bytes.length); + return bytes; } /** @@ -151,15 +152,11 @@ public byte[] readByteArray() { */ @Override public @NonNull DataBuf readDataBuf() { - return this.hotRead(buf -> { - // copy out the data - var length = NettyUtil.readVarInt(buf); - var content = new NettyImmutableDataBuf(buf.copy(buf.readerOffset(), length)); - - // skip the amount of bytes we're read and return the content - buf.skipReadableBytes(length); - return content; - }); + var buf = this.buffer; + var length = NettyUtil.readVarInt(buf); + var content = new NettyImmutableDataBuf(buf.copy(buf.readerOffset(), length)); + buf.skipReadableBytes(length); + return content; } /** @@ -167,11 +164,10 @@ public byte[] readByteArray() { */ @Override public byte[] toByteArray() { - return this.hotRead(buf -> { - var bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes, 0, bytes.length); - return bytes; - }); + var buf = this.buffer; + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes, 0, bytes.length); + return bytes; } /** @@ -222,7 +218,6 @@ public int readableBytes() { public @NonNull DataBuf startTransaction() { this.readOffset = this.buffer.readerOffset(); this.writeOffset = this.buffer.writerOffset(); - return this; } @@ -232,7 +227,6 @@ public int readableBytes() { @Override public @NonNull DataBuf redoTransaction() { this.buffer.readerOffset(this.readOffset); - // we can only set the writer offset if the backing buffer is not read-only if (!this.buffer.readOnly()) { this.buffer.writerOffset(this.writeOffset); } @@ -245,8 +239,8 @@ public int readableBytes() { */ @Override public @NonNull DataBuf.Mutable asMutable() { - // we need to copy the underlying buffer when the wrapped one is read only, if not we can just use the given buffer - return this.buffer.readOnly() ? new NettyMutableDataBuf(this.buffer.copy()) : new NettyMutableDataBuf(this.buffer); + var buf = this.buffer.copy(false); + return new NettyMutableDataBuf(buf); } /** @@ -262,7 +256,8 @@ public boolean accessible() { */ @Override public int acquires() { - return this.acquires; + var resourceSupport = this.bufferAsResourceSupport(); + return InternalBufferUtils.countBorrows(resourceSupport); } /** @@ -270,7 +265,8 @@ public int acquires() { */ @Override public @NonNull DataBuf acquire() { - this.acquires++; + var resourceSupport = this.bufferAsResourceSupport(); + InternalBufferUtils.acquire(resourceSupport); return this; } @@ -279,12 +275,10 @@ public int acquires() { */ @Override public void release() { - // release one acquire - this.acquires--; - - // check if the buffer is no longer acquired somewhere - if (this.acquires <= 0 && this.buffer.isAccessible()) { + try { this.buffer.close(); + } catch (IllegalStateException _) { + // possible double-free error due to a race, ignore } } @@ -293,12 +287,13 @@ public void release() { */ @Override public void forceRelease() { - // set acquires to 0 to indicate that the buffer was released - this.acquires = 0; - - // actually release the buffer if needed - if (this.buffer.isAccessible()) { - this.buffer.close(); + try { + var buffer = this.buffer; + while (buffer.isAccessible()) { + buffer.close(); + } + } catch (IllegalStateException _) { + // possible double-free error due to a race, ignore } } @@ -310,6 +305,14 @@ public void close() { this.release(); } + /** + * {@inheritDoc} + */ + @Override + public @NonNull String toString() { + return "NettyImmutableDataBuf[buffer=" + this.buffer + "]"; + } + /** * Get the wrapped netty byte buf of this buffer, for internal use only. * @@ -320,21 +323,11 @@ public void close() { } /** - * Reads from this buffer, releasing it when the end of the input has been reached and releasing is enabled to prevent - * memory leaks. + * Gets the backing buffer instance cast to a {@code ResourceSupport} instance. * - * @param reader the function which reads the requested data from the buffer. - * @param the type of data to read. - * @return the data read from the buffer. - * @throws NullPointerException if the given reader function is null. + * @return the backing buffer instance cast to a {@code ResourceSupport} instance. */ - protected @NonNull T hotRead(@NonNull Function reader) { - var result = reader.apply(this.buffer); - if (this.buffer.readableBytes() <= 0) { - // try to release the buffer in case the end of the data was reached - this.release(); - } - - return result; + private @NonNull ResourceSupport bufferAsResourceSupport() { + return (ResourceSupport) this.buffer; } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyMutableDataBuf.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyMutableDataBuf.java index a867e13b59..ae7d0d2680 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyMutableDataBuf.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/buffer/NettyMutableDataBuf.java @@ -16,6 +16,7 @@ package eu.cloudnetservice.driver.impl.network.netty.buffer; +import com.google.common.base.Preconditions; import eu.cloudnetservice.driver.impl.network.netty.NettyUtil; import eu.cloudnetservice.driver.impl.network.object.DefaultObjectMapper; import eu.cloudnetservice.driver.network.buffer.DataBuf; @@ -31,15 +32,17 @@ * * @since 4.0 */ -public class NettyMutableDataBuf extends NettyImmutableDataBuf implements DataBuf.Mutable { +public final class NettyMutableDataBuf extends NettyImmutableDataBuf implements DataBuf.Mutable { /** * Constructs a new mutable data buf instance. * * @param buffer the netty buffer to wrap. - * @throws NullPointerException if the given buffer is null. + * @throws NullPointerException if the given buffer is null. + * @throws IllegalArgumentException if the given buffer is read-only. */ public NettyMutableDataBuf(@NonNull Buffer buffer) { + Preconditions.checkArgument(!buffer.readOnly(), "buffer must not be read-only"); super(buffer); } @@ -208,4 +211,12 @@ public NettyMutableDataBuf(@NonNull Buffer buffer) { public @NonNull DataBuf asImmutable() { return new NettyImmutableDataBuf(this.buffer); } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull String toString() { + return "NettyMutableDataBuf[buffer=" + this.buffer + "]"; + } } diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/NettyPacketDecoder.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/NettyPacketDecoder.java index 3c8ca05c15..a9b46026ce 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/NettyPacketDecoder.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/netty/codec/NettyPacketDecoder.java @@ -50,12 +50,6 @@ public final class NettyPacketDecoder extends ByteToMessageDecoder { */ @Override protected void decode(@NonNull ChannelHandlerContext ctx, @NonNull Buffer in) { - // validates that the channel associated to this decoder call is still active and actually - // transferred data before beginning to read. - if (!ctx.channel().isActive() || in.readableBytes() <= 0) { - return; - } - try { // read the required base data from the buffer var channel = NettyUtil.readVarInt(in); @@ -64,14 +58,15 @@ protected void decode(@NonNull ChannelHandlerContext ctx, @NonNull Buffer in) { // extract the body var bodyLength = NettyUtil.readVarInt(in); - var body = new NettyImmutableDataBuf(in.copy(in.readerOffset(), bodyLength)); + var bodyBuffer = in.copy(in.readerOffset(), bodyLength); + var body = new NettyImmutableDataBuf(bodyBuffer); in.skipReadableBytes(bodyLength); // construct the packet var packet = new BasePacket(channel, prioritized, body); packet.uniqueId(queryUniqueId); + bodyBuffer.touch(packet.toString()); // hint to the constructed packet for leak debugging - // register the packet for further downstream handling ctx.fireChannelRead(packet); } catch (Exception exception) { LOGGER.error("Exception while decoding packet", exception); diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/listener/RPCPacketListener.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/listener/RPCPacketListener.java index 8200a1214a..0774e12886 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/listener/RPCPacketListener.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/listener/RPCPacketListener.java @@ -63,39 +63,33 @@ public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) thro var content = packet.content(); var resultExpected = packet.uniqueId() != null; - try { - var rpcDepth = content.readInt(); - if (rpcDepth <= 0) { - // depth must be at least one (single call) or more (chained call) - if (resultExpected) { - var resultContent = DataBufFactory.defaultFactory() - .createWithExpectedSize(1) - .writeByte(RPCInvocationResult.STATUS_BAD_REQUEST) - .writeString("invalid chain length"); - this.sendResponseData(channel, packet, resultContent); - } - return; + var rpcDepth = content.readInt(); + if (rpcDepth <= 0) { + // depth must be at least one (single call) or more (chained call) + if (resultExpected) { + var resultContent = DataBufFactory.defaultFactory() + .createWithExpectedSize(1) + .writeByte(RPCInvocationResult.STATUS_BAD_REQUEST) + .writeString("invalid chain length"); + this.sendResponseData(channel, packet, resultContent); } + return; + } - if (rpcDepth > 1) { - // RPC chain, start executing the first step - this.executeRPCChainStep(rpcDepth, 1, resultExpected, content, packet, channel, null); - } else { - // single method rpc, execute & respond if requested - var targetClassName = content.readString(); - var invocationContext = this.buildContext(content, null); - var handlingTask = this.postRPCRequestToHandler(targetClassName, invocationContext); - if (resultExpected) { - this.waitForInvocationCompletion(handlingTask, result -> { - var resultContent = this.serializeHandlingResult(result); - this.sendResponseData(channel, packet, resultContent); - }); - } + if (rpcDepth > 1) { + // RPC chain, start executing the first step + this.executeRPCChainStep(rpcDepth, 1, resultExpected, content, packet, channel, null); + } else { + // single method rpc, execute & respond if requested + var targetClassName = content.readString(); + var invocationContext = this.buildContext(content, null); + var handlingTask = this.postRPCRequestToHandler(targetClassName, invocationContext); + if (resultExpected) { + this.waitForInvocationCompletion(handlingTask, result -> { + var resultContent = this.serializeHandlingResult(result); + this.sendResponseData(channel, packet, resultContent); + }); } - } finally { - // specifically release the buffer here to prevent memory leaks, especially if we didn't consume - // the whole buffer content (for example due to an exception during handling) - content.forceRelease(); } } @@ -112,10 +106,8 @@ private void waitForInvocationCompletion( @NonNull Consumer callback ) { if (invocationTask == null) { - // nothing to wait for callback.accept(null); } else { - // wait for the completion of the method invocationTask.whenComplete((result, _) -> callback.accept(result)); } } @@ -239,7 +231,7 @@ private void sendResponseData(@NonNull NetworkChannel channel, @NonNull Packet r * @return the result of the method invocation, or null if no handler for the given class is registered. * @throws NullPointerException if either the given class name or invocation context is null. */ - // note: do not change this method name, it's used by RPCExceptionUtil.serializeHandlingException + // impl note: do not change this method name, it's used by RPCExceptionUtil.serializeHandlingException // to determine where the internal handling frame cutoff should be private @Nullable CompletableFuture postRPCRequestToHandler( @NonNull String targetClassName, diff --git a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java index 4d8d5185f8..7fa18c9562 100644 --- a/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java +++ b/driver/impl/src/main/java/eu/cloudnetservice/driver/impl/network/rpc/rpc/RPCResultMapper.java @@ -44,27 +44,31 @@ record RPCResultMapper( @Override public @UnknownNullability T apply(@UnknownNullability Packet response) { var responseData = response.content(); - var status = responseData.readByte(); - return switch (status) { - case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType); - case RPCInvocationResult.STATUS_ERROR -> { - RPCExceptionUtil.rethrowHandlingException(responseData); - yield null; // never reached, but must be there for the compiler to be happy - } - case RPCInvocationResult.STATUS_BAD_REQUEST -> { - var detailMessage = responseData.readString(); - var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage); - throw new RPCExecutionException(exceptionMessage); - } - case RPCInvocationResult.STATUS_SERVER_ERROR -> { - var detailMessage = responseData.readString(); - var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage); - throw new RPCExecutionException(exceptionMessage); - } - default -> { - var exceptionMessage = String.format("Server responded with unknown status code: %d", status); - throw new RPCExecutionException(exceptionMessage); - } - }; + try { + var status = responseData.readByte(); + return switch (status) { + case RPCInvocationResult.STATUS_OK -> this.objectMapper.readObject(responseData, this.expectedResultType); + case RPCInvocationResult.STATUS_ERROR -> { + RPCExceptionUtil.rethrowHandlingException(responseData); + yield null; // never reached, but must be there for the compiler to be happy + } + case RPCInvocationResult.STATUS_BAD_REQUEST -> { + var detailMessage = responseData.readString(); + var exceptionMessage = String.format("RPC couldn't be processed due to bad input data: %s", detailMessage); + throw new RPCExecutionException(exceptionMessage); + } + case RPCInvocationResult.STATUS_SERVER_ERROR -> { + var detailMessage = responseData.readString(); + var exceptionMessage = String.format("RPC couldn't be processed due to a server error: %s", detailMessage); + throw new RPCExecutionException(exceptionMessage); + } + default -> { + var exceptionMessage = String.format("Server responded with unknown status code: %d", status); + throw new RPCExecutionException(exceptionMessage); + } + }; + } finally { + responseData.forceRelease(); + } } } diff --git a/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/node/player/NodePlayerExecutor.java b/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/node/player/NodePlayerExecutor.java index 3a30fb4467..89eab03ed8 100644 --- a/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/node/player/NodePlayerExecutor.java +++ b/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/node/player/NodePlayerExecutor.java @@ -153,9 +153,9 @@ public void spoofCommandExecution(@NonNull String command, boolean redirectToSer if (this.targetUniqueId.equals(GLOBAL_UNIQUE_ID)) { // target all proxies if this is the global executor message = ChannelMessage.builder() - .targetEnvironment(ServiceEnvironmentType.VELOCITY) - .targetEnvironment(ServiceEnvironmentType.BUNGEECORD) - .targetEnvironment(ServiceEnvironmentType.WATERDOG_PE); + .targetServicesOfEnvironment(ServiceEnvironmentType.VELOCITY.name()) + .targetServicesOfEnvironment(ServiceEnvironmentType.BUNGEECORD.name()) + .targetServicesOfEnvironment(ServiceEnvironmentType.WATERDOG_PE.name()); } else { // get the player associated with this provider //noinspection ConstantConditions - This can never be null here (only for the global unique id which is handeled already) diff --git a/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/platform/helper/ProxyPlatformHelper.java b/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/platform/helper/ProxyPlatformHelper.java index 710ecfab54..96f7a8993b 100644 --- a/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/platform/helper/ProxyPlatformHelper.java +++ b/modules/bridge/impl/src/main/java/eu/cloudnetservice/modules/bridge/impl/platform/helper/ProxyPlatformHelper.java @@ -44,15 +44,20 @@ public ProxyPlatformHelper(@NonNull ComponentInfo componentInfo) { public @NonNull LocalPlayerPreLoginEvent.Result sendChannelMessagePreLogin( @NonNull NetworkPlayerProxyInfo playerInfo ) { - var result = this.toCurrentNode() + var response = this.toCurrentNode() .message("proxy_player_pre_login") .channel(BridgeManagement.BRIDGE_PLAYER_CHANNEL_NAME) .buffer(DataBuf.empty().writeObject(playerInfo)) .build() .sendSingleQuery(); - return result == null - ? LocalPlayerPreLoginEvent.Result.allowed() - : result.content().readObject(LocalPlayerPreLoginEvent.Result.class); + return switch (response) { + case null -> LocalPlayerPreLoginEvent.Result.allowed(); + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(LocalPlayerPreLoginEvent.Result.class); + } + } + }; } public void sendChannelMessageLoginSuccess( @@ -85,7 +90,8 @@ public void sendChannelMessageDisconnected(@NonNull UUID playerUniqueId) { .send(); } - @NonNull ChannelMessage.Builder toCurrentNode() { + @NonNull + ChannelMessage.Builder toCurrentNode() { return ChannelMessage.builder().targetNode(this.componentInfo.nodeUniqueId()); } } diff --git a/modules/npcs/impl/src/main/java/eu/cloudnetservice/modules/npc/impl/platform/PlatformNPCManagement.java b/modules/npcs/impl/src/main/java/eu/cloudnetservice/modules/npc/impl/platform/PlatformNPCManagement.java index 882b866c7d..791722a274 100644 --- a/modules/npcs/impl/src/main/java/eu/cloudnetservice/modules/npc/impl/platform/PlatformNPCManagement.java +++ b/modules/npcs/impl/src/main/java/eu/cloudnetservice/modules/npc/impl/platform/PlatformNPCManagement.java @@ -18,7 +18,6 @@ import eu.cloudnetservice.driver.ComponentInfo; import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageTarget; import eu.cloudnetservice.driver.event.EventManager; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.provider.CloudServiceProvider; @@ -34,8 +33,8 @@ import eu.cloudnetservice.modules.npc.platform.PlatformSelectorEntity; import eu.cloudnetservice.wrapper.configuration.WrapperConfiguration; import java.util.Collection; -import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; @@ -89,14 +88,22 @@ public PlatformNPCManagement( .targetNode(componentInfo.nodeUniqueId()) .build() .sendSingleQuery(); - return response == null ? null : response.content().readObject(NPCConfiguration.class); + return switch (response) { + case null -> null; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(NPCConfiguration.class); + } + } + }; } @Override public void createNPC(@NonNull NPC npc) { this.channelMessage(NPC_CREATE) .buffer(DataBuf.empty().writeObject(npc)) - .build().send(); + .build() + .send(); } @Override @@ -110,24 +117,48 @@ public void deleteNPC(@NonNull WorldPosition position) { public int deleteAllNPCs(@NonNull String group) { var response = this.channelMessage(NPC_BULK_DELETE) .buffer(DataBuf.empty().writeString(group)) - .build().sendSingleQuery(); - return response == null ? 0 : response.content().readInt(); + .build() + .sendSingleQuery(); + return switch (response) { + case null -> 0; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readInt(); + } + } + }; } @Override public int deleteAllNPCs() { var response = this.channelMessage(NPC_ALL_DELETE) .buffer(DataBuf.empty().writeObject(this.npcs.keySet())) - .build().sendSingleQuery(); - return response == null ? 0 : response.content().readInt(); + .build() + .sendSingleQuery(); + return switch (response) { + case null -> 0; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readInt(); + } + } + }; } @Override public @NonNull Collection npcs(@NonNull Collection groups) { var response = this.channelMessage(NPC_GET_NPCS_BY_GROUP) .buffer(DataBuf.empty().writeObject(groups)) - .build().sendSingleQuery(); - return response == null ? Collections.emptySet() : response.content().readObject(NPC.COLLECTION_NPC); + .build() + .sendSingleQuery(); + return switch (response) { + case null -> Set.of(); + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(NPC.COLLECTION_NPC); + } + } + }; } @Override @@ -186,7 +217,7 @@ public void handleInternalNPCConfigUpdate(@NonNull NPCConfiguration configuratio return ChannelMessage.builder() .channel(NPC_CHANNEL_NAME) .message(message) - .target(ChannelMessageTarget.Type.NODE, this.componentInfo.nodeUniqueId()); + .targetNode(this.componentInfo.nodeUniqueId()); } public void initialize() { diff --git a/modules/signs/impl/src/main/java/eu/cloudnetservice/modules/signs/impl/platform/PlatformSignManagement.java b/modules/signs/impl/src/main/java/eu/cloudnetservice/modules/signs/impl/platform/PlatformSignManagement.java index 237ae8f49f..608798b33a 100644 --- a/modules/signs/impl/src/main/java/eu/cloudnetservice/modules/signs/impl/platform/PlatformSignManagement.java +++ b/modules/signs/impl/src/main/java/eu/cloudnetservice/modules/signs/impl/platform/PlatformSignManagement.java @@ -20,7 +20,6 @@ import static eu.cloudnetservice.driver.service.ServiceEnvironmentType.PE_SERVER; import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageTarget; import eu.cloudnetservice.driver.event.EventManager; import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.provider.CloudServiceProvider; @@ -108,7 +107,14 @@ protected PlatformSignManagement( .targetNode(wrapperConfig.serviceConfiguration().serviceId().nodeUniqueId()) .build() .sendSingleQuery(); - return response == null ? null : response.content().readObject(SignsConfiguration.class); + return switch (response) { + case null -> null; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(SignsConfiguration.class); + } + } + }; } @Override @@ -129,8 +135,16 @@ public void deleteSign(@NonNull WorldPosition position) { public int deleteAllSigns(@NonNull String group, @Nullable String templatePath) { var response = this.channelMessage(SIGN_BULK_DELETE) .buffer(DataBuf.empty().writeString(group).writeNullable(templatePath, DataBuf.Mutable::writeString)) - .build().sendSingleQuery(); - return response == null ? 0 : response.content().readInt(); + .build() + .sendSingleQuery(); + return switch (response) { + case null -> 0; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readInt(); + } + } + }; } @Override @@ -147,7 +161,14 @@ public int deleteAllSigns() { .buffer(DataBuf.empty().writeObject(groups)) .build() .sendSingleQuery(); - return response == null ? Set.of() : response.content().readObject(Sign.COLLECTION_TYPE); + return switch (response) { + case null -> Set.of(); + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(Sign.COLLECTION_TYPE); + } + } + }; } @Override @@ -188,8 +209,8 @@ public void handleInternalSignRemove(@NonNull WorldPosition position) { @Override protected @NonNull ChannelMessage.Builder channelMessage(@NonNull String message) { - return super.channelMessage(message) - .target(ChannelMessageTarget.Type.NODE, this.wrapperConfig.serviceConfiguration().serviceId().nodeUniqueId()); + var owningNodeId = this.wrapperConfig.serviceConfiguration().serviceId().nodeUniqueId(); + return super.channelMessage(message).targetNode(owningNodeId); } public int removeAllMissingSigns() { diff --git a/modules/syncproxy/api/src/main/java/eu/cloudnetservice/modules/syncproxy/config/SyncProxyConfiguration.java b/modules/syncproxy/api/src/main/java/eu/cloudnetservice/modules/syncproxy/config/SyncProxyConfiguration.java index 30a4d5b69e..9f1330a669 100644 --- a/modules/syncproxy/api/src/main/java/eu/cloudnetservice/modules/syncproxy/config/SyncProxyConfiguration.java +++ b/modules/syncproxy/api/src/main/java/eu/cloudnetservice/modules/syncproxy/config/SyncProxyConfiguration.java @@ -92,9 +92,10 @@ public record SyncProxyConfiguration( .targetNode(nodeUniqueId) .build() .sendSingleQuery(); - if (response != null) { - return response.content().readObject(SyncProxyConfiguration.class); + try (response) { + return response.content().readObject(SyncProxyConfiguration.class); + } } return null; diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/defaults/RemoteNodeServer.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/defaults/RemoteNodeServer.java index f1873a842a..c7b8e9e43e 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/defaults/RemoteNodeServer.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/defaults/RemoteNodeServer.java @@ -240,8 +240,12 @@ public void updateNodeInfoSnapshot(@Nullable NodeInfoSnapshot snapshot) { .buffer(DataBuf.empty().writeString(commandLine)) .build() .sendSingleQueryAsync() - .thenApply(message -> message.content().>readObject(COLLECTION_STRING)) - .exceptionally($ -> Set.of()) + .thenApply(response -> { + try (response) { + return response.content().>readObject(COLLECTION_STRING); + } + }) + .exceptionally(_ -> Set.of()) .join(); } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/util/NodeDisconnectHandler.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/util/NodeDisconnectHandler.java index f1bd43a85b..e8825b32bd 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/util/NodeDisconnectHandler.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/cluster/util/NodeDisconnectHandler.java @@ -17,7 +17,6 @@ package eu.cloudnetservice.node.impl.cluster.util; import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageTarget; import eu.cloudnetservice.driver.event.EventManager; import eu.cloudnetservice.driver.event.events.service.CloudServiceLifecycleChangeEvent; import eu.cloudnetservice.driver.impl.network.NetworkConstants; @@ -61,7 +60,7 @@ public NodeDisconnectHandler( var builder = ChannelMessage.builder(); // iterate over all local services - if the service is connected append it as target for (var service : services) { - builder.target(ChannelMessageTarget.Type.SERVICE, service.serviceId().name()); + builder.targetService(service.serviceId().name()); } // for chaining return builder; diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/AuthorizationResponsePacketListener.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/AuthorizationResponsePacketListener.java index 0d8f917cfa..85257a0478 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/AuthorizationResponsePacketListener.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/AuthorizationResponsePacketListener.java @@ -65,9 +65,10 @@ public AuthorizationResponsePacketListener( @Override public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) { - // check if the auth was successful - if (packet.content().readBoolean()) { - // search for the node to which the auth succeeded + // for fields an order see AuthorizationResponsePacket + var packetContent = packet.content(); + var isAuthSuccess = packetContent.readBoolean(); + if (isAuthSuccess) { var server = this.configuration.clusterConfig().nodes().stream() .filter(node -> node.listeners().stream().anyMatch(host -> channel.serverAddress().equals(host))) .map(node -> this.nodeServerProvider.node(node.uniqueId())) @@ -75,39 +76,36 @@ public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) { .findFirst() .orElse(null); if (server != null) { - // check if this was a reconnection from the point of view of the other node - if (packet.content().readBoolean()) { - // handle the data sync - var syncData = packet.content().readDataBuf(); - this.dataSyncRegistry.handle(syncData, syncData.readBoolean()); + var wasReconnect = packetContent.readBoolean(); + if (wasReconnect) { + try (var syncData = packetContent.readDataBuf()) { + var forceApply = syncData.readBoolean(); + this.dataSyncRegistry.handle(syncData, forceApply); + } - // check if there are pending packets for the node + // flush the packets that were queued for the node that reconnected if (server.channel() instanceof QueuedNetworkChannel queuedChannel) { queuedChannel.drainPacketQueue(channel); } - // update the current local snapshot - var local = this.nodeServerProvider.localNode(); - local.updateLocalSnapshot(); + // sync the locally stored cluster data to the node + var localNodeServer = this.nodeServerProvider.localNode(); + localNodeServer.updateLocalSnapshot(); - // acknowledge the packet - var data = this.dataSyncRegistry.prepareClusterData( - true, - DataSyncHandler::alwaysForceApply); - channel.sendPacketSync(new ServiceSyncAckPacket(local.nodeInfoSnapshot(), data)); + var syncData = this.dataSyncRegistry.prepareClusterData(true, DataSyncHandler::alwaysForceApply); + channel.sendPacketSync(new ServiceSyncAckPacket(localNodeServer.nodeInfoSnapshot(), syncData)); - // close the old channel - // little hack to prevent some disconnect handling firring in the channel if the state was not set before + // closes the old channel, preventing disconnection handling by setting the state + // of the channel to 'disconnected' before actually closing the channel server.state(NodeServerState.DISCONNECTED); server.channel().close(); } - // update the node status + + // re-initialize the node data server.channel(channel); server.state(NodeServerState.READY); - // add the packet listeners channel.packetRegistry().removeListeners(NetworkConstants.INTERNAL_AUTHORIZATION_CHANNEL); this.networkUtil.addDefaultPacketListeners(channel.packetRegistry()); - // we are good to go :) return; } } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ChannelMessagePacketListener.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ChannelMessagePacketListener.java index 028d82a16c..45248112d2 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ChannelMessagePacketListener.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ChannelMessagePacketListener.java @@ -24,10 +24,11 @@ import eu.cloudnetservice.driver.network.buffer.DataBuf; import eu.cloudnetservice.driver.network.protocol.Packet; import eu.cloudnetservice.driver.network.protocol.PacketListener; -import eu.cloudnetservice.node.impl.provider.NodeMessenger; +import eu.cloudnetservice.node.impl.provider.NodeCloudMessenger; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.util.Set; +import java.util.ArrayList; +import java.util.Objects; import java.util.concurrent.TimeUnit; import lombok.NonNull; import org.jetbrains.annotations.Nullable; @@ -39,13 +40,13 @@ public final class ChannelMessagePacketListener implements PacketListener { private static final Logger LOGGER = LoggerFactory.getLogger(ChannelMessagePacketListener.class); - private final NodeMessenger messenger; + private final NodeCloudMessenger messenger; private final EventManager eventManager; private final ComponentInfo componentInfo; @Inject public ChannelMessagePacketListener( - @NonNull NodeMessenger messenger, + @NonNull NodeCloudMessenger messenger, @NonNull EventManager eventManager, @NonNull ComponentInfo componentInfo ) { @@ -56,104 +57,88 @@ public ChannelMessagePacketListener( @Override public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) { - var comesFromWrapper = packet.content().readBoolean(); - var message = packet.content().readObject(ChannelMessage.class); + // see content and order in ChannelMessagePacket + var isQuery = packet.uniqueId() != null; + var packetContent = packet.content(); + var comesFromWrapper = packetContent.readBoolean(); + var channelMessage = packetContent.readObject(ChannelMessage.class); - // check if we should handle the message locally - var handleLocally = message.targets().stream().anyMatch(target -> switch (target.type()) { + var handleLocally = channelMessage.targets().stream().anyMatch(target -> switch (target.type()) { case ALL -> true; case NODE -> target.name() == null || target.name().equals(this.componentInfo.componentName()); default -> false; }); - if (handleLocally) { - // mark the index of the data buf & call the receive event - message.content().acquire().startTransaction(); - // call the receive event - var responseTask = this.eventManager - .callEvent(new ChannelMessageReceiveEvent(message, channel, packet.uniqueId() != null)) - .queryResponse(); - // reset the index - message.content().redoTransaction(); - - // wait for the response to become available if given before resuming + // message should be handled locally with 2 possible outcomes: + // 1. a response future gets provided to the handler, we then need to wait until the handler finishes + // whatever is required to provide a response and resume handling (redirecting) the message after + // 2. no response future gets provided, this happens when: + // * no listener is registered locally for the message + // * a listener is registered but has no response for the query + // * the channel message is not a query + // in these cases we just resume the message handling instantly + var messageContent = channelMessage.content(); + messageContent.startTransaction(); // to reset after listeners were called + + var event = this.eventManager.callEvent(new ChannelMessageReceiveEvent(channelMessage, channel, isQuery)); + var responseTask = event.queryResponse(); if (responseTask != null) { - responseTask.thenAccept(response -> this.resumeHandling(packet, channel, message, response, comesFromWrapper)); + responseTask + .whenComplete((localResponse, _) -> { + messageContent.redoTransaction(); // redo after processing finished + this.resumeHandling(packet, channel, channelMessage, localResponse, comesFromWrapper); + }) + .exceptionally((thrown) -> { + LOGGER.warn("Exception during async channel message processing of {}", channelMessage, thrown); + return null; + }); return; + } else { + messageContent.redoTransaction(); } } - // resume instantly - this.resumeHandling(packet, channel, message, null, comesFromWrapper); + this.resumeHandling(packet, channel, channelMessage, null, comesFromWrapper); } + /** + * Resumes the processing of a channel message, redirecting the message to other targets in the cluster and sending + * back a response in case the original request was a query. + * + * @param packet the packet that contained the initially handled channel message. + * @param channel the channel on which the initially handled channel message was received. + * @param message the original channel message that was received. + * @param localResponse the query response generated by listeners on the current node. + * @param comesFromWrapper if the handled channel message was sent by a wrapper. + * @throws NullPointerException if the given packet, channel or initial channel message is null. + */ private void resumeHandling( @NonNull Packet packet, @NonNull NetworkChannel channel, @NonNull ChannelMessage message, - @Nullable ChannelMessage initialResponse, + @Nullable ChannelMessage localResponse, boolean comesFromWrapper ) { - // do not redirect the channel message to the cluster to prevent infinite loops - if (packet.uniqueId() != null) { + var isQuery = packet.uniqueId() != null; + if (isQuery) { this.messenger.sendChannelMessageQueryAsync(message, comesFromWrapper) .orTimeout(20, TimeUnit.SECONDS) - .handle((result, exception) -> { - // check if the handling was successful - DataBuf responseContent; - if (exception == null) { - // respond with the result or just the single initial response if given - if (result == null) { - responseContent = initialResponse == null - ? DataBuf.empty().writeBoolean(false) - : DataBuf.empty().writeObject(Set.of(initialResponse)); - } else { - // add the initial response if given before writing - if (initialResponse != null) { - result.add(initialResponse); - } - - // serialize the response - if (result.isEmpty()) { - responseContent = DataBuf.empty().writeBoolean(false); - } else { - responseContent = DataBuf.empty().writeObject(result); - } - } - } else { - // just respond with nothing when an exception was thrown - responseContent = DataBuf.empty().writeBoolean(false); - LOGGER.error("Unable to relay channel message {} into cluster", message, exception); + .whenComplete((responses, _) -> { + responses = Objects.requireNonNullElseGet(responses, ArrayList::new); + if (localResponse != null) { + responses.add(localResponse); } - // send the results to the sender - channel.sendPacket(packet.constructResponse(responseContent)); + var responseBuffer = DataBuf.empty().writeObject(responses); + var responsePacket = packet.constructResponse(responseBuffer); + channel.sendPacket(responsePacket); + }) + .exceptionally(thrown -> { + LOGGER.debug("Exception while forwarding channel message {} into the cluster", message, thrown); return null; - }).whenComplete((_, exception) -> { - // log any internal errors - if (exception != null) { - LOGGER.error("Unable to encode/send response to channel message {}", message, exception); - } - - if (initialResponse != null) { - initialResponse.content().release(); - } }); } else { this.messenger.sendChannelMessage(message, comesFromWrapper); - if (initialResponse != null) { - initialResponse.content().release(); - } - } - - // force release of the current message - // this is an edge case that should not happen, but basically the handlers did not read - // all the content from the buffer, or the buffer simply had no content - // checking if the buffer was acquired only once ensures that no-one acquired the buffer - // during the read process and wants to use the buffer later on - var messageContent = message.content(); - if (messageContent.acquires() == 1) { - messageContent.release(); } } } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ServiceSyncAckPacketListener.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ServiceSyncAckPacketListener.java index e58c364970..45a9ce72e4 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ServiceSyncAckPacketListener.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/network/listener/ServiceSyncAckPacketListener.java @@ -46,30 +46,36 @@ public ServiceSyncAckPacketListener( @Override public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) throws Exception { - // read the cluster node snapshot - var snapshot = packet.content().readObject(NodeInfoSnapshot.class); - var syncData = packet.content().readDataBuf(); - // select the node server and validate that it is in the right state for the packet - var server = this.nodeServerProvider.node(snapshot.node().uniqueId()); - if (server != null && server.state() == NodeServerState.SYNCING) { - // remove this listener - channel.packetRegistry().removeListeners(NetworkConstants.INTERNAL_SERVICE_SYNC_ACK_CHANNEL); - // sync the data between the nodes - this.dataSyncRegistry.handle(syncData, syncData.readBoolean()); - if (server.channel() instanceof QueuedNetworkChannel queuedChannel) { - queuedChannel.drainPacketQueue(channel); - } + try { + var packetContent = packet.content(); + var snapshot = packetContent.readObject(NodeInfoSnapshot.class); + var server = this.nodeServerProvider.node(snapshot.node().uniqueId()); + if (server != null && server.state() == NodeServerState.SYNCING) { + try (var syncData = packet.content().readDataBuf()) { + var forceApply = syncData.readBoolean(); + this.dataSyncRegistry.handle(syncData, forceApply); + } + + // flush the packets that were queued for the node that reconnected + if (server.channel() instanceof QueuedNetworkChannel queuedChannel) { + queuedChannel.drainPacketQueue(channel); + } - // close the old channel - // little hack to prevent some disconnect handling firring in the channel if the state was not set before - server.state(NodeServerState.DISCONNECTED); - server.channel().close(); - // mark the node as ready - server.channel(channel); - server.updateNodeInfoSnapshot(snapshot); - server.state(NodeServerState.READY); - // re-select the head node - this.nodeServerProvider.selectHeadNode(); + // closes the old channel, preventing disconnection handling by setting the state + // of the channel to 'disconnected' before actually closing the channel + server.state(NodeServerState.DISCONNECTED); + server.channel().close(); + + // mark the node as ready and re-select the head node. this ensures that the + // current node uses the same node as the head node as all other nodes in the cluster + server.channel(channel); + server.updateNodeInfoSnapshot(snapshot); + server.state(NodeServerState.READY); + this.nodeServerProvider.selectHeadNode(); + } + } finally { + // the packet is only sent once, this listener can be removed + channel.packetRegistry().removeListeners(NetworkConstants.INTERNAL_SERVICE_SYNC_ACK_CHANNEL); } } } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeCloudMessenger.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeCloudMessenger.java new file mode 100644 index 0000000000..57b2347873 --- /dev/null +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeCloudMessenger.java @@ -0,0 +1,385 @@ +/* + * Copyright 2019-2024 CloudNetService team & contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.cloudnetservice.node.impl.provider; + +import com.google.common.collect.Iterables; +import dev.derklaro.aerogel.auto.annotation.Provides; +import eu.cloudnetservice.driver.channel.ChannelMessage; +import eu.cloudnetservice.driver.channel.ChannelMessageTarget; +import eu.cloudnetservice.driver.impl.network.standard.ChannelMessagePacket; +import eu.cloudnetservice.driver.network.NetworkChannel; +import eu.cloudnetservice.driver.provider.CloudMessenger; +import eu.cloudnetservice.driver.service.ServiceInfoSnapshot; +import eu.cloudnetservice.node.cluster.NodeServerProvider; +import eu.cloudnetservice.node.impl.service.defaults.provider.EmptySpecificCloudServiceProvider; +import eu.cloudnetservice.node.service.CloudService; +import eu.cloudnetservice.node.service.CloudServiceManager; +import eu.cloudnetservice.utils.base.concurrent.CountingTask; +import eu.cloudnetservice.utils.base.concurrent.TaskUtil; +import io.leangen.geantyref.TypeFactory; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import lombok.NonNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Messenger implementation that relays channel messages to various targets within the cluster. + * + * @since 4.0 + */ +@Singleton +@Provides(CloudMessenger.class) +public class NodeCloudMessenger implements CloudMessenger { + + protected static final Type CHANNEL_MESSAGE_LIST_TYPE = + TypeFactory.parameterizedClass(List.class, ChannelMessage.class); + protected static final long DEFAULT_QUERY_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(20); + + private static final Logger LOGGER = LoggerFactory.getLogger(NodeCloudMessenger.class); + + protected final NodeServerProvider nodeServerProvider; + protected final CloudServiceManager cloudServiceManager; + + @Inject + public NodeCloudMessenger( + @NonNull NodeServerProvider nodeServerProvider, + @NonNull CloudServiceManager cloudServiceManager + ) { + this.nodeServerProvider = nodeServerProvider; + this.cloudServiceManager = cloudServiceManager; + } + + /** + * {@inheritDoc} + */ + @Override + public void sendChannelMessage(@NonNull ChannelMessage channelMessage) { + this.sendChannelMessage(channelMessage, true); + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull Collection sendChannelMessageQuery(@NonNull ChannelMessage channelMessage) { + var done = new AtomicBoolean(); + return this.sendChannelMessageQueryAsync(channelMessage, true) + .thenApply(responses -> { + // it might be that the timeout defined in the next step was already hit, therefore + // this method already returned to the caller before a response is received. in this + // case, we release the response we got immediately as it would leak otherwise + var didComplete = done.compareAndSet(false, true); + if (didComplete) { + return responses; + } else { + responses.forEach(ChannelMessage::close); + throw new IllegalStateException("received responses after downstream already completed"); + } + }) + // hack: get a new incomplete future here so that the previous thenApply step runs as well. + // the following orTimeout completes the future it's called on, so the releasing step would never run + .thenApply(Function.identity()) + .orTimeout(DEFAULT_QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .whenComplete((_, _) -> done.set(true)) + .join(); + } + + /** + * {@inheritDoc} + */ + @Override + public @Nullable ChannelMessage sendSingleChannelMessageQuery(@NonNull ChannelMessage channelMessage) { + var done = new AtomicBoolean(); + return this.sendSingleChannelMessageQueryAsync(channelMessage) + .thenApply(response -> { + // it might be that the timeout defined in the next step was already hit, therefore + // this method already returned to the caller before a response is received. in this + // case, we release the response we got immediately as it would leak otherwise + var didComplete = done.compareAndSet(false, true); + if (didComplete) { + return response; + } else { + response.close(); + throw new IllegalStateException("received response after downstream already completed"); + } + }) + // hack: get a new incomplete future here so that the previous thenApply step runs as well. + // the following orTimeout completes the future it's called on, so the releasing step would never run + .thenApply(Function.identity()) + .orTimeout(DEFAULT_QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .whenComplete((_, _) -> done.set(true)) + .join(); + } + + /** + * {@inheritDoc} + */ + @Override + public @NonNull CompletableFuture sendChannelMessageAsync(@NonNull ChannelMessage channelMessage) { + return TaskUtil.supplyAsync(() -> { + this.sendChannelMessage(channelMessage); + return null; + }); + } + + /** + * {@inheritDoc} + */ + @NonNull + @Override + public CompletableFuture> sendChannelMessageQueryAsync(@NonNull ChannelMessage message) { + return this.sendChannelMessageQueryAsync(message, true); + } + + /** + * {@inheritDoc} + */ + @NonNull + @Override + public CompletableFuture sendSingleChannelMessageQueryAsync(@NonNull ChannelMessage channelMessage) { + return this.sendChannelMessageQueryAsync(channelMessage).thenApply(responses -> { + var responseCount = responses.size(); + return switch (responseCount) { + case 0 -> null; + case 1 -> Iterables.getOnlyElement(responses); + default -> { + // there were more than one response, so we need to close the + // other responses to prevent leaking their content + var responsesArray = responses.toArray(ChannelMessage[]::new); + for (var index = 1; index < responsesArray.length; index++) { + var response = responsesArray[index]; + response.close(); + } + + yield responsesArray[0]; + } + }; + }); + } + + /** + * Sends the given channel message to all provided targets, without waiting for any response. + * + * @param message the channel message to send. + * @param allowClusterRedirect if other nodes in the cluster are to be considered as targets. + * @throws NullPointerException if the given channel message is null. + */ + public void sendChannelMessage(@NonNull ChannelMessage message, boolean allowClusterRedirect) { + try (var messageContent = message.content()) { + var channels = this.findTargetChannels(message.targets(), allowClusterRedirect); + if (channels.isEmpty()) { + return; + } + + for (var channel : channels) { + messageContent.acquire(); // acquire once as the construct of ChannelMessagePacket releases the content + var packet = new ChannelMessagePacket(message, false); + if (message.sendSync()) { + channel.sendPacketSync(packet); + } else { + channel.sendPacket(packet); + } + } + } + } + + /** + * Sends the given channel message, waiting for a result of each sent message. If one receiver does not respond, the + * returned future only completes after the configured query timeout of the target channel. Therefore, a timout is + * advisable. + * + * @param message the channel message to send. + * @param allowClusterRedirect if other nodes in the cluster are to be considered as targets. + * @throws NullPointerException if the given channel message is null. + */ + public @NonNull CompletableFuture> sendChannelMessageQueryAsync( + @NonNull ChannelMessage message, + boolean allowClusterRedirect + ) { + try (var messageContent = message.content()) { + var channels = this.findTargetChannels(message.targets(), allowClusterRedirect); + if (channels.isEmpty()) { + return CompletableFuture.completedFuture(new ArrayList<>()); + } + + var results = new ArrayList(); + var resultTask = new CountingTask>(results, channels.size()); + for (var channel : channels) { + messageContent.acquire(); // acquire once as the construct of ChannelMessagePacket releases the content + var packet = new ChannelMessagePacket(message, false); + channel.sendQueryAsync(packet) + .thenAccept(result -> { + var resultContent = result.content(); + try { + Collection responses = resultContent.readObject(CHANNEL_MESSAGE_LIST_TYPE); + if (responses != null) { + results.addAll(responses); + } + } finally { + resultContent.forceRelease(); + } + }) + .whenComplete((_, _) -> resultTask.countDown()) + .exceptionally(thrown -> { + LOGGER.debug("Exception while sending/receiving channel message query", thrown); + return null; + }); + } + + return resultTask; + } + } + + /** + * Finds the corresponding channels for the given channel message targets. + * + * @param targets the targets to find the channels for. + * @param allowClusterRedirect if other nodes in the cluster are to be considered as targets. + * @return the resolvable network channels for the given channel message targets. + * @throws NullPointerException if the given targets collection is null. + */ + protected @NonNull Collection findTargetChannels( + @NonNull Collection targets, + boolean allowClusterRedirect + ) { + var targetCount = targets.size(); + return switch (targetCount) { + case 0 -> Set.of(); + case 1 -> { + var target = Iterables.getOnlyElement(targets); + yield this.findTargetChannels(target, allowClusterRedirect); + } + default -> targets.stream() + .flatMap(target -> this.findTargetChannels(target, allowClusterRedirect).stream()) + .collect(Collectors.toUnmodifiableSet()); + }; + } + + /** + * Finds the corresponding network channels for the given channel message target. + * + * @param target the channel message target to find the channels for. + * @param allowClusterRedirect if other nodes in the cluster are to be considered as targets. + * @return the resolvable network channels for the given channel message target. + * @throws NullPointerException if the given channel message target is null. + */ + protected @NonNull Collection findTargetChannels( + @NonNull ChannelMessageTarget target, + boolean allowClusterRedirect + ) { + // special handling for all service targets, as they all need special handling. the first step extracts + // the target service snapshots, the second step finds the network channel to which the message should + // be sent (either the service directly for local services or the owning cluster node) + var messageTargeType = target.type(); + Collection targetServiceSnapshots = switch (messageTargeType) { + case SERVICE -> { + var serviceName = target.name(); + if (serviceName != null) { + var serviceInfo = this.cloudServiceManager.serviceByName(serviceName); + yield serviceInfo == null ? List.of() : List.of(serviceInfo); + } else { + yield this.cloudServiceManager.runningServices(); + } + } + case SERVICES_BY_TASK -> { + var taskName = Objects.requireNonNull(target.name(), "TASK target without name"); + yield this.cloudServiceManager.servicesByTask(taskName); + } + case SERVICES_BY_GROUP -> { + var groupName = Objects.requireNonNull(target.name(), "GROUP target without name"); + yield this.cloudServiceManager.servicesByGroup(groupName); + } + case SERVICES_BY_ENV -> { + var environmentName = Objects.requireNonNull(target.name(), "ENVIRONMENT target without name"); + yield this.cloudServiceManager.servicesByEnvironment(environmentName); + } + case SERVICES_WITH_PROPERTY -> { + var propertyKey = Objects.requireNonNull(target.name(), "PROPERTY target without name"); + yield this.cloudServiceManager.services().stream() + .filter(service -> service.propertyHolder().contains(propertyKey)) + .toList(); + } + default -> null; // not a service target + }; + if (targetServiceSnapshots != null) { + return targetServiceSnapshots.stream() + .map(service -> this.cloudServiceManager.serviceProvider(service.serviceId().uniqueId())) + .map(provider -> provider == EmptySpecificCloudServiceProvider.INSTANCE ? null : provider) + .filter(Objects::nonNull) + .map(provider -> { + if (provider instanceof CloudService cloudService) { + // service running locally on the current node + return cloudService.networkChannel(); + } else if (allowClusterRedirect) { + // service running on a remote node, redirect the message to the network channel of that node + var serviceSnapshot = provider.serviceInfo(); + if (serviceSnapshot != null) { + var nodeId = serviceSnapshot.serviceId().nodeUniqueId(); + var associatedNode = this.nodeServerProvider.node(nodeId); + return associatedNode == null ? null : associatedNode.channel(); + } + } + + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + // targets one or more node in the cluster, only resolve these if allowed + // to redirect the message within the cluster + if (messageTargeType == ChannelMessageTarget.Type.NODE && allowClusterRedirect) { + var nodeId = target.name(); + if (nodeId == null) { + return this.nodeServerProvider.connectedNodeChannels(); + } else { + var nodeServer = this.nodeServerProvider.node(nodeId); + var channel = nodeServer == null ? null : nodeServer.channel(); + return channel == null ? List.of() : List.of(channel); + } + } + + // targets all components in the network, redirect to locally running services and all nodes + if (messageTargeType == ChannelMessageTarget.Type.ALL) { + var targetChannels = this.cloudServiceManager.localCloudServices().stream() + .map(CloudService::networkChannel) + .filter(Objects::nonNull) + .collect(Collectors.toCollection(HashSet::new)); + if (allowClusterRedirect) { + targetChannels.addAll(this.nodeServerProvider.connectedNodeChannels()); + } + + return targetChannels; + } + + return List.of(); // fall-through, not an error case + } +} diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java deleted file mode 100644 index bac6dcc5ca..0000000000 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/provider/NodeMessenger.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Copyright 2019-2024 CloudNetService team & contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package eu.cloudnetservice.node.impl.provider; - -import com.google.common.collect.Iterables; -import dev.derklaro.aerogel.auto.annotation.Provides; -import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageTarget; -import eu.cloudnetservice.driver.impl.network.standard.ChannelMessagePacket; -import eu.cloudnetservice.driver.network.NetworkChannel; -import eu.cloudnetservice.driver.provider.CloudMessenger; -import eu.cloudnetservice.driver.service.ServiceInfoSnapshot; -import eu.cloudnetservice.node.cluster.NodeServerProvider; -import eu.cloudnetservice.node.service.CloudService; -import eu.cloudnetservice.node.service.CloudServiceManager; -import eu.cloudnetservice.utils.base.concurrent.CountingTask; -import eu.cloudnetservice.utils.base.concurrent.TaskUtil; -import io.leangen.geantyref.TypeFactory; -import jakarta.inject.Inject; -import jakarta.inject.Singleton; -import java.lang.reflect.Type; -import java.time.Duration; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import lombok.NonNull; -import org.jetbrains.annotations.Nullable; - -@Singleton -@Provides(CloudMessenger.class) -public class NodeMessenger implements CloudMessenger { - - protected static final Type COL_MSG = TypeFactory.parameterizedClass(Collection.class, ChannelMessage.class); - - protected final NodeServerProvider nodeServerProvider; - protected final CloudServiceManager cloudServiceManager; - - @Inject - public NodeMessenger( - @NonNull NodeServerProvider nodeServerProvider, - @NonNull CloudServiceManager cloudServiceManager - ) { - this.nodeServerProvider = nodeServerProvider; - this.cloudServiceManager = cloudServiceManager; - } - - @Override - public void sendChannelMessage(@NonNull ChannelMessage message) { - this.sendChannelMessage(message, true); - } - - @Override - public @NonNull CompletableFuture> sendChannelMessageQueryAsync( - @NonNull ChannelMessage message - ) { - return this.sendChannelMessageQueryAsync(message, true); - } - - @Override - public @NonNull CompletableFuture sendSingleChannelMessageQueryAsync( - @NonNull ChannelMessage channelMessage - ) { - return TaskUtil.supplyAsync(() -> this.sendSingleChannelMessageQuery(channelMessage)); - } - - @Override - public @NonNull Collection sendChannelMessageQuery(@NonNull ChannelMessage channelMessage) { - return TaskUtil.getOrDefault(this.sendChannelMessageQueryAsync(channelMessage), Duration.ofSeconds(20), List.of()); - } - - @Override - public @Nullable ChannelMessage sendSingleChannelMessageQuery(@NonNull ChannelMessage channelMessage) { - return Iterables.getFirst(this.sendChannelMessageQuery(channelMessage), null); - } - - @Override - public @NonNull CompletableFuture sendChannelMessageAsync(@NonNull ChannelMessage channelMessage) { - return TaskUtil.runAsync(() -> this.sendChannelMessage(channelMessage)); - } - - public void sendChannelMessage(@NonNull ChannelMessage message, boolean allowClusterRedirect) { - // find the target channels to send the message to - var channels = this.findChannels(message.targets(), allowClusterRedirect); - for (var channel : channels) { - // acquire the message content for each channel we're sending the message to - // when writing the message content to the target buffer, the message is released - // that means when the message was written to all channels it's released unless someone acquired it before - message.content().acquire(); - - // construct and send the packet - var packet = new ChannelMessagePacket(message, false); - if (message.sendSync()) { - channel.sendPacketSync(packet); - } else { - channel.sendPacket(packet); - } - } - - // release the message now - message.content().release(); - } - - public @NonNull CompletableFuture> sendChannelMessageQueryAsync( - @NonNull ChannelMessage message, - boolean allowClusterRedirect - ) { - // find the target channels to send the message to - var channels = this.findChannels(message.targets(), allowClusterRedirect); - if (channels.isEmpty()) { - // no target channels found, release the message now - message.content().release(); - return TaskUtil.finishedFuture(new HashSet<>()); - } else { - // the result we generate - Set result = new HashSet<>(); - var task = new CountingTask>(result, channels.size()); - - // send the packet to each channel - for (var channel : channels) { - // acquire the message content for each channel we're sending the message to - // when writing the message content to the target buffer, the message is released - // that means when the message was written to all channels it's released unless someone acquired it before - message.content().acquire(); - - channel.sendQueryAsync(new ChannelMessagePacket(message, false)).whenComplete((packet, th) -> { - // check if we got an actual result from the request - if (th == null && packet.readable()) { - // add all resulting messages we got - result.addAll(packet.content().readObject(COL_MSG)); - } - - // count down - one channel responded - task.countDown(); - }); - } - - // release the message now - message.content().release(); - - // return the task on which the user can wait - return task; - } - } - - protected @NonNull Collection findChannels( - @NonNull Collection targets, - boolean allowClusterRedirect - ) { - // check if there is only one channel - if (targets.size() == 1) { - // get the target - we can suppress the nullable warning because we expect the collection to not contain null values - return this.findTargetChannels(Iterables.getOnlyElement(targets), allowClusterRedirect); - } else { - // filter all the channels for the targets - return targets.stream() - .flatMap(target -> this.findTargetChannels(target, allowClusterRedirect).stream()) - .collect(Collectors.toSet()); - } - } - - protected @NonNull Collection findTargetChannels( - @NonNull ChannelMessageTarget target, - boolean allowClusterRedirect - ) { - switch (target.type()) { - // just include all known channels - case ALL -> { - Set result = new HashSet<>(); - // all local services - this.cloudServiceManager.localCloudServices().stream() - .map(CloudService::networkChannel) - .filter(Objects::nonNull) - .forEach(result::add); - // all connected nodes - if (allowClusterRedirect) { - result.addAll(this.nodeServerProvider.connectedNodeChannels()); - } - return result; - } - case NODE -> { - // search for the matching node server - if (allowClusterRedirect) { - // check if a specific node server was selected or all node servers are targeted - if (target.name() == null) { - return this.nodeServerProvider.connectedNodeChannels(); - } - // check if we know the target node server - var server = this.nodeServerProvider.node(target.name()); - return server == null || server.channel() == null - ? Collections.emptySet() - : Collections.singleton(server.channel()); - } else { - // not allowed to redirect the message - return Collections.emptySet(); - } - } - case SERVICE -> { - // check if a specific service was requested - if (target.name() == null) { - // if no specific name is given just get all local channels - var channels = this.cloudServiceManager.localCloudServices().stream() - .map(CloudService::networkChannel) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - // check if cluster redirect is allowed - add all connected node channels then - if (allowClusterRedirect) { - channels.addAll(this.nodeServerProvider.connectedNodeChannels()); - } - // return here - return channels; - } else { - // check if the service is running locally - use the known channel then - var localService = this.cloudServiceManager.localCloudService(target.name()); - if (localService != null) { - return localService.networkChannel() == null - ? Collections.emptySet() - : Collections.singleton(localService.networkChannel()); - } - } - // check if we are allowed to redirect the message to the node running the service - if (allowClusterRedirect) { - // if no specific service is given just send it to all nodes - if (target.name() == null) { - return this.nodeServerProvider.connectedNodeChannels(); - } - // check if we know the service from the cluster - var service = this.cloudServiceManager.serviceByName(target.name()); - if (service != null) { - // check if we know the target node server to send the channel message to instead - var server = this.nodeServerProvider.node(service.serviceId().nodeUniqueId()); - return server == null || server.channel() == null - ? Collections.emptySet() - : Collections.singleton(server.channel()); - } - } - // unable to retrieve information about the target - just an empty set then - return Collections.emptySet(); - } - case TASK -> { - // lookup all services of the given task - return this.filterChannels( - this.cloudServiceManager.servicesByTask(target.name()), - allowClusterRedirect); - } - case ENVIRONMENT -> { - // lookup all services of the given environment - return this.filterChannels( - this.cloudServiceManager.servicesByEnvironment(target.environment().name()), - allowClusterRedirect); - } - case GROUP -> { - // lookup all services of the given group - return this.filterChannels( - this.cloudServiceManager.servicesByGroup(target.name()), - allowClusterRedirect); - } - default -> throw new IllegalArgumentException("Unhandled ChannelMessageTarget.Type: " + target.type()); - } - } - - protected @NonNull Collection filterChannels( - @NonNull Collection snapshots, - boolean allowClusterRedirect - ) { - return snapshots.stream() - .map(service -> { - // check if the service is running locally - var localService = this.cloudServiceManager.localCloudService(service.serviceId().name()); - if (localService != null) { - return localService.networkChannel(); - } - // check if we are allowed to redirect the message to the node running the service - if (allowClusterRedirect) { - // check if we know the node on which the service is running - var nodeServer = this.nodeServerProvider.node(service.serviceId().nodeUniqueId()); - return nodeServer == null ? null : nodeServer.channel(); - } - // no target found - return null; - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - } -} diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java index 8ac406612e..a88c4b7371 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/AbstractService.java @@ -205,7 +205,14 @@ public boolean valid() { .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) .build() .sendSingleQuery(); - return response == null ? this.currentServiceInfo : response.content().readObject(ServiceInfoSnapshot.class); + return switch (response) { + case null -> this.currentServiceInfo; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(ServiceInfoSnapshot.class); + } + } + }; } else { return this.currentServiceInfo; } diff --git a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/NodeCloudServiceFactory.java b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/NodeCloudServiceFactory.java index e3845483e9..a3a901c51b 100644 --- a/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/NodeCloudServiceFactory.java +++ b/node/impl/src/main/java/eu/cloudnetservice/node/impl/service/defaults/NodeCloudServiceFactory.java @@ -18,7 +18,6 @@ import dev.derklaro.aerogel.auto.annotation.Provides; import eu.cloudnetservice.driver.channel.ChannelMessage; -import eu.cloudnetservice.driver.channel.ChannelMessageTarget; import eu.cloudnetservice.driver.event.EventManager; import eu.cloudnetservice.driver.impl.network.NetworkConstants; import eu.cloudnetservice.driver.network.buffer.DataBuf; @@ -40,7 +39,6 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.time.Duration; -import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -208,7 +206,7 @@ private void registerServiceChannelListener() { .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) .message("head_node_to_node_finish_service_registration") .buffer(DataBuf.empty().writeUniqueId(serviceUniqueId)) - .target(ChannelMessageTarget.Type.NODE, associatedNode.info().uniqueId()) + .targetNode(associatedNode.info().uniqueId()) .build() .send(); return result; @@ -226,7 +224,7 @@ private void registerServiceChannelListener() { ) { // send a request to the node to start a service var future = ChannelMessage.builder() - .target(ChannelMessageTarget.Type.NODE, targetNode) + .targetNode(targetNode) .message(message) .channel(NetworkConstants.INTERNAL_MSG_CHANNEL) .buffer(DataBuf.empty().writeObject(configuration)) @@ -234,10 +232,16 @@ private void registerServiceChannelListener() { .sendSingleQueryAsync(); var result = TaskUtil.getOrDefault(future, Duration.ofSeconds(20), null); - // read the result service info from the buffer, if the there was no response then we need to fail (only the head - // node should queue start requests) - var createResult = result == null ? null : result.content().readObject(ServiceCreateResult.class); - return Objects.requireNonNullElse(createResult, ServiceCreateResult.FAILED); + // read the result service info from the buffer, if the there was no response then we need + // to fail (only the head node should queue start requests) + return switch (result) { + case null -> ServiceCreateResult.FAILED; + case ChannelMessage channelMessage -> { + try (channelMessage) { + yield channelMessage.content().readObject(ServiceCreateResult.class); + } + } + }; } protected @NonNull ServiceCreateResult scheduleCreateRetryIfEnabled( diff --git a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/network/listener/ChannelMessagePacketListener.java b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/network/listener/ChannelMessagePacketListener.java index 4a5178e354..7c587ef3e0 100644 --- a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/network/listener/ChannelMessagePacketListener.java +++ b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/network/listener/ChannelMessagePacketListener.java @@ -25,12 +25,17 @@ import eu.cloudnetservice.driver.network.protocol.PacketListener; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import java.util.Set; +import java.util.List; import lombok.NonNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Singleton public final class ChannelMessagePacketListener implements PacketListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelMessagePacketListener.class); + private final EventManager eventManager; @Inject @@ -40,49 +45,52 @@ public ChannelMessagePacketListener(@NonNull EventManager eventManager) { @Override public void handle(@NonNull NetworkChannel channel, @NonNull Packet packet) { - // skip the first boolean (comes from wrapper) in the buffer as we don't need it - packet.content().readBoolean(); - // read the channel message from the buffer - var message = packet.content().readObject(ChannelMessage.class); + // for fields and order see ChannelMessagePacket + var isQuery = packet.uniqueId() != null; + var packetContent = packet.content(); + packetContent.readBoolean(); // skip comesFromWrapper info as it's not relevant + var channelMessage = packet.content().readObject(ChannelMessage.class); // get the query response if available - var response = this.eventManager - .callEvent(new ChannelMessageReceiveEvent(message, channel, packet.uniqueId() != null)) - .queryResponse(); - - // check if we need to respond to the channel message - if (packet.uniqueId() != null) { - // wait for the future if a response was supplied - if (response != null) { - response.whenComplete((queryResponse, throwable) -> { - // respond with nothing if no result was set - if (throwable != null || queryResponse == null) { - channel.sendPacket(packet.constructResponse(DataBuf.empty())); - } else { - // serialize the single response - channel.sendPacket(packet.constructResponse(DataBuf.empty().writeObject(Set.of(queryResponse)))); - } - - // release the message content (do it here so that the async processing still has access to it) - message.content().release(); - }); + var event = this.eventManager.callEvent(new ChannelMessageReceiveEvent(channelMessage, channel, isQuery)); + var responseTask = event.queryResponse(); + if (isQuery) { + // the wrapper has to always respond to channel message queries, + // even if there is no response provided by any listener on the local service + if (responseTask != null) { + responseTask + .whenComplete((_, _) -> channelMessage.content().release()) + .whenComplete((queryResponse, thrown) -> { + this.sendChannelMessageResponse(queryResponse, packet, channel); + if (thrown != null) { + LOGGER.warn("Caught exception while constructing response to channel message {}", channelMessage, thrown); + } + }); + return; } else { - // respond with an empty buffer to indicate the node that there was no result & release the message content - channel.sendPacket(packet.constructResponse(DataBuf.empty())); - message.content().release(); + this.sendChannelMessageResponse(null, packet, channel); } - } else if (response != null) { - // just release the initial response content when available - response.thenAccept(responseMessage -> { - // release both messages - message.content().release(); - if (responseMessage != null) { - responseMessage.content().release(); - } - }); - } else { - // release the message content instantly - message.content().release(); } + + channelMessage.content().release(); + } + + /** + * Sends the given channel message response for a received channel message query. + * + * @param response the response to send, null if no response was constructed. + * @param requestPacket the packet that requested the channel message response. + * @param sourceChannel the channel from which the query was received. + * @throws NullPointerException if the given request packet or source channel is null. + */ + private void sendChannelMessageResponse( + @Nullable ChannelMessage response, + @NonNull Packet requestPacket, + @NonNull NetworkChannel sourceChannel + ) { + var responseData = response == null ? List.of() : List.of(response); + var responseBuffer = DataBuf.empty().writeObject(responseData); + var responsePacket = requestPacket.constructResponse(responseBuffer); + sourceChannel.sendPacket(responsePacket); } } diff --git a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java index 90ab29ad57..4e50997de2 100644 --- a/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java +++ b/wrapper-jvm/impl/src/main/java/eu/cloudnetservice/wrapper/impl/provider/WrapperMessenger.java @@ -21,7 +21,6 @@ import eu.cloudnetservice.driver.channel.ChannelMessage; import eu.cloudnetservice.driver.impl.network.standard.ChannelMessagePacket; import eu.cloudnetservice.driver.network.NetworkClient; -import eu.cloudnetservice.driver.network.protocol.Packet; import eu.cloudnetservice.driver.provider.CloudMessenger; import eu.cloudnetservice.utils.base.concurrent.TaskUtil; import io.leangen.geantyref.TypeFactory; @@ -39,7 +38,8 @@ @Provides(CloudMessenger.class) public class WrapperMessenger implements CloudMessenger { - private static final Type MESSAGES = TypeFactory.parameterizedClass(Collection.class, ChannelMessage.class); + private static final Type CHANNEL_MESSAGE_LIST_TYPE = + TypeFactory.parameterizedClass(List.class, ChannelMessage.class); private final NetworkClient networkClient; @@ -48,6 +48,9 @@ public WrapperMessenger(@NonNull NetworkClient networkClient) { this.networkClient = networkClient; } + /** + * {@inheritDoc} + */ @Override public void sendChannelMessage(@NonNull ChannelMessage channelMessage) { if (channelMessage.sendSync()) { @@ -57,34 +60,73 @@ public void sendChannelMessage(@NonNull ChannelMessage channelMessage) { } } + /** + * {@inheritDoc} + */ @Override public @NonNull Collection sendChannelMessageQuery(@NonNull ChannelMessage channelMessage) { return this.sendChannelMessageQueryAsync(channelMessage).join(); } + /** + * {@inheritDoc} + */ @Override public @Nullable ChannelMessage sendSingleChannelMessageQuery(@NonNull ChannelMessage channelMessage) { - return Iterables.getFirst(this.sendChannelMessageQuery(channelMessage), null); + return this.sendSingleChannelMessageQueryAsync(channelMessage).join(); } + /** + * {@inheritDoc} + */ @Override public @NonNull CompletableFuture sendChannelMessageAsync(@NonNull ChannelMessage channelMessage) { return TaskUtil.runAsync(() -> this.sendChannelMessage(channelMessage)); } + /** + * {@inheritDoc} + */ @Override public @NonNull CompletableFuture> sendChannelMessageQueryAsync( @NonNull ChannelMessage message ) { - return this.networkClient.firstChannel().sendQueryAsync(new ChannelMessagePacket(message, true)) - .thenApply(Packet::content) - .thenApply(data -> Objects.requireNonNullElse(data.readObject(MESSAGES), List.of())); + return this.networkClient.firstChannel() + .sendQueryAsync(new ChannelMessagePacket(message, true)) + .thenApply(response -> { + var packetContent = response.content(); + try { + return Objects.requireNonNullElse(packetContent.readObject(CHANNEL_MESSAGE_LIST_TYPE), List.of()); + } finally { + packetContent.forceRelease(); + } + }); } + /** + * {@inheritDoc} + */ @Override public @NonNull CompletableFuture sendSingleChannelMessageQueryAsync( @NonNull ChannelMessage message ) { - return this.sendChannelMessageQueryAsync(message).thenApply(resp -> Iterables.getFirst(resp, null)); + return this.sendChannelMessageQueryAsync(message).thenApply(responses -> { + var responseCount = responses.size(); + return switch (responseCount) { + case 0 -> null; + case 1 -> Iterables.getOnlyElement(responses); + default -> { + // there were more than one response, so we need to close the + // other responses to prevent leaking their content + var responsesArray = responses.toArray(ChannelMessage[]::new); + for (var index = 1; index < responsesArray.length; index++) { + var response = responsesArray[index]; + response.close(); + } + + yield responsesArray[0]; + } + }; + }); } }