Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt cancel flow #1274

Merged
merged 7 commits into from Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -12,16 +12,19 @@
import static org.eclipse.hawkbit.tenancy.configuration.TenantConfigurationProperties.TenantConfigurationKey.BATCH_ASSIGNMENTS_ENABLED;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.google.common.collect.Iterables;
import org.eclipse.hawkbit.api.ApiType;
import org.eclipse.hawkbit.api.ArtifactUrl;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
Expand Down Expand Up @@ -50,7 +53,7 @@
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.ActionProperties;
import org.eclipse.hawkbit.repository.model.Artifact;
Expand All @@ -70,6 +73,8 @@
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.util.CollectionUtils;

/**
Expand All @@ -84,6 +89,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {

private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);

private static final int MAX_PROCESSING_SIZE = 1000;

private final ArtifactUrlHandler artifactUrlHandler;
private final AmqpMessageSenderService amqpSenderService;
private final SystemSecurityContext systemSecurityContext;
Expand Down Expand Up @@ -177,14 +184,16 @@ protected void onMultiAction(final MultiActionEvent multiActionEvent) {
}

private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
return targetManagement.getByControllerID(controllerIds).stream().filter(target -> {
if (hasPendingCancellations(target.getControllerId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
return true;
}).collect(Collectors.toList());
return partitionedParallelExecution(controllerIds, partition -> {
return targetManagement.getByControllerID(partition).stream().filter(target -> {
if (hasPendingCancellations(target.getControllerId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
return true;
}).collect(Collectors.toList());
});
}

private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent,
Expand Down Expand Up @@ -319,18 +328,45 @@ protected void targetCancelAssignmentToDistributionSet(final CancelTargetAssignm
return;
}

final Optional<Target> eventEntity = cancelEvent.getEntity();
if (eventEntity.isPresent()) {
final Target target = eventEntity.get();
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), cancelEvent.getActionId(),
target.getAddress());
} else {
LOG.warn(
"Cannot process the received CancelTargetAssignmentEvent with action ID {} because the referenced target with ID {} does no longer exist.",
cancelEvent.getActionId(), cancelEvent.getEntityId());
final List<Target> eventTargets = partitionedParallelExecution(cancelEvent.getActions().keySet(),
targetManagement::getByControllerID);

eventTargets.forEach(target -> {
cancelEvent.getActionPropertiesForController(target.getControllerId()).map(ActionProperties::getId)
.ifPresent(actionId -> {
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId,
target.getAddress());
});
});
}

private static <T, R> List<R> partitionedParallelExecution(final Collection<T> controllerIds,
final Function<Collection<T>, List<R>> loadingFunction) {
// Ensure not exceeding the max value of MAX_PROCESSING_SIZE
if (controllerIds.size() > MAX_PROCESSING_SIZE) {
// Split the provided collection
final Iterable<List<T>> partitions = Iterables.partition(controllerIds, MAX_PROCESSING_SIZE);
// Preserve the security context because it gets lost when executing
// loading calls in new threads
final SecurityContext context = SecurityContextHolder.getContext();
// Handling remote request in parallel streams
return StreamSupport.stream(partitions.spliterator(), true) //
.flatMap(partition -> withSecurityContext(() -> loadingFunction.apply(partition), context).stream())
.collect(Collectors.toList());
}
return loadingFunction.apply(controllerIds);
}

private static <T> T withSecurityContext(final Supplier<T> callable, final SecurityContext securityContext) {
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
SecurityContextHolder.setContext(securityContext);
return callable.get();
} finally {
SecurityContextHolder.setContext(oldContext);
}
}

/**
* Method to send a message to a RabbitMQ Exchange after a Target was
* deleted.
Expand Down
Expand Up @@ -11,6 +11,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
Expand All @@ -37,7 +38,7 @@
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Artifact;
Expand Down Expand Up @@ -230,12 +231,14 @@ void sendUpdateAttributesRequest() {
@Test
@Description("Verifies that send cancel event works")
void testSendCancelRequest() {
final Action action = mock(Action.class);
when(action.getId()).thenReturn(1L);
when(action.getTarget()).thenReturn(testTarget);
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent(
testTarget, 1L, serviceMatcher.getServiceId());
action, serviceMatcher.getServiceId());
amqpMessageDispatcherService
.targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent);
final Message sendMessage = createArgumentCapture(
cancelTargetAssignmentDistributionSetEvent.getEntity().get().getAddress());
final Message sendMessage = createArgumentCapture(AMQP_URI);
assertCancelMessage(sendMessage);

}
Expand Down
Expand Up @@ -48,7 +48,7 @@
import org.eclipse.hawkbit.repository.event.remote.TargetPollEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupCreatedEvent;
Expand Down
Expand Up @@ -41,7 +41,7 @@
import org.eclipse.hawkbit.repository.event.remote.TargetPollEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleUpdatedEvent;
Expand Down
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2022 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.repository.event.remote;

import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.ActionProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Abstract class providing information about an assignment.
*/
public abstract class AbstractAssignmentEvent extends RemoteTenantAwareEvent {

private static final long serialVersionUID = 1L;

private final Map<String, ActionProperties> actions = new HashMap<>();

/**
* Default constructor.
*/
protected AbstractAssignmentEvent() {
// for serialization libs like jackson
}

protected AbstractAssignmentEvent(final Object source, final Action a, final String applicationId) {
super(source, a.getTenant(), applicationId);
actions.put(a.getTarget().getControllerId(), new ActionProperties(a));
}

protected AbstractAssignmentEvent(final Object source, final String tenant, final List<Action> a,
final String applicationId) {
super(source, tenant, applicationId);
actions.putAll(a.stream()
.collect(Collectors.toMap(action -> action.getTarget().getControllerId(), ActionProperties::new)));
}

public Map<String, ActionProperties> getActions() {
return actions;
}

public Optional<ActionProperties> getActionPropertiesForController(final String controllerId) {
return Optional.ofNullable(actions.get(controllerId));
}

}
@@ -0,0 +1,39 @@
/**
* Copyright (c) 2022 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.repository.event.remote;

import java.util.List;

import org.eclipse.hawkbit.repository.model.Action;

/**
* Event that gets sent when the assignment of a distribution set to a target
* gets canceled.
*/
public class CancelTargetAssignmentEvent extends AbstractAssignmentEvent {

private static final long serialVersionUID = 1L;

/**
* Default constructor.
*/
public CancelTargetAssignmentEvent() {
// for serialization libs like jackson
}

public CancelTargetAssignmentEvent(final Action a, final String applicationId) {
super(applicationId, a, applicationId);
}

public CancelTargetAssignmentEvent(final String tenant, final List<Action> a, final String applicationId) {
super(applicationId, tenant, a, applicationId);

}

}
Expand Up @@ -9,28 +9,23 @@
package org.eclipse.hawkbit.repository.event.remote;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.ActionProperties;

/**
* TenantAwareEvent that gets sent when a distribution set gets assigned to a
* target.
*/
public class TargetAssignDistributionSetEvent extends RemoteTenantAwareEvent {
public class TargetAssignDistributionSetEvent extends AbstractAssignmentEvent {

private static final long serialVersionUID = 1L;

private long distributionSetId;

private boolean maintenanceWindowAvailable;

private final Map<String, ActionProperties> actions = new HashMap<>();

/**
* Default constructor.
*/
Expand All @@ -54,12 +49,12 @@ public TargetAssignDistributionSetEvent() {
*/
public TargetAssignDistributionSetEvent(final String tenant, final long distributionSetId, final List<Action> a,
final String applicationId, final boolean maintenanceWindowAvailable) {
super(distributionSetId, tenant, applicationId);
super(distributionSetId, tenant,
a.stream().filter(action -> action.getDistributionSet().getId().longValue() == distributionSetId)
.collect(Collectors.toList()),
applicationId);
this.distributionSetId = distributionSetId;
this.maintenanceWindowAvailable = maintenanceWindowAvailable;
actions.putAll(a.stream().filter(action -> action.getDistributionSet().getId().longValue() == distributionSetId)
.collect(Collectors.toMap(action -> action.getTarget().getControllerId(), ActionProperties::new)));

}

/**
Expand All @@ -83,8 +78,4 @@ public boolean isMaintenanceWindowAvailable() {
return maintenanceWindowAvailable;
}

public Map<String, ActionProperties> getActions() {
return actions;
}

}

This file was deleted.

Expand Up @@ -34,7 +34,7 @@
import org.eclipse.hawkbit.repository.event.remote.TenantConfigurationDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagUpdatedEvent;
Expand Down