diff --git a/service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java b/service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java index f1bdd40970a..8e116015d17 100644 --- a/service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java +++ b/service/device/registry/internal/src/main/java/org/eclipse/kapua/service/device/registry/event/internal/DeviceEventServiceImpl.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2011, 2017 Eurotech and/or its affiliates and others + * Copyright (c) 2016, 2019 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -11,10 +11,9 @@ *******************************************************************************/ package org.eclipse.kapua.service.device.registry.event.internal; -import javax.persistence.OptimisticLockException; - import org.eclipse.kapua.KapuaEntityNotFoundException; import org.eclipse.kapua.KapuaException; +import org.eclipse.kapua.KapuaOptimisticLockingException; import org.eclipse.kapua.commons.service.internal.AbstractKapuaService; import org.eclipse.kapua.commons.util.ArgumentValidator; import org.eclipse.kapua.locator.KapuaLocator; @@ -45,7 +44,7 @@ public class DeviceEventServiceImpl extends AbstractKapuaService implements Devi private static final Logger LOG = LoggerFactory.getLogger(DeviceEventServiceImpl.class); - private static final int MAX_ITERATION = 3; + private static final int MAX_RETRY = 3; private static final double MAX_WAIT = 200d; private final AuthorizationService authorizationService; @@ -57,7 +56,9 @@ public class DeviceEventServiceImpl extends AbstractKapuaService implements Devi */ public DeviceEventServiceImpl() { super(DeviceEntityManagerFactory.instance()); + KapuaLocator locator = KapuaLocator.getInstance(); + authorizationService = locator.getService(AuthorizationService.class); permissionFactory = locator.getFactory(PermissionFactory.class); deviceRegistryService = locator.getService(DeviceRegistryService.class); @@ -75,9 +76,11 @@ public DeviceEvent create(DeviceEventCreator deviceEventCreator) throws KapuaExc ArgumentValidator.notNull(deviceEventCreator.getReceivedOn(), "deviceEventCreator.receivedOn"); ArgumentValidator.notEmptyOrNull(deviceEventCreator.getResource(), "deviceEventCreator.eventType"); + // // Check Access authorizationService.checkPermission(permissionFactory.newPermission(DeviceDomains.DEVICE_EVENT_DOMAIN, Actions.write, deviceEventCreator.getScopeId())); + // // Check that device exists if (deviceRegistryService.find(deviceEventCreator.getScopeId(), deviceEventCreator.getDeviceId()) == null) { throw new KapuaEntityNotFoundException(Device.TYPE, deviceEventCreator.getDeviceId()); @@ -85,26 +88,9 @@ public DeviceEvent create(DeviceEventCreator deviceEventCreator) throws KapuaExc // Create the event DeviceEvent deviceEvent = entityManagerSession.onTransactedInsert(entityManager -> DeviceEventDAO.create(entityManager, deviceEventCreator)); - int iteration = 0; - do { - try { - Device device = deviceRegistryService.find(deviceEvent.getScopeId(), deviceEvent.getDeviceId()); - if (device != null) { - device.setLastEventId(deviceEvent.getId()); - deviceRegistryService.update(device); - } - break; - } - catch (OptimisticLockException e) { - LOG.warn("Concurrent update for device id {}... try again (if maximum attempts is not reach)", deviceEvent.getDeviceId()); - try { - Thread.sleep((long)(Math.random() * MAX_WAIT)); - } catch (InterruptedException e1) { - LOG.warn("Error while waiting {}", e.getMessage()); - } - } - } - while(iteration++ < MAX_ITERATION); + + updateLastEventOnDevice(deviceEvent); + return deviceEvent; } @@ -170,4 +156,45 @@ public void delete(KapuaId scopeId, KapuaId deviceEventId) throws KapuaException DeviceEventDAO.delete(em, scopeId, deviceEventId); }); } + + + /** + * Updates the {@link Device#getLastEventId()} with the given {@link DeviceEvent}. + * + * @param deviceEvent The {@link DeviceEvent} that needs to be set. + * @throws KapuaException If {@link Device} does not exist or updating the entity causes an error that is not {@link KapuaOptimisticLockingException} which is ignored. + * @since 1.2.0 + */ + private void updateLastEventOnDevice(DeviceEvent deviceEvent) throws KapuaException { + int retry = 0; + do { + retry++; + try { + Device device = deviceRegistryService.find(deviceEvent.getScopeId(), deviceEvent.getDeviceId()); + + if (device == null) { + throw new KapuaEntityNotFoundException(Device.TYPE, deviceEvent.getDeviceId()); + } + + if (device.getLastEvent() == null || + device.getLastEvent().getReceivedOn().before(deviceEvent.getReceivedOn())) { + device.setLastEventId(deviceEvent.getId()); + deviceRegistryService.update(device); + } + break; + } catch (KapuaOptimisticLockingException e) { + LOG.warn("Concurrent update for device: {} - Event id: {} Attempt: {}/{}. {}", deviceEvent.getDeviceId(), deviceEvent.getId(), retry, MAX_RETRY, retry < MAX_RETRY ? "Retrying..." : "Skipping update!"); + + if (retry < MAX_RETRY) { + try { + Thread.sleep((long) (Math.random() * MAX_WAIT)); + } catch (InterruptedException e1) { + LOG.warn("Error while waiting retry: {}", e.getMessage()); + } + } + } + } + while (retry < MAX_RETRY); + } + }