Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public InMemoryRpcClient(UTransport transport, LocalUriProvider uriProvider) {

getTransport().registerListener(
UriFactory.ANY,
getUriProvider().getSource(),
Optional.of(getUriProvider().getSource()),
mResponseHandler)
.toCompletableFuture().join();
}
Expand Down Expand Up @@ -125,7 +125,10 @@ public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPa
*/
public void close() {
mRequests.clear();
getTransport().unregisterListener(UriFactory.ANY, getUriProvider().getSource(), mResponseHandler);
getTransport().unregisterListener(
UriFactory.ANY,
Optional.of(getUriProvider().getSource()),
mResponseHandler);
}

private void handleResponse(UMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public CompletionStage<Void> registerRequestHandler(UUri originFilter, int resou
return CompletableFuture.failedFuture(new UStatusException(
UCode.ALREADY_EXISTS, "Handler already registered"));
}
return getTransport().registerListener(UriFactory.ANY, method, mRequestHandler)
return getTransport().registerListener(UriFactory.ANY, Optional.of(method), mRequestHandler)
.whenComplete((ok, throwable) -> {
if (throwable != null) {
mRequestsHandlers.remove(method);
Expand Down Expand Up @@ -140,7 +140,7 @@ public CompletionStage<Void> unregisterRequestHandler(
}

if (mRequestsHandlers.remove(method, handler)) {
return getTransport().unregisterListener(UriFactory.ANY, method, mRequestHandler);
return getTransport().unregisterListener(UriFactory.ANY, Optional.of(method), mRequestHandler);
} else {
return CompletableFuture.failedFuture(new UStatusException(
UCode.NOT_FOUND, "Handler not found"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.uprotocol.communication;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand Down Expand Up @@ -64,11 +65,11 @@ public CompletionStage<Void> notify(int resourceId, UUri destination, CallOption

@Override
public CompletionStage<Void> registerNotificationListener(UUri topic, UListener listener) {
return getTransport().registerListener(topic, getUriProvider().getSource(), listener);
return getTransport().registerListener(topic, Optional.of(getUriProvider().getSource()), listener);
}

@Override
public CompletionStage<Void> unregisterNotificationListener(UUri topic, UListener listener) {
return getTransport().unregisterListener(topic, getUriProvider().getSource(), listener);
return getTransport().unregisterListener(topic, Optional.of(getUriProvider().getSource()), listener);
}
}
33 changes: 21 additions & 12 deletions src/main/java/org/eclipse/uprotocol/transport/UTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.uprotocol.transport;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.eclipse.uprotocol.communication.UStatusException;
Expand All @@ -38,6 +39,7 @@ public interface UTransport {
* {@link UAttributes} contained in the message determine the addressing semantics.
* @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if
* the message could not be sent.
* @throws NullPointerException if the argument is {@code null}.
*/
CompletionStage<Void> send(UMessage message);

Expand All @@ -49,17 +51,19 @@ public interface UTransport {
* <a href="https://github.com/eclipse-uprotocol/up-spec/blob/v1.6.0-alpha.6/basics/uri.adoc">UUri
* specification</a>.
* <p>
* This default implementation invokes {@link #registerListener(UUri, UUri, UListener)} with the
* This default implementation invokes {@link #registerListener(UUri, Optional<UUri>, UListener)} with the
* given source filter and a sink filter of {@link UriFactory#ANY}.
*
* @param sourceFilter The <em>source</em> address pattern that messages need to match.
* Use {@link UriFactory#ANY} to match any source.
* @param listener The listener to invoke. The listener can be unregistered again
* using {@link #unregisterListener(UUri, UUri, UListener)}.
* using {@link #unregisterListener(UUri, UListener)}.
* @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if
* the listener could not be registered.
* @throws NullPointerException if any of the arguments are {@code null}.
*/
default CompletionStage<Void> registerListener(UUri sourceFilter, UListener listener) {
return registerListener(sourceFilter, UriFactory.ANY, listener);
return registerListener(sourceFilter, Optional.of(UriFactory.ANY), listener);
}

/**
Expand All @@ -71,43 +75,48 @@ default CompletionStage<Void> registerListener(UUri sourceFilter, UListener list
* specification</a>.
*
* @param sourceFilter The <em>source</em> address pattern that messages need to match.
* Use {@link UriFactory#ANY} to match any source.
* @param sinkFilter The <em>sink</em> address pattern that messages need to match.
* Use {@link UriFactory#ANY} to match any sink. Use {@link Optional#empty()} to match only messages without a sink.
* @param listener The listener to invoke. The listener can be unregistered again
* using {@link #unregisterListener(UUri, UUri, UListener)}.
* using {@link #unregisterListener(UUri, Optional<UUri>, UListener)}.
* @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if
* the listener could not be registered.
* @throws NullPointerException if any of the arguments are {@code null}.
*/
CompletionStage<Void> registerListener(UUri sourceFilter, UUri sinkFilter, UListener listener);
CompletionStage<Void> registerListener(UUri sourceFilter, Optional<UUri> sinkFilter, UListener listener);

/**
* Unregisters a message listener.
* Unregisters a previously {@link #registerListener(UUri, UListener) registered} message listener.
* <p>
* The listener will no longer be called for any (matching) messages after this function has
* The listener will no longer be called for any (matching) messages after this method has
* returned successfully.
* <p>
* This default implementation invokes {@link #unregisterListener(UUri, UUri, UListener)} with the
* This default implementation invokes {@link #unregisterListener(UUri, Optional<UUri>, UListener)} with the
* given source filter and a sink filter of {@link UriFactory#ANY}.
*
* @param sourceFilter The <em>source</em> address pattern that the listener had been registered for.
* @param listener The listener to unregister.
* @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if
* the listener could not be unregistered.
* @throws NullPointerException if any of the arguments are {@code null}.
*/
default CompletionStage<Void> unregisterListener(UUri sourceFilter, UListener listener) {
return unregisterListener(sourceFilter, UriFactory.ANY, listener);
return unregisterListener(sourceFilter, Optional.of(UriFactory.ANY), listener);
}

/**
* Unregisters a message listener.
* Unregisters a previously {@link #registerListener(UUri, Optional<UUri>, UListener) registered} message listener.
* <p>
* The listener will no longer be called for any (matching) messages after this function has
* The listener will no longer be called for any (matching) messages after this method has
* returned successfully.
*
* @param sourceFilter The <em>source</em> address pattern that the listener had been registered for.
* @param sinkFilter The <em>sink</em> address pattern that the listener had been registered for.
* @param listener The listener to unregister.
* @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if
* the listener could not be unregistered.
* @throws NullPointerException if any of the arguments are {@code null}.
*/
CompletionStage<Void> unregisterListener(UUri sourceFilter, UUri sinkFilter, UListener listener);
CompletionStage<Void> unregisterListener(UUri sourceFilter, Optional<UUri> sinkFilter, UListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import java.util.Objects;
import java.util.Optional;

import org.eclipse.uprotocol.communication.UStatusException;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.uri.factory.UriFactory;
import org.eclipse.uprotocol.v1.UCode;
import org.eclipse.uprotocol.v1.UUri;

/**
Expand Down Expand Up @@ -270,4 +273,48 @@ public static boolean hasWildcard(UUri uri) {
hasWildcardEntityVersion(uri) ||
hasWildcardResourceId(uri);
}

/**
* Verifies that given uProtocol URIs can be used as source and sink filter URIs
* for registering listeners.
* <p>
* This function is helpful for implementing {@link UTransport} in accordance with the
* uProtocol Transport Layer specification.
*
* @param sourceFilter The source filter URI to verify.
* @param sinkFilter The optional sink filter URI to verify.
* @throws UStatusException if the given URIs cannot be used as filter criteria.
*/
public static void verifyFilterCriteria(UUri sourceFilter, Optional<UUri> sinkFilter) {
sinkFilter.ifPresentOrElse(
filter -> {
if (isNotificationDestination(filter) && isNotificationDestination(sourceFilter)) {
throw new UStatusException(
UCode.INVALID_ARGUMENT,
"source and sink filters must not both have resource ID 0");
}
if (isRpcMethod(filter)
&& !hasWildcardResourceId(sourceFilter)
&& !isRpcResponse(sourceFilter)) {
throw new UStatusException(
UCode.INVALID_ARGUMENT,
"""
source filter must either have the wildcard resource ID or resource ID 0, \
if sink filter matches RPC method resource ID
""");
}
},
() -> {
if (!hasWildcardResourceId(sourceFilter) && !isTopic(sourceFilter)) {
throw new UStatusException(
UCode.INVALID_ARGUMENT,
"""
source filter must either have the wildcard resource ID or a resource ID from topic range, \
if sink filter is empty
""");
} // no sink filter provided
}
);
// everything else might match valid messages
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
* SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.eclipse.uprotocol.transport.LocalUriProvider;
Expand Down Expand Up @@ -54,11 +55,12 @@ class CommunicationLayerClientTestBase {
protected ArgumentCaptor<UMessage> requestMessage;

@BeforeEach
@SuppressWarnings("unchecked")
void setUpTransport() {
transport = mock(UTransport.class);
Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class)))
Mockito.lenient().when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class)))
.thenReturn(CompletableFuture.completedFuture(null));
Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class)))
Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class)))
.thenReturn(CompletableFuture.completedFuture(null));
Mockito.lenient().when(transport.send(any(UMessage.class)))
.thenReturn(CompletableFuture.completedFuture(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ private static Stream<Arguments> callOptionsAndPayloadProvider() {

@ParameterizedTest(name = "Test successful RPC, sending and receiving a payload: {index} - {arguments}")
@MethodSource("callOptionsAndPayloadProvider")
@SuppressWarnings("unchecked")
void testInvokeMethodWithPayloadSucceeds(CallOptions options, UPayload payload, UCode responseStatus) {
RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider);

var response = rpcClient.invokeMethod(METHOD_URI, payload, options);
verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture());
verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture());
verify(transport).send(requestMessage.capture());
assertEquals(payload.data(), requestMessage.getValue().getPayload());
assertMessageHasOptions(options, requestMessage.getValue());
Expand Down Expand Up @@ -137,10 +138,11 @@ void testInvokeMethodFailsForTransportError() {

@Test
@DisplayName("Test unsuccessful RPC, with service returning error")
@SuppressWarnings("unchecked")
void testInvokeMethodFailsForErroneousServiceInvocation() {
RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider);
var response = rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT);
verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture());
verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture());
verify(transport).send(requestMessage.capture());
assertEquals(UPayload.EMPTY.data(), requestMessage.getValue().getPayload());
assertMessageHasOptions(CallOptions.DEFAULT, requestMessage.getValue());
Expand Down Expand Up @@ -168,10 +170,11 @@ static Stream<Arguments> unexpectedMessageHandlerProvider() {

@ParameterizedTest(name = "Test client handles unexpected incoming messages: {index} - {arguments}")
@MethodSource("unexpectedMessageHandlerProvider")
@SuppressWarnings("unchecked")
void testHandleUnexpectedResponse(Consumer<UMessage> unexpectedMessageHandler) {
var rpcClient = new InMemoryRpcClient(transport, uriProvider);
Optional.ofNullable(unexpectedMessageHandler).ifPresent(rpcClient::setUnexpectedMessageHandler);
verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture());
verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture());

// send an arbitrary request
rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT);
Expand Down Expand Up @@ -214,8 +217,14 @@ void testHandleUnexpectedResponse(Consumer<UMessage> unexpectedMessageHandler) {
@DisplayName("Test close() unregisters the response listener from the transport")
void testCloseUnregistersResponseListenerFromTransport() {
InMemoryRpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider);
verify(transport).registerListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), responseListener.capture());
verify(transport).registerListener(
eq(UriFactory.ANY),
eq(Optional.of(TRANSPORT_SOURCE)),
responseListener.capture());
rpcClient.close();
verify(transport).unregisterListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), eq(responseListener.getValue()));
verify(transport).unregisterListener(
eq(UriFactory.ANY),
eq(Optional.of(TRANSPORT_SOURCE)),
eq(responseListener.getValue()));
}
}
Loading