From 28140c944411f0bd1d3ea7dbe891c0a7c5582a6b Mon Sep 17 00:00:00 2001 From: "Arora, Kaushal" Date: Wed, 28 May 2025 14:53:10 +0530 Subject: [PATCH 1/2] Fix to use key from event for retry records instead of vehicle ID --- .../base/processors/DeviceMessagingAgentPreProcessor.java | 2 +- .../eclipse/ecsp/stream/dma/dao/key/RetryRecordKey.java | 8 ++++---- .../org/eclipse/ecsp/stream/dma/handler/RetryHandler.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/eclipse/ecsp/analytics/stream/base/processors/DeviceMessagingAgentPreProcessor.java b/src/main/java/org/eclipse/ecsp/analytics/stream/base/processors/DeviceMessagingAgentPreProcessor.java index 52d009d..96a1e84 100644 --- a/src/main/java/org/eclipse/ecsp/analytics/stream/base/processors/DeviceMessagingAgentPreProcessor.java +++ b/src/main/java/org/eclipse/ecsp/analytics/stream/base/processors/DeviceMessagingAgentPreProcessor.java @@ -144,7 +144,7 @@ public void process(Record, IgniteEvent> kafkaRecord) { if (StringUtils.isNotEmpty(correlationId)) { String igniteKey = (String) key.getKey(); - String retryRecordKeyPart = RetryRecordKey.createVehiclePart(igniteKey, correlationId); + String retryRecordKeyPart = RetryRecordKey.createKeyPart(igniteKey, correlationId); removeEventFromCache(retryRecordKeyPart); } this.spc.forward(new Record<>(key, value, System.currentTimeMillis())); diff --git a/src/main/java/org/eclipse/ecsp/stream/dma/dao/key/RetryRecordKey.java b/src/main/java/org/eclipse/ecsp/stream/dma/dao/key/RetryRecordKey.java index 3d6ef31..55bc1cd 100644 --- a/src/main/java/org/eclipse/ecsp/stream/dma/dao/key/RetryRecordKey.java +++ b/src/main/java/org/eclipse/ecsp/stream/dma/dao/key/RetryRecordKey.java @@ -74,14 +74,14 @@ public RetryRecordKey(String key, String taskId) { } /** - * Creates the vehicle part. + * Creates the key part. * - * @param vehicleId the vehicle id + * @param key the event key * @param messageId the message id * @return the string */ - public static String createVehiclePart(String vehicleId, String messageId) { - return (vehicleId + DMAConstants.SEMI_COLON + messageId); + public static String createKeyPart(String key, String messageId) { + return (key + DMAConstants.SEMI_COLON + messageId); } /** diff --git a/src/main/java/org/eclipse/ecsp/stream/dma/handler/RetryHandler.java b/src/main/java/org/eclipse/ecsp/stream/dma/handler/RetryHandler.java index 700589f..06e8e6e 100644 --- a/src/main/java/org/eclipse/ecsp/stream/dma/handler/RetryHandler.java +++ b/src/main/java/org/eclipse/ecsp/stream/dma/handler/RetryHandler.java @@ -359,7 +359,7 @@ private void retryHandle(IgniteKey key, DeviceMessage value, boolean firstAtt return; } boolean cutOffNotExceeded = validateIgniteEvent(header); - String retryRecordKeyPart = RetryRecordKey.createVehiclePart(header.getVehicleId(), header.getMessageId()); + String retryRecordKeyPart = RetryRecordKey.createKeyPart((String) key.getKey(), header.getMessageId()); if (cutOffNotExceeded) { if (checkDeviceInactive(key, value)) { logger.debug("Device is inactive for ignitekey {} and value {}. Removing Retry entry Record " @@ -565,7 +565,7 @@ private void saveToOfflineBufferAndDeleteFromCache(IgniteKey key, DeviceMessa ? value.getDeviceMessageHeader().getDevMsgTopicSuffix().toLowerCase() : null); logger.info("Saved event with key: {} and value: {} to mongo as max retries have exhausted.", key, value); - String retryRecordKeyPart = RetryRecordKey.createVehiclePart(value.getDeviceMessageHeader().getVehicleId(), + String retryRecordKeyPart = RetryRecordKey.createKeyPart((String) key.getKey(), value.getDeviceMessageHeader().getMessageId()); RetryRecordKey retryKey = new RetryRecordKey(retryRecordKeyPart, taskId); retryEventDAO.deleteFromMap(retryEventMapKey, retryKey, Optional.empty(), From 3d1ef24c72f5cb79ab1755cc704fc6730091683c Mon Sep 17 00:00:00 2001 From: "Arora, Kaushal" Date: Wed, 28 May 2025 17:26:26 +0530 Subject: [PATCH 2/2] Fix test cases to use key instead of vehicle id --- .../eclipse/ecsp/stream/dma/handler/RetryHandlerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/eclipse/ecsp/stream/dma/handler/RetryHandlerTest.java b/src/test/java/org/eclipse/ecsp/stream/dma/handler/RetryHandlerTest.java index 5054e89..c57a3bf 100644 --- a/src/test/java/org/eclipse/ecsp/stream/dma/handler/RetryHandlerTest.java +++ b/src/test/java/org/eclipse/ecsp/stream/dma/handler/RetryHandlerTest.java @@ -101,7 +101,7 @@ public class RetryHandlerTest { /** The retry test key. */ private RetryTestKey retryTestKey = new RetryTestKey(); - + /** The max retry. */ private int maxRetry = 3; @@ -403,7 +403,7 @@ public void testRetryHandleWhenResponseExpectedIsNotSet() throws InterruptedExce @Test public void testRetryHandleWhenMaxRetryThresholdHasNotBeenReached() throws InterruptedException { retryHandler.close(); - String retryRecordKey = "vehicleId;msg123"; + String retryRecordKey = "Vehicle12345;msg123"; ConcurrentHashSet retryRecordKeys = new ConcurrentHashSet(); retryRecordKeys.add(retryRecordKey); // It should be able to retry past keys, hence we are subtracting 10 @@ -525,7 +525,7 @@ private static RetryTestEvent getRetryTestEvent() { @Test public void testRetryHandleWhenMaxRetryThresholdHasReached() throws InterruptedException { retryHandler.close(); - String retryRecordKey = "vehicleId;msg123"; + String retryRecordKey = "Vehicle12345;msg123"; ConcurrentHashSet retryRecordKeys = new ConcurrentHashSet(); retryRecordKeys.add(retryRecordKey); @@ -607,7 +607,7 @@ public void testRetryHandleWhenMaxRetryThresholdHasReached() throws InterruptedE @Test public void testRetryHandleWhenNoDataPresentInRetryEventDAO() throws InterruptedException { retryHandler.close(); - String retryRecordKey = "vehicleId;msg123"; + String retryRecordKey = "Vehicle12345;msg123"; ConcurrentHashSet retryRecordKeys = new ConcurrentHashSet(); retryRecordKeys.add(retryRecordKey);