Skip to content

Commit

Permalink
Fixed OptimisticLockExeption catch to fix processing of the update la…
Browse files Browse the repository at this point in the history
…st event on device

Signed-off-by: coduz <alberto.codutti@eurotech.com>
  • Loading branch information
Coduz committed Nov 21, 2019
1 parent e0066bb commit 74db7cc
Showing 1 changed file with 51 additions and 24 deletions.
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2011, 2017 Eurotech and/or its affiliates and others
* Copyright (c) 2016, 2019 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -11,10 +11,9 @@
*******************************************************************************/
package org.eclipse.kapua.service.device.registry.event.internal;

import javax.persistence.OptimisticLockException;

import org.eclipse.kapua.KapuaEntityNotFoundException;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.KapuaOptimisticLockingException;
import org.eclipse.kapua.commons.service.internal.AbstractKapuaService;
import org.eclipse.kapua.commons.util.ArgumentValidator;
import org.eclipse.kapua.locator.KapuaLocator;
Expand Down Expand Up @@ -45,7 +44,7 @@ public class DeviceEventServiceImpl extends AbstractKapuaService implements Devi

private static final Logger LOG = LoggerFactory.getLogger(DeviceEventServiceImpl.class);

private static final int MAX_ITERATION = 3;
private static final int MAX_RETRY = 3;
private static final double MAX_WAIT = 200d;

private final AuthorizationService authorizationService;
Expand All @@ -57,7 +56,9 @@ public class DeviceEventServiceImpl extends AbstractKapuaService implements Devi
*/
public DeviceEventServiceImpl() {
super(DeviceEntityManagerFactory.instance());

KapuaLocator locator = KapuaLocator.getInstance();

authorizationService = locator.getService(AuthorizationService.class);
permissionFactory = locator.getFactory(PermissionFactory.class);
deviceRegistryService = locator.getService(DeviceRegistryService.class);
Expand All @@ -75,36 +76,21 @@ public DeviceEvent create(DeviceEventCreator deviceEventCreator) throws KapuaExc
ArgumentValidator.notNull(deviceEventCreator.getReceivedOn(), "deviceEventCreator.receivedOn");
ArgumentValidator.notEmptyOrNull(deviceEventCreator.getResource(), "deviceEventCreator.eventType");

//
// Check Access
authorizationService.checkPermission(permissionFactory.newPermission(DeviceDomains.DEVICE_EVENT_DOMAIN, Actions.write, deviceEventCreator.getScopeId()));

//
// Check that device exists
if (deviceRegistryService.find(deviceEventCreator.getScopeId(), deviceEventCreator.getDeviceId()) == null) {
throw new KapuaEntityNotFoundException(Device.TYPE, deviceEventCreator.getDeviceId());
}

// Create the event
DeviceEvent deviceEvent = entityManagerSession.onTransactedInsert(entityManager -> DeviceEventDAO.create(entityManager, deviceEventCreator));
int iteration = 0;
do {
try {
Device device = deviceRegistryService.find(deviceEvent.getScopeId(), deviceEvent.getDeviceId());
if (device != null) {
device.setLastEventId(deviceEvent.getId());
deviceRegistryService.update(device);
}
break;
}
catch (OptimisticLockException e) {
LOG.warn("Concurrent update for device id {}... try again (if maximum attempts is not reach)", deviceEvent.getDeviceId());
try {
Thread.sleep((long)(Math.random() * MAX_WAIT));
} catch (InterruptedException e1) {
LOG.warn("Error while waiting {}", e.getMessage());
}
}
}
while(iteration++ < MAX_ITERATION);

updateLastEventOnDevice(deviceEvent);

return deviceEvent;
}

Expand Down Expand Up @@ -170,4 +156,45 @@ public void delete(KapuaId scopeId, KapuaId deviceEventId) throws KapuaException
DeviceEventDAO.delete(em, scopeId, deviceEventId);
});
}


/**
* Updates the {@link Device#getLastEventId()} with the given {@link DeviceEvent}.
*
* @param deviceEvent The {@link DeviceEvent} that needs to be set.
* @throws KapuaException If {@link Device} does not exist or updating the entity causes an error that is not {@link KapuaOptimisticLockingException} which is ignored.
* @since 1.2.0
*/
private void updateLastEventOnDevice(DeviceEvent deviceEvent) throws KapuaException {
int retry = 0;
do {
retry++;
try {
Device device = deviceRegistryService.find(deviceEvent.getScopeId(), deviceEvent.getDeviceId());

if (device == null) {
throw new KapuaEntityNotFoundException(Device.TYPE, deviceEvent.getDeviceId());
}

if (device.getLastEvent() == null ||
device.getLastEvent().getReceivedOn().before(deviceEvent.getReceivedOn())) {
device.setLastEventId(deviceEvent.getId());
deviceRegistryService.update(device);
}
break;
} catch (KapuaOptimisticLockingException e) {
LOG.warn("Concurrent update for device: {} - Event id: {} Attempt: {}/{}. {}", deviceEvent.getDeviceId(), deviceEvent.getId(), retry, MAX_RETRY, retry < MAX_RETRY ? "Retrying..." : "Skipping update!");

if (retry < MAX_RETRY) {
try {
Thread.sleep((long) (Math.random() * MAX_WAIT));
} catch (InterruptedException e1) {
LOG.warn("Error while waiting retry: {}", e.getMessage());
}
}
}
}
while (retry < MAX_RETRY);
}

}

0 comments on commit 74db7cc

Please sign in to comment.