Skip to content

Commit

Permalink
Move javascript racing tests to mapping processors.
Browse files Browse the repository at this point in the history
The mapper can not prevent racing and preserve performance
on its own.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 29, 2021
1 parent a938a57 commit 221c039
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -77,8 +75,6 @@
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

/**
* Tests the {@link JavaScriptMessageMapperRhino} by initializing different mapping templates and ensuring that they
Expand Down Expand Up @@ -1480,89 +1476,4 @@ public void testWithProtobufJsJavascriptOutgoingMapping() {
.isNotEmpty();
});
}

@Test
public void testConcurrentJavascriptIncomingMappingUsingGlobalVariable() {
// GIVEN:
// Incoming script sleeps for the seconds specified in the text payload,
// then reads and sets the thing name as the value of the global variable.
final String incomingScript = "var $global = $global;\n" +
"function sleep(sec){\n" +
" var start = new Date();\n" +
" var now = new Date();\n" +
" while(now - start < sec*1000) now = new Date();\n" +
"}\n" +
"function mapToDittoProtocolMsg(\n" +
" headers,\n" +
" textPayload,\n" +
" bytePayload,\n" +
" contentType\n" +
") {\n" +
" $global = textPayload;\n" +
" sleep(parseInt(textPayload, 10));\n" +
" let namespace = \"" + MAPPING_INCOMING_NAMESPACE + "\";\n" +
" let group = \"things\";\n" +
" let channel = \"twin\";\n" +
" let criterion = \"commands\";\n" +
" let action = \"modify\";\n" +
" let path = \"" + MAPPING_INCOMING_PATH + "\";\n" +
" let dittoHeaders = {};\n" +
" dittoHeaders[\"correlation-id\"] = textPayload;\n" +
" let value = textPayload;\n" +
" let name = $global;\n" +
" $global = textPayload;\n" +
" return Ditto.buildDittoProtocolMsg(\n" +
" namespace,\n" +
" name,\n" +
" group,\n" +
" channel,\n" +
" criterion,\n" +
" action,\n" +
" path,\n" +
" dittoHeaders,\n" +
" value\n" +
" );\n" +
"}";
final var mapper = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino();
mapper.configure(CONNECTION,
CONNECTIVITY_CONFIG,
JavaScriptMessageMapperFactory
.createJavaScriptMessageMapperConfigurationBuilder("plain", Collections.emptyMap())
.incomingScript(incomingScript)
.outgoingScript(MAPPING_OUTGOING_PLAIN)
.build(),
actorSystem
);

// WHEN:
// Mapper is asked to map the text messages ["5", "4", "3", "2", "1"], which make the mapper sleep for 5 to 1s.
final Map<String, String> headers = new HashMap<>();
headers.put(ExternalMessage.CONTENT_TYPE_HEADER, CONTENT_TYPE_PLAIN);
final var messages = IntStream.range(-5, 0)
.map(i -> -i)
.mapToObj(i -> ExternalMessageFactory.newExternalMessageBuilder(headers)
.withText(String.valueOf(i))
.build())
.collect(Collectors.toList());

final var results = Source.from(messages)
.mapAsync(messages.size(), m -> CompletableFuture.supplyAsync(() -> mapper.map(m)))
.mapConcat(ms -> ms)
.runWith(Sink.seq(), actorSystem)
.toCompletableFuture()
.join();

assertThat(results.size()).isEqualTo(messages.size());

// THEN:
// The results of the mapper should preserve the incoming message order "5", "4", "3", "2", "1".
// The results should not be "4", "3", "2", "1", "1" due to the global variable persisting between messages.
System.out.println(
results.stream().map(a -> a.getTopicPath().getEntityName()).collect(Collectors.joining("\n")));
for (int i = 0; i < results.size(); ++i) {
final var adaptable = results.get(i);
assertThat(adaptable.getTopicPath().getEntityName()).isEqualTo(String.valueOf(5 - i));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,7 @@ public final class InboundMappingProcessorTest {
@BeforeClass
public static void setUp() {
actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);

logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
when(logger.withMdcEntry(Mockito.any(CharSequence.class), Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(String.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(WithDittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(DittoHeaders.class))).thenReturn(logger);
when(logger.withCorrelationId(Mockito.nullable(CharSequence.class))).thenReturn(logger);

logger = TestConstants.mockThreadSafeDittoLoggingAdapter();
protocolAdapterProvider = new DittoProtocolAdapterProvider(Mockito.mock(ProtocolConfig.class));
}

Expand Down

0 comments on commit 221c039

Please sign in to comment.