From e3ad5a75243fe5455d3c2ae40623375bf6d8dd4f Mon Sep 17 00:00:00 2001 From: Kai Hudalla Date: Mon, 22 Sep 2025 16:51:54 +0200 Subject: [PATCH] Improve UTransport interface The UTransport interface's methods have been updated to be more explicit about the arguments being passed in. In particular, the methods no longer accept null values for UUri parameters. Also added a helper method for validating filter URIs that are being used for registering listeners. --- .../communication/InMemoryRpcClient.java | 7 ++- .../communication/InMemoryRpcServer.java | 4 +- .../communication/SimpleNotifier.java | 5 +- .../uprotocol/transport/UTransport.java | 33 ++++++++----- .../uprotocol/uri/validator/UriValidator.java | 47 +++++++++++++++++++ .../uprotocol/validation/ValidationUtils.java | 2 +- .../CommunicationLayerClientTestBase.java | 6 ++- .../communication/InMemoryRpcClientTest.java | 19 ++++++-- .../communication/InMemoryRpcServerTest.java | 24 +++++----- .../communication/InMemorySubscriberTest.java | 7 +-- .../communication/SimpleNotifierTest.java | 11 +++-- .../uprotocol/communication/UClientTest.java | 5 +- .../uprotocol/transport/UTransportTest.java | 11 +++-- .../validator/UAttributesValidatorTest.java | 2 +- .../uri/validator/UriValidatorTest.java | 34 ++++++++++++++ 15 files changed, 166 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java index 18d3699b..f5898363 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java +++ b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcClient.java @@ -68,7 +68,7 @@ public InMemoryRpcClient(UTransport transport, LocalUriProvider uriProvider) { getTransport().registerListener( UriFactory.ANY, - getUriProvider().getSource(), + Optional.of(getUriProvider().getSource()), mResponseHandler) .toCompletableFuture().join(); } @@ -125,7 +125,10 @@ public CompletionStage invokeMethod(UUri methodUri, UPayload requestPa */ public void close() { mRequests.clear(); - getTransport().unregisterListener(UriFactory.ANY, getUriProvider().getSource(), mResponseHandler); + getTransport().unregisterListener( + UriFactory.ANY, + Optional.of(getUriProvider().getSource()), + mResponseHandler); } private void handleResponse(UMessage message) { diff --git a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java index 3978d29a..6c341e2e 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java +++ b/src/main/java/org/eclipse/uprotocol/communication/InMemoryRpcServer.java @@ -112,7 +112,7 @@ public CompletionStage registerRequestHandler(UUri originFilter, int resou return CompletableFuture.failedFuture(new UStatusException( UCode.ALREADY_EXISTS, "Handler already registered")); } - return getTransport().registerListener(UriFactory.ANY, method, mRequestHandler) + return getTransport().registerListener(UriFactory.ANY, Optional.of(method), mRequestHandler) .whenComplete((ok, throwable) -> { if (throwable != null) { mRequestsHandlers.remove(method); @@ -140,7 +140,7 @@ public CompletionStage unregisterRequestHandler( } if (mRequestsHandlers.remove(method, handler)) { - return getTransport().unregisterListener(UriFactory.ANY, method, mRequestHandler); + return getTransport().unregisterListener(UriFactory.ANY, Optional.of(method), mRequestHandler); } else { return CompletableFuture.failedFuture(new UStatusException( UCode.NOT_FOUND, "Handler not found")); diff --git a/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java b/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java index d0d7d9b5..5890314b 100644 --- a/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java +++ b/src/main/java/org/eclipse/uprotocol/communication/SimpleNotifier.java @@ -13,6 +13,7 @@ package org.eclipse.uprotocol.communication; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -64,11 +65,11 @@ public CompletionStage notify(int resourceId, UUri destination, CallOption @Override public CompletionStage registerNotificationListener(UUri topic, UListener listener) { - return getTransport().registerListener(topic, getUriProvider().getSource(), listener); + return getTransport().registerListener(topic, Optional.of(getUriProvider().getSource()), listener); } @Override public CompletionStage unregisterNotificationListener(UUri topic, UListener listener) { - return getTransport().unregisterListener(topic, getUriProvider().getSource(), listener); + return getTransport().unregisterListener(topic, Optional.of(getUriProvider().getSource()), listener); } } diff --git a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java index e5846aab..50841d24 100644 --- a/src/main/java/org/eclipse/uprotocol/transport/UTransport.java +++ b/src/main/java/org/eclipse/uprotocol/transport/UTransport.java @@ -12,6 +12,7 @@ */ package org.eclipse.uprotocol.transport; +import java.util.Optional; import java.util.concurrent.CompletionStage; import org.eclipse.uprotocol.communication.UStatusException; @@ -38,6 +39,7 @@ public interface UTransport { * {@link UAttributes} contained in the message determine the addressing semantics. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the message could not be sent. + * @throws NullPointerException if the argument is {@code null}. */ CompletionStage send(UMessage message); @@ -49,17 +51,19 @@ public interface UTransport { * UUri * specification. *

- * This default implementation invokes {@link #registerListener(UUri, UUri, UListener)} with the + * This default implementation invokes {@link #registerListener(UUri, Optional, UListener)} with the * given source filter and a sink filter of {@link UriFactory#ANY}. * * @param sourceFilter The source address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any source. * @param listener The listener to invoke. The listener can be unregistered again - * using {@link #unregisterListener(UUri, UUri, UListener)}. + * using {@link #unregisterListener(UUri, UListener)}. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be registered. + * @throws NullPointerException if any of the arguments are {@code null}. */ default CompletionStage registerListener(UUri sourceFilter, UListener listener) { - return registerListener(sourceFilter, UriFactory.ANY, listener); + return registerListener(sourceFilter, Optional.of(UriFactory.ANY), listener); } /** @@ -71,36 +75,40 @@ default CompletionStage registerListener(UUri sourceFilter, UListener list * specification. * * @param sourceFilter The source address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any source. * @param sinkFilter The sink address pattern that messages need to match. + * Use {@link UriFactory#ANY} to match any sink. Use {@link Optional#empty()} to match only messages without a sink. * @param listener The listener to invoke. The listener can be unregistered again - * using {@link #unregisterListener(UUri, UUri, UListener)}. + * using {@link #unregisterListener(UUri, Optional, UListener)}. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be registered. + * @throws NullPointerException if any of the arguments are {@code null}. */ - CompletionStage registerListener(UUri sourceFilter, UUri sinkFilter, UListener listener); + CompletionStage registerListener(UUri sourceFilter, Optional sinkFilter, UListener listener); /** - * Unregisters a message listener. + * Unregisters a previously {@link #registerListener(UUri, UListener) registered} message listener. *

- * The listener will no longer be called for any (matching) messages after this function has + * The listener will no longer be called for any (matching) messages after this method has * returned successfully. *

- * This default implementation invokes {@link #unregisterListener(UUri, UUri, UListener)} with the + * This default implementation invokes {@link #unregisterListener(UUri, Optional, UListener)} with the * given source filter and a sink filter of {@link UriFactory#ANY}. * * @param sourceFilter The source address pattern that the listener had been registered for. * @param listener The listener to unregister. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be unregistered. + * @throws NullPointerException if any of the arguments are {@code null}. */ default CompletionStage unregisterListener(UUri sourceFilter, UListener listener) { - return unregisterListener(sourceFilter, UriFactory.ANY, listener); + return unregisterListener(sourceFilter, Optional.of(UriFactory.ANY), listener); } /** - * Unregisters a message listener. + * Unregisters a previously {@link #registerListener(UUri, Optional, UListener) registered} message listener. *

- * The listener will no longer be called for any (matching) messages after this function has + * The listener will no longer be called for any (matching) messages after this method has * returned successfully. * * @param sourceFilter The source address pattern that the listener had been registered for. @@ -108,6 +116,7 @@ default CompletionStage unregisterListener(UUri sourceFilter, UListener li * @param listener The listener to unregister. * @return The outcome of the operation. The stage will be completed with a {@link UStatusException} if * the listener could not be unregistered. + * @throws NullPointerException if any of the arguments are {@code null}. */ - CompletionStage unregisterListener(UUri sourceFilter, UUri sinkFilter, UListener listener); + CompletionStage unregisterListener(UUri sourceFilter, Optional sinkFilter, UListener listener); } diff --git a/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java b/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java index 1b766761..5f0af76f 100644 --- a/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java +++ b/src/main/java/org/eclipse/uprotocol/uri/validator/UriValidator.java @@ -17,7 +17,10 @@ import java.util.Objects; import java.util.Optional; +import org.eclipse.uprotocol.communication.UStatusException; +import org.eclipse.uprotocol.transport.UTransport; import org.eclipse.uprotocol.uri.factory.UriFactory; +import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UUri; /** @@ -270,4 +273,48 @@ public static boolean hasWildcard(UUri uri) { hasWildcardEntityVersion(uri) || hasWildcardResourceId(uri); } + + /** + * Verifies that given uProtocol URIs can be used as source and sink filter URIs + * for registering listeners. + *

+ * This function is helpful for implementing {@link UTransport} in accordance with the + * uProtocol Transport Layer specification. + * + * @param sourceFilter The source filter URI to verify. + * @param sinkFilter The optional sink filter URI to verify. + * @throws UStatusException if the given URIs cannot be used as filter criteria. + */ + public static void verifyFilterCriteria(UUri sourceFilter, Optional sinkFilter) { + sinkFilter.ifPresentOrElse( + filter -> { + if (isNotificationDestination(filter) && isNotificationDestination(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + "source and sink filters must not both have resource ID 0"); + } + if (isRpcMethod(filter) + && !hasWildcardResourceId(sourceFilter) + && !isRpcResponse(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + """ + source filter must either have the wildcard resource ID or resource ID 0, \ + if sink filter matches RPC method resource ID + """); + } + }, + () -> { + if (!hasWildcardResourceId(sourceFilter) && !isTopic(sourceFilter)) { + throw new UStatusException( + UCode.INVALID_ARGUMENT, + """ + source filter must either have the wildcard resource ID or a resource ID from topic range, \ + if sink filter is empty + """); + } // no sink filter provided + } + ); + // everything else might match valid messages + } } diff --git a/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java b/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java index 0c84bb57..51e31896 100644 --- a/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java +++ b/src/main/java/org/eclipse/uprotocol/validation/ValidationUtils.java @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + * SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. diff --git a/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java b/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java index 848538df..4fc17638 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java +++ b/src/test/java/org/eclipse/uprotocol/communication/CommunicationLayerClientTestBase.java @@ -15,6 +15,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.eclipse.uprotocol.transport.LocalUriProvider; @@ -54,11 +55,12 @@ class CommunicationLayerClientTestBase { protected ArgumentCaptor requestMessage; @BeforeEach + @SuppressWarnings("unchecked") void setUpTransport() { transport = mock(UTransport.class); - Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.send(any(UMessage.class))) .thenReturn(CompletableFuture.completedFuture(null)); diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java index 4bd562c3..bed348ef 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcClientTest.java @@ -77,11 +77,12 @@ private static Stream callOptionsAndPayloadProvider() { @ParameterizedTest(name = "Test successful RPC, sending and receiving a payload: {index} - {arguments}") @MethodSource("callOptionsAndPayloadProvider") + @SuppressWarnings("unchecked") void testInvokeMethodWithPayloadSucceeds(CallOptions options, UPayload payload, UCode responseStatus) { RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); var response = rpcClient.invokeMethod(METHOD_URI, payload, options); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); verify(transport).send(requestMessage.capture()); assertEquals(payload.data(), requestMessage.getValue().getPayload()); assertMessageHasOptions(options, requestMessage.getValue()); @@ -137,10 +138,11 @@ void testInvokeMethodFailsForTransportError() { @Test @DisplayName("Test unsuccessful RPC, with service returning error") + @SuppressWarnings("unchecked") void testInvokeMethodFailsForErroneousServiceInvocation() { RpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); var response = rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); verify(transport).send(requestMessage.capture()); assertEquals(UPayload.EMPTY.data(), requestMessage.getValue().getPayload()); assertMessageHasOptions(CallOptions.DEFAULT, requestMessage.getValue()); @@ -168,10 +170,11 @@ static Stream unexpectedMessageHandlerProvider() { @ParameterizedTest(name = "Test client handles unexpected incoming messages: {index} - {arguments}") @MethodSource("unexpectedMessageHandlerProvider") + @SuppressWarnings("unchecked") void testHandleUnexpectedResponse(Consumer unexpectedMessageHandler) { var rpcClient = new InMemoryRpcClient(transport, uriProvider); Optional.ofNullable(unexpectedMessageHandler).ifPresent(rpcClient::setUnexpectedMessageHandler); - verify(transport).registerListener(any(UUri.class), any(UUri.class), responseListener.capture()); + verify(transport).registerListener(any(UUri.class), any(Optional.class), responseListener.capture()); // send an arbitrary request rpcClient.invokeMethod(METHOD_URI, UPayload.EMPTY, CallOptions.DEFAULT); @@ -214,8 +217,14 @@ void testHandleUnexpectedResponse(Consumer unexpectedMessageHandler) { @DisplayName("Test close() unregisters the response listener from the transport") void testCloseUnregistersResponseListenerFromTransport() { InMemoryRpcClient rpcClient = new InMemoryRpcClient(transport, uriProvider); - verify(transport).registerListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), responseListener.capture()); + verify(transport).registerListener( + eq(UriFactory.ANY), + eq(Optional.of(TRANSPORT_SOURCE)), + responseListener.capture()); rpcClient.close(); - verify(transport).unregisterListener(eq(UriFactory.ANY), eq(TRANSPORT_SOURCE), eq(responseListener.getValue())); + verify(transport).unregisterListener( + eq(UriFactory.ANY), + eq(Optional.of(TRANSPORT_SOURCE)), + eq(responseListener.getValue())); } } diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java index 2798704d..e453a835 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemoryRpcServerTest.java @@ -103,7 +103,7 @@ void testRegisterRequestListenerSucceeds() { .join(); verify(transport).registerListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); server.unregisterRequestHandler( @@ -114,7 +114,7 @@ void testRegisterRequestListenerSucceeds() { .join(); verify(transport).unregisterListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); } @@ -132,7 +132,7 @@ void testRegisteringTwiceTheSameRequestHandler() { .join(); verify(transport, times(1)).registerListener( eq(originFilter), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); var exception = assertThrows(CompletionException.class, () -> server.registerRequestHandler( @@ -146,6 +146,7 @@ void testRegisteringTwiceTheSameRequestHandler() { @Test @DisplayName("Test unregistering a request handler that wasn't registered already") + @SuppressWarnings("unchecked") void testUnregisterRequestHandlerFailsForUnknownHandler() { final RpcServer server = new InMemoryRpcServer(transport, uriProvider); @@ -156,14 +157,15 @@ void testUnregisterRequestHandlerFailsForUnknownHandler() { assertEquals(UCode.NOT_FOUND, ((UStatusException) exception.getCause()).getCode()); verify(transport, never()).unregisterListener( any(UUri.class), - any(UUri.class), + any(Optional.class), any(UListener.class)); } @Test @DisplayName("Test registering a request handler with unavailable transport fails") + @SuppressWarnings("unchecked") void testRegisteringRequestListenerFailsIfTransportIsUnavailable() { - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.failedFuture(new UStatusException(UCode.UNAVAILABLE, "unavailable"))); RpcServer server = new InMemoryRpcServer(transport, uriProvider); @@ -174,7 +176,7 @@ void testRegisteringRequestListenerFailsIfTransportIsUnavailable() { assertEquals(UCode.UNAVAILABLE, ((UStatusException) exception.getCause()).getCode()); verify(transport, times(1)).registerListener( eq(UriFactory.ANY), - eq(METHOD_URI), + eq(Optional.of(METHOD_URI)), any(UListener.class)); } @@ -203,7 +205,7 @@ void testHandleRequestHandlesException(Exception thrownException, UCode expected server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); final var request = UMessageBuilder.request(uriProvider.getSource(), METHOD_URI, 5000).build(); requestListener.getValue().onReceive(request); @@ -260,7 +262,7 @@ void testRequestHandlerReportsSendError( server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(request); verify(handler).handleRequest(request); @@ -290,7 +292,7 @@ void testRequestHandlerIgnoresRequestsWithoutHandler(Consumer unexpect server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); server.unregisterRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); @@ -326,7 +328,7 @@ void testHandleRequestIgnoresNonRequestMessages() { server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(invalidNotification); verify(handler, never()).handleRequest(any(UMessage.class)); @@ -349,7 +351,7 @@ void testHandleRequestSucceedsWithPayload() { server.registerRequestHandler(UriFactory.ANY, METHOD_URI.getResourceId(), handler) .toCompletableFuture().join(); final ArgumentCaptor requestListener = ArgumentCaptor.forClass(UListener.class); - verify(transport).registerListener(eq(UriFactory.ANY), eq(METHOD_URI), requestListener.capture()); + verify(transport).registerListener(eq(UriFactory.ANY), eq(Optional.of(METHOD_URI)), requestListener.capture()); requestListener.getValue().onReceive(request); verify(handler).handleRequest(request); diff --git a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java b/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java index 5fc6ee97..f7efc365 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/InMemorySubscriberTest.java @@ -102,15 +102,16 @@ class InMemorySubscriberTest { private UListener listener; @BeforeEach + @SuppressWarnings("unchecked") void setup() { uriProvider = StaticUriProvider.of(SOURCE); transport = mock(UTransport.class); - Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.registerListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); Mockito.lenient().when(transport.unregisterListener(any(UUri.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); @@ -186,7 +187,7 @@ void testConstructorForTransportAndUriProvider() { SUBSCRIPTION_NOTIFICATION_TOPIC_URI.getAuthorityName()); verify(transport).registerListener( eq(SUBSCRIPTION_NOTIFICATION_TOPIC_URI), - eq(SOURCE), + eq(Optional.of(SOURCE)), any(UListener.class)); } diff --git a/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java b/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java index cbb4ef3f..7239f911 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/SimpleNotifierTest.java @@ -15,10 +15,12 @@ import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -103,20 +105,21 @@ void testRegisterListener() { notifier.registerNotificationListener(TOPIC_URI, listener).toCompletableFuture().join(); verify(transport).registerListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); notifier.unregisterNotificationListener(TOPIC_URI, listener).toCompletableFuture().join(); verify(transport).unregisterListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); } @Test @DisplayName("Test unregistering a listener that was not registered") + @SuppressWarnings("unchecked") void testUnregisterListenerNotRegistered() { final var listener = mock(UListener.class); - when(transport.unregisterListener(TOPIC_URI, TRANSPORT_SOURCE, listener)) + when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.failedFuture( new UStatusException(UCode.NOT_FOUND, "no such listener"))); final var exception = assertThrows(CompletionException.class, () -> { @@ -124,7 +127,7 @@ void testUnregisterListenerNotRegistered() { }); verify(transport).unregisterListener( TOPIC_URI, - TRANSPORT_SOURCE, + Optional.of(TRANSPORT_SOURCE), listener); assertEquals(UCode.NOT_FOUND, ((UStatusException) exception.getCause()).getCode()); } diff --git a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java index 1ed752f7..fd6a2c80 100644 --- a/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java +++ b/src/test/java/org/eclipse/uprotocol/communication/UClientTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.eclipse.uprotocol.transport.StaticUriProvider; @@ -65,11 +66,11 @@ void setUp() { @Test void testFactoryMethod() { var transport = mock(UTransport.class); - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); var uriProvider = StaticUriProvider.of(TRANSPORT_SOURCE); UClient.create(transport, uriProvider); - verify(transport).registerListener(any(UUri.class), eq(TRANSPORT_SOURCE), any(UListener.class)); + verify(transport).registerListener(any(UUri.class), eq(Optional.of(TRANSPORT_SOURCE)), any(UListener.class)); } @Test diff --git a/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java b/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java index 32c1a705..5f661240 100644 --- a/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java +++ b/src/test/java/org/eclipse/uprotocol/transport/UTransportTest.java @@ -12,6 +12,7 @@ */ package org.eclipse.uprotocol.transport; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.mockito.ArgumentMatchers.any; @@ -53,20 +54,22 @@ void setUp() { @Test @DisplayName("Test default implementation of registerListener") + @SuppressWarnings("unchecked") void testRegisterListener() { - when(transport.registerListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.registerListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); transport.registerListener(SOURCE_FILTER, listener).toCompletableFuture().join(); - verify(transport).registerListener(eq(SOURCE_FILTER), eq(UriFactory.ANY), eq(listener)); + verify(transport).registerListener(eq(SOURCE_FILTER), eq(Optional.of(UriFactory.ANY)), eq(listener)); } @Test @DisplayName("Test happy path unregister listener") + @SuppressWarnings("unchecked") void testUnregisterListener() { - when(transport.unregisterListener(any(UUri.class), any(UUri.class), any(UListener.class))) + when(transport.unregisterListener(any(UUri.class), any(Optional.class), any(UListener.class))) .thenReturn(CompletableFuture.completedFuture(null)); transport.unregisterListener(SOURCE_FILTER, listener).toCompletableFuture().join(); - verify(transport).unregisterListener(eq(SOURCE_FILTER), eq(UriFactory.ANY), eq(listener)); + verify(transport).unregisterListener(eq(SOURCE_FILTER), eq(Optional.of(UriFactory.ANY)), eq(listener)); } } diff --git a/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java b/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java index 129510a9..24554e5d 100644 --- a/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java +++ b/src/test/java/org/eclipse/uprotocol/transport/validator/UAttributesValidatorTest.java @@ -110,7 +110,7 @@ void testGetValidatorReturnsMatchingValidator( 10000, true 29999, true 30000, true - 30001, false + 40000, false """) void testIsExpired(int ttl, boolean expectedIsExpired) { var now = Instant.now(); diff --git a/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java b/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java index f2e37e6f..6735f244 100644 --- a/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java +++ b/src/test/java/org/eclipse/uprotocol/uri/validator/UriValidatorTest.java @@ -13,17 +13,21 @@ package org.eclipse.uprotocol.uri.validator; +import org.eclipse.uprotocol.communication.UStatusException; import org.eclipse.uprotocol.uri.serializer.UriSerializer; +import org.eclipse.uprotocol.v1.UCode; import org.eclipse.uprotocol.v1.UUri; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Optional; class UriValidatorTest { @@ -165,4 +169,34 @@ void testHasWildcard(String uri, boolean shouldSucceed) { assertFalse(UriValidator.hasWildcard(uuri)); } } + + @ParameterizedTest(name = "Test verifyFilterCriteria: {index} {arguments}") + @CsvSource(useHeadersInDisplayName = true, textBlock = """ + source, sink, should fail + //vehicle1/AA/1/FFFF, //vehicle2/BB/1/FFFF, false + //vehicle1/AA/1/9000, //vehicle2/BB/1/0, false + //vehicle1/AA/1/0, //vehicle2/BB/1/1, false + # source and sink both have resource ID 0 + //vehicle1/AA/1/0, //vehicle2/BB/1/0, true + //vehicle1/AA/1/FFFF, //vehicle2/BB/1/1A, false + //vehicle1/AA/1/0, //vehicle2/BB/1/1A, false + # sink is RPC but source has invalid resource ID + //vehicle1/AA/1/CC, //vehicle2/BB/1/1A, true + //vehicle1/AA/1/9000, , false + //vehicle1/AA/1/FFFF, , false + # sink is empty but source has non-topic resource ID + //vehicle1/AA/1/CC, , true + """) + void testVerifyFilterCriteriaFails(String source, String sink, boolean shouldFail) { + var sourceFilter = UriSerializer.deserialize(source); + Optional sinkFilter = sink != null ? Optional.of(UriSerializer.deserialize(sink)) : Optional.empty(); + if (shouldFail) { + UStatusException exception = assertThrows( + UStatusException.class, + () -> UriValidator.verifyFilterCriteria(sourceFilter, sinkFilter)); + assertEquals(UCode.INVALID_ARGUMENT, exception.getCode()); + } else { + assertDoesNotThrow(() -> UriValidator.verifyFilterCriteria(sourceFilter, sinkFilter)); + } + } }