diff --git a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java index 18d3699b..f5898363 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java +++ b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java @@ -68,7 +68,7 @@ public InMemoryRpcClient(UTransport transport, LocalUriProvider uriProvider) { getTransport().registerListener( UriFactory.ANY, - getUriProvider().getSource(), + Optional.of(getUriProvider().getSource()), mResponseHandler) .toCompletableFuture().join(); } @@ -125,7 +125,10 @@ public CompletionStage invokeMethod(UUri methodUri, UPayload requestPa */ public void close() { mRequests.clear(); - getTransport().unregisterListener(UriFactory.ANY, getUriProvider().getSource(), mResponseHandler); + getTransport().unregisterListener( + UriFactory.ANY, + Optional.of(getUriProvider().getSource()), + mResponseHandler); } private void handleResponse(UMessage message) { diff --git a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java index 3978d29a..6c341e2e 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java +++ b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java @@ -112,7 +112,7 @@ public CompletionStage registerRequestHandler(UUri originFilter, int resou return CompletableFuture.failedFuture(new UStatusException( UCode.ALREADY_EXISTS, "Handler already registered")); } - return getTransport().registerListener(UriFactory.ANY, method, mRequestHandler) + return getTransport().registerListener(UriFactory.ANY, Optional.of(method), mRequestHandler) .whenComplete((ok, throwable) -> { if (throwable != null) { mRequestsHandlers.remove(method); @@ -140,7 +140,7 @@ public CompletionStage unregisterRequestHandler( } if (mRequestsHandlers.remove(method, handler)) { - return getTransport().unregisterListener(UriFactory.ANY, method, mRequestHandler); + return getTransport().unregisterListener(UriFactory.ANY, Optional.of(method), mRequestHandler); } else { return CompletableFuture.failedFuture(new UStatusException( UCode.NOT_FOUND, "Handler not found")); diff --git a/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java b/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java index d0d7d9b5..5890314b 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java +++ b/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java @@ -13,6 +13,7 @@ package org.eclipse.uprotocol.communication; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -64,11 +65,11 @@ public CompletionStage notify(int resourceId, UUri destination, CallOption @Override public CompletionStage registerNotificationListener(UUri topic, UListener listener) { - return getTransport().registerListener(topic, getUriProvider().getSource(), listener); + return getTransport().registerListener(topic, Optional.of(getUriProvider().getSource()), listener); } @Override public CompletionStage unregisterNotificationListener(UUri topic, UListener listener) { - return getTransport().unregisterListener(topic, getUriProvider().getSource(), listener); + return getTransport().unregisterListener(topic, Optional.of(getUriProvider().getSource()), listener); } } diff --git a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java index e5846aab..50841d24 100644 --- a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java +++ b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java @@ -12,6 +12,7 @@ */ package org.eclipse.uprotocol.transport; +import java.util.Optional; import java.util.concurrent.CompletionStage; import org.eclipse.uprotocol.communication.UStatusException; @@ -38,6 +39,7 @@ public interface UTransport { * {@link UAttributes} contained in the message determine the addressing semantics. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the message could not be sent. + * @throws NullPointerException if the argument is {@code null}. */ CompletionStage send(UMessage message); @@ -49,17 +51,19 @@ public interface UTransport { * UUri * specification. *

- * This default implementation invokes {@link #registerListener(UUri, UUri, UListener)} with the + * This default implementation invokes {@link #registerListener(UUri, Optional, UListener)} with the * given source filter and a sink filter of {@link UriFactory#ANY}. * * @param sourceFilter The source address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any source. * @param listener The listener to invoke. The listener can be unregistered again - * using {@link #unregisterListener(UUri, UUri, UListener)}. + * using {@link #unregisterListener(UUri, UListener)}. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be registered. + * @throws NullPointerException if any of the arguments are {@code null}. */ default CompletionStage registerListener(UUri sourceFilter, UListener listener) { - return registerListener(sourceFilter, UriFactory.ANY, listener); + return registerListener(sourceFilter, Optional.of(UriFactory.ANY), listener); } /** @@ -71,36 +75,40 @@ default CompletionStage registerListener(UUri sourceFilter, UListener list * specification. * * @param sourceFilter The source address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any source. * @param sinkFilter The sink address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any sink. Use {@link Optional#empty()} to match only messages without a sink. * @param listener The listener to invoke. The listener can be unregistered again - * using {@link #unregisterListener(UUri, UUri, UListener)}. + * using {@link #unregisterListener(UUri, Optional, UListener)}. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be registered. + * @throws NullPointerException if any of the arguments are {@code null}. */ - CompletionStage registerListener(UUri sourceFilter, UUri sinkFilter, UListener listener); + CompletionStage registerListener(UUri sourceFilter, Optional sinkFilter, UListener listener); /** - * Unregisters a message listener. + * Unregisters a previously {@link #registerListener(UUri, UListener) registered} message listener. *

- * The listener will no longer be called for any (matching) messages after this function has + * The listener will no longer be called for any (matching) messages after this method has * returned successfully. *

- * This default implementation invokes {@link #unregisterListener(UUri, UUri, UListener)} with the + * This default implementation invokes {@link #unregisterListener(UUri, Optional, UListener)} with the * given source filter and a sink filter of {@link UriFactory#ANY}. * * @param sourceFilter The source address pattern that the listener had been registered for. * @param listener The listener to unregister. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be unregistered. + * @throws NullPointerException if any of the arguments are {@code null}. */ default CompletionStage unregisterListener(UUri sourceFilter, UListener listener) { - return unregisterListener(sourceFilter, UriFactory.ANY, listener); + return unregisterListener(sourceFilter, Optional.of(UriFactory.ANY), listener); } /** - * Unregisters a message listener. + * Unregisters a previously {@link #registerListener(UUri, Optional, UListener) registered} message listener. *

- * The listener will no longer be called for any (matching) messages after this function has + * The listener will no longer be called for any (matching) messages after this method has * returned successfully. * * @param sourceFilter The source address pattern that the listener had been registered for. @@ -108,6 +116,7 @@ default CompletionStage unregisterListener(UUri sourceFilter, UListener li * @param listener The listener to unregister. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be unregistered. + * @throws NullPointerException if any of the arguments are {@code null}. */ - CompletionStage unregisterListener(UUri sourceFilter, UUri sinkFilter, UListener listener); + CompletionStage unregisterListener(UUri sourceFilter, Optional sinkFilter, UListener listener); } diff --git a/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java b/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java index 1b766761..5f0af76f 100644 --- a/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java +++ b/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java @@ -17,7 +17,10 @@ import java.util.Objects; import java.util.Optional; +import org.eclipse.uprotocol.communication.UStatusException; +import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.uri.factory.UriFactory; +import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UUri; /** @@ -270,4 +273,48 @@ public static boolean hasWildcard(UUri uri) { hasWildcardEntityVersion(uri) || hasWildcardResourceId(uri); } + + /** + * Verifies that given uProtocol URIs can be used as source and sink filter URIs + * for registering listeners. + *

+ * This function is helpful for implementing {@link UTransport} in accordance with the + * uProtocol Transport Layer specification. + * + * @param sourceFilter The source filter URI to verify. + * @param sinkFilter The optional sink filter URI to verify. + * @throws UStatusException if the given URIs cannot be used as filter criteria. + */ + public static void verifyFilterCriteria(UUri sourceFilter, Optional sinkFilter) { + sinkFilter.ifPresentOrElse( + filter -> { + if (isNotificationDestination(filter) && isNotificationDestination(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + "source and sink filters must not both have resource ID 0"); + } + if (isRpcMethod(filter) + && !hasWildcardResourceId(sourceFilter) + && !isRpcResponse(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + """ + source filter must either have the wildcard resource ID or resource ID 0, \ + if sink filter matches RPC method resource ID + """); + } + }, + () -> { + if (!hasWildcardResourceId(sourceFilter) && !isTopic(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + """ + source filter must either have the wildcard resource ID or a resource ID from topic range, \ + if sink filter is empty + """); + } // no sink filter provided + } + ); + // everything else might match valid messages + } } diff --git a/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java b/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java index 0c84bb57..51e31896 100644 --- a/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java +++ b/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + * SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java b/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java index 848538df..4fc17638 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java +++ b/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java @@ -15,6 +15,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.eclipse.uprotocol.transport.LocalUriProvider; @@ -54,11 +55,12 @@ class CommunicationLayerClientTestBase { protected ArgumentCaptor requestMessage; @BeforeEach + @SuppressWarnings("unchecked") void setUpTransport() { transport = mock(UTransport.class); - Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.send(any(UMessage.class))) .thenReturn(CompletableFuture.completedFuture(null)); diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java index 4bd562c3..bed348ef 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java @@ -77,11 +77,12 @@ private static Stream callOptionsAndPayloadProvider() { @ParameterizedTest(name = "Test successful RPC, sending and receiving a payload: {index} - {arguments}") @MethodSource("callOptionsAndPayloadProvider") + @SuppressWarnings("unchecked") void testInvokeMethodWithPayloadSucceeds(CallOptions options, UPayload payload, UCode responseStatus) { RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); var response = rpcClient.invokeMethod(METHOD_URI, payload, options); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); verify(transport).send(requestMessage.capture()); assertEquals(payload.data(), requestMessage.getValue().getPayload()); assertMessageHasOptions(options, requestMessage.getValue()); @@ -137,10 +138,11 @@ void testInvokeMethodFailsForTransportError() { @Test @DisplayName("Test unsuccessful RPC, with service returning error") + @SuppressWarnings("unchecked") void testInvokeMethodFailsForErroneousServiceInvocation() { RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); var response = rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); verify(transport).send(requestMessage.capture()); assertEquals(UPayload.EMPTY.data(), requestMessage.getValue().getPayload()); assertMessageHasOptions(CallOptions.DEFAULT, requestMessage.getValue()); @@ -168,10 +170,11 @@ static Stream unexpectedMessageHandlerProvider() { @ParameterizedTest(name = "Test client handles unexpected incoming messages: {index} - {arguments}") @MethodSource("unexpectedMessageHandlerProvider") + @SuppressWarnings("unchecked") void testHandleUnexpectedResponse(Consumer unexpectedMessageHandler) { var rpcClient = new InMemoryRpcClient(transport, uriProvider); Optional.ofNullable(unexpectedMessageHandler).ifPresent(rpcClient::setUnexpectedMessageHandler); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); // send an arbitrary request rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT); @@ -214,8 +217,14 @@ void testHandleUnexpectedResponse(Consumer unexpectedMessageHandler) { @DisplayName("Test close() unregisters the response listener from the transport") void testCloseUnregistersResponseListenerFromTransport() { InMemoryRpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); - verify(transport).registerListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), responseListener.capture()); + verify(transport).registerListener( + eq(UriFactory.ANY), + eq(Optional.of(TRANSPORT_SOURCE)), + responseListener.capture()); rpcClient.close(); - verify(transport).unregisterListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), eq(responseListener.getValue())); + verify(transport).unregisterListener( + eq(UriFactory.ANY), + eq(Optional.of(TRANSPORT_SOURCE)), + eq(responseListener.getValue())); } } diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java index 2798704d..e453a835 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java @@ -103,7 +103,7 @@ void testRegisterRequestListenerSucceeds() { .join(); verify(transport).registerListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); server.unregisterRequestHandler( @@ -114,7 +114,7 @@ void testRegisterRequestListenerSucceeds() { .join(); verify(transport).unregisterListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); } @@ -132,7 +132,7 @@ void testRegisteringTwiceTheSameRequestHandler() { .join(); verify(transport, times(1)).registerListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); var exception = assertThrows(CompletionException.class, () -> server.registerRequestHandler( @@ -146,6 +146,7 @@ void testRegisteringTwiceTheSameRequestHandler() { @Test @DisplayName("Test unregistering a request handler that wasn't registered already") + @SuppressWarnings("unchecked") void testUnregisterRequestHandlerFailsForUnknownHandler() { final RpcServer server = new InMemoryRpcServer(transport, uriProvider); @@ -156,14 +157,15 @@ void testUnregisterRequestHandlerFailsForUnknownHandler() { assertEquals(UCode.NOT_FOUND, ((UStatusException) exception.getCause()).getCode()); verify(transport, never()).unregisterListener( any(UUri.class), - any(UUri.class), + any(Optional.class), any(UListener.class)); } @Test @DisplayName("Test registering a request handler with unavailable transport fails") + @SuppressWarnings("unchecked") void testRegisteringRequestListenerFailsIfTransportIsUnavailable() { - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.failedFuture(new UStatusException(UCode.UNAVAILABLE, "unavailable"))); RpcServer server = new InMemoryRpcServer(transport, uriProvider); @@ -174,7 +176,7 @@ void testRegisteringRequestListenerFailsIfTransportIsUnavailable() { assertEquals(UCode.UNAVAILABLE, ((UStatusException) exception.getCause()).getCode()); verify(transport, times(1)).registerListener( eq(UriFactory.ANY), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); } @@ -203,7 +205,7 @@ void testHandleRequestHandlesException(Exception thrownException, UCode expected server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); final var request = UMessageBuilder.request(uriProvider.getSource(), METHOD_URI, 5000).build(); requestListener.getValue().onReceive(request); @@ -260,7 +262,7 @@ void testRequestHandlerReportsSendError( server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(request); verify(handler).handleRequest(request); @@ -290,7 +292,7 @@ void testRequestHandlerIgnoresRequestsWithoutHandler(Consumer unexpect server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); server.unregisterRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); @@ -326,7 +328,7 @@ void testHandleRequestIgnoresNonRequestMessages() { server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(invalidNotification); verify(handler, never()).handleRequest(any(UMessage.class)); @@ -349,7 +351,7 @@ void testHandleRequestSucceedsWithPayload() { server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(request); verify(handler).handleRequest(request); diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java index 5fc6ee97..f7efc365 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java @@ -102,15 +102,16 @@ class InMemorySubscriberTest { private UListener listener; @BeforeEach + @SuppressWarnings("unchecked") void setup() { uriProvider = StaticUriProvider.of(SOURCE); transport = mock(UTransport.class); - Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); @@ -186,7 +187,7 @@ void testConstructorForTransportAndUriProvider() { SUBSCRIPTION_NOTIFICATION_TOPIC_URI.getAuthorityName()); verify(transport).registerListener( eq(SUBSCRIPTION_NOTIFICATION_TOPIC_URI), - eq(SOURCE), + eq(Optional.of(SOURCE)), any(UListener.class)); } diff --git a/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java b/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java index cbb4ef3f..7239f911 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java @@ -15,10 +15,12 @@ import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -103,20 +105,21 @@ void testRegisterListener() { notifier.registerNotificationListener(TOPIC_URI, listener).toCompletableFuture().join(); verify(transport).registerListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); notifier.unregisterNotificationListener(TOPIC_URI, listener).toCompletableFuture().join(); verify(transport).unregisterListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); } @Test @DisplayName("Test unregistering a listener that was not registered") + @SuppressWarnings("unchecked") void testUnregisterListenerNotRegistered() { final var listener = mock(UListener.class); - when(transport.unregisterListener(TOPIC_URI, TRANSPORT_SOURCE, listener)) + when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.failedFuture( new UStatusException(UCode.NOT_FOUND, "no such listener"))); final var exception = assertThrows(CompletionException.class, () -> { @@ -124,7 +127,7 @@ void testUnregisterListenerNotRegistered() { }); verify(transport).unregisterListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); assertEquals(UCode.NOT_FOUND, ((UStatusException) exception.getCause()).getCode()); } diff --git a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java index 1ed752f7..fd6a2c80 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.eclipse.uprotocol.transport.StaticUriProvider; @@ -65,11 +66,11 @@ void setUp() { @Test void testFactoryMethod() { var transport = mock(UTransport.class); - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); var uriProvider = StaticUriProvider.of(TRANSPORT_SOURCE); UClient.create(transport, uriProvider); - verify(transport).registerListener(any(UUri.class), eq(TRANSPORT_SOURCE), any(UListener.class)); + verify(transport).registerListener(any(UUri.class), eq(Optional.of(TRANSPORT_SOURCE)), any(UListener.class)); } @Test diff --git a/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java b/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java index 32c1a705..5f661240 100644 --- a/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java +++ b/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java @@ -12,6 +12,7 @@ */ package org.eclipse.uprotocol.transport; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.mockito.ArgumentMatchers.any; @@ -53,20 +54,22 @@ void setUp() { @Test @DisplayName("Test default implementation of registerListener") + @SuppressWarnings("unchecked") void testRegisterListener() { - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); transport.registerListener(SOURCE_FILTER, listener).toCompletableFuture().join(); - verify(transport).registerListener(eq(SOURCE_FILTER), eq(UriFactory.ANY), eq(listener)); + verify(transport).registerListener(eq(SOURCE_FILTER), eq(Optional.of(UriFactory.ANY)), eq(listener)); } @Test @DisplayName("Test happy path unregister listener") + @SuppressWarnings("unchecked") void testUnregisterListener() { - when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); transport.unregisterListener(SOURCE_FILTER, listener).toCompletableFuture().join(); - verify(transport).unregisterListener(eq(SOURCE_FILTER), eq(UriFactory.ANY), eq(listener)); + verify(transport).unregisterListener(eq(SOURCE_FILTER), eq(Optional.of(UriFactory.ANY)), eq(listener)); } } diff --git a/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java b/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java index 129510a9..24554e5d 100644 --- a/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java +++ b/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java @@ -110,7 +110,7 @@ void testGetValidatorReturnsMatchingValidator( 10000, true 29999, true 30000, true - 30001, false + 40000, false """) void testIsExpired(int ttl, boolean expectedIsExpired) { var now = Instant.now(); diff --git a/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java b/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java index f2e37e6f..6735f244 100644 --- a/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java +++ b/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java @@ -13,17 +13,21 @@ package org.eclipse.uprotocol.uri.validator; +import org.eclipse.uprotocol.communication.UStatusException; import org.eclipse.uprotocol.uri.serializer.UriSerializer; +import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UUri; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Optional; class UriValidatorTest { @@ -165,4 +169,34 @@ void testHasWildcard(String uri, boolean shouldSucceed) { assertFalse(UriValidator.hasWildcard(uuri)); } } + + @ParameterizedTest(name = "Test verifyFilterCriteria: {index} {arguments}") + @CsvSource(useHeadersInDisplayName = true, textBlock = """ + source, sink, should fail + //vehicle1/AA/1/FFFF, //vehicle2/BB/1/FFFF, false + //vehicle1/AA/1/9000, //vehicle2/BB/1/0, false + //vehicle1/AA/1/0, //vehicle2/BB/1/1, false + # source and sink both have resource ID 0 + //vehicle1/AA/1/0, //vehicle2/BB/1/0, true + //vehicle1/AA/1/FFFF, //vehicle2/BB/1/1A, false + //vehicle1/AA/1/0, //vehicle2/BB/1/1A, false + # sink is RPC but source has invalid resource ID + //vehicle1/AA/1/CC, //vehicle2/BB/1/1A, true + //vehicle1/AA/1/9000, , false + //vehicle1/AA/1/FFFF, , false + # sink is empty but source has non-topic resource ID + //vehicle1/AA/1/CC, , true + """) + void testVerifyFilterCriteriaFails(String source, String sink, boolean shouldFail) { + var sourceFilter = UriSerializer.deserialize(source); + Optional sinkFilter = sink != null ? Optional.of(UriSerializer.deserialize(sink)) : Optional.empty(); + if (shouldFail) { + UStatusException exception = assertThrows( + UStatusException.class, + () -> UriValidator.verifyFilterCriteria(sourceFilter, sinkFilter)); + assertEquals(UCode.INVALID_ARGUMENT, exception.getCode()); + } else { + assertDoesNotThrow(() -> UriValidator.verifyFilterCriteria(sourceFilter, sinkFilter)); + } + } }