Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void process(Record<IgniteKey<?>, IgniteEvent> kafkaRecord) {

if (StringUtils.isNotEmpty(correlationId)) {
String igniteKey = (String) key.getKey();
String retryRecordKeyPart = RetryRecordKey.createVehiclePart(igniteKey, correlationId);
String retryRecordKeyPart = RetryRecordKey.createKeyPart(igniteKey, correlationId);
removeEventFromCache(retryRecordKeyPart);
}
this.spc.forward(new Record<>(key, value, System.currentTimeMillis()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public RetryRecordKey(String key, String taskId) {
}

/**
* Creates the vehicle part.
* Creates the key part.
*
* @param vehicleId the vehicle id
* @param key the event key
* @param messageId the message id
* @return the string
*/
public static String createVehiclePart(String vehicleId, String messageId) {
return (vehicleId + DMAConstants.SEMI_COLON + messageId);
public static String createKeyPart(String key, String messageId) {
return (key + DMAConstants.SEMI_COLON + messageId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private void retryHandle(IgniteKey<?> key, DeviceMessage value, boolean firstAtt
return;
}
boolean cutOffNotExceeded = validateIgniteEvent(header);
String retryRecordKeyPart = RetryRecordKey.createVehiclePart(header.getVehicleId(), header.getMessageId());
String retryRecordKeyPart = RetryRecordKey.createKeyPart((String) key.getKey(), header.getMessageId());
if (cutOffNotExceeded) {
if (checkDeviceInactive(key, value)) {
logger.debug("Device is inactive for ignitekey {} and value {}. Removing Retry entry Record "
Expand Down Expand Up @@ -565,7 +565,7 @@ private void saveToOfflineBufferAndDeleteFromCache(IgniteKey<?> key, DeviceMessa
? value.getDeviceMessageHeader().getDevMsgTopicSuffix().toLowerCase() : null);
logger.info("Saved event with key: {} and value: {} to mongo as max retries have exhausted.",
key, value);
String retryRecordKeyPart = RetryRecordKey.createVehiclePart(value.getDeviceMessageHeader().getVehicleId(),
String retryRecordKeyPart = RetryRecordKey.createKeyPart((String) key.getKey(),
value.getDeviceMessageHeader().getMessageId());
RetryRecordKey retryKey = new RetryRecordKey(retryRecordKeyPart, taskId);
retryEventDAO.deleteFromMap(retryEventMapKey, retryKey, Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class RetryHandlerTest {

/** The retry test key. */
private RetryTestKey retryTestKey = new RetryTestKey();

/** The max retry. */
private int maxRetry = 3;

Expand Down Expand Up @@ -403,7 +403,7 @@ public void testRetryHandleWhenResponseExpectedIsNotSet() throws InterruptedExce
@Test
public void testRetryHandleWhenMaxRetryThresholdHasNotBeenReached() throws InterruptedException {
retryHandler.close();
String retryRecordKey = "vehicleId;msg123";
String retryRecordKey = "Vehicle12345;msg123";
ConcurrentHashSet<String> retryRecordKeys = new ConcurrentHashSet<String>();
retryRecordKeys.add(retryRecordKey);
// It should be able to retry past keys, hence we are subtracting 10
Expand Down Expand Up @@ -525,7 +525,7 @@ private static RetryTestEvent getRetryTestEvent() {
@Test
public void testRetryHandleWhenMaxRetryThresholdHasReached() throws InterruptedException {
retryHandler.close();
String retryRecordKey = "vehicleId;msg123";
String retryRecordKey = "Vehicle12345;msg123";
ConcurrentHashSet<String> retryRecordKeys = new ConcurrentHashSet<String>();
retryRecordKeys.add(retryRecordKey);

Expand Down Expand Up @@ -607,7 +607,7 @@ public void testRetryHandleWhenMaxRetryThresholdHasReached() throws InterruptedE
@Test
public void testRetryHandleWhenNoDataPresentInRetryEventDAO() throws InterruptedException {
retryHandler.close();
String retryRecordKey = "vehicleId;msg123";
String retryRecordKey = "Vehicle12345;msg123";
ConcurrentHashSet<String> retryRecordKeys = new ConcurrentHashSet<String>();
retryRecordKeys.add(retryRecordKey);

Expand Down
Loading