Skip to content

Commit

Permalink
Fix Migration service workflow blocked in the service deployment task
Browse files Browse the repository at this point in the history
  • Loading branch information
baixinsui committed Nov 23, 2023
1 parent 187eaca commit d5f2f98
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,11 @@ public DatabaseDeployServiceStorage(DeployServiceRepository deployServiceReposit
this.deployServiceRepository = deployServiceRepository;
}

/**
* Store the entity to the database.
*
* @param deployServiceEntity the model of registered service.
*/
@Override
public boolean store(DeployServiceEntity deployServiceEntity) {
this.deployServiceRepository.save(deployServiceEntity);
return true;
}

/**
* Store the entity to the database and flush the data immediately.
*
* @param deployServiceEntity the model of registered service.
* @param deployServiceEntity the entity of service.
* @return deployServiceEntity the entity of service.
*/
@Override
public DeployServiceEntity storeAndFlush(DeployServiceEntity deployServiceEntity) {
Expand Down Expand Up @@ -126,17 +116,4 @@ public DeployServiceEntity findDeployServiceById(UUID id) {
public void deleteDeployService(DeployServiceEntity deployServiceEntity) {
this.deployServiceRepository.delete(deployServiceEntity);
}

@Override
public DeployServiceEntity queryRefreshDeployServiceById(UUID id) {
Optional<DeployServiceEntity> optionalEntity =
this.deployServiceRepository.findById(id);
if (optionalEntity.isPresent()) {
DeployServiceEntity entity = optionalEntity.get();
entityManager.refresh(entity);
return entity;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@ public interface DeployServiceStorage {
/**
* Add or update deployed service data to database.
*
* @param deployServiceEntity the model of deployed service.
*/
boolean store(DeployServiceEntity deployServiceEntity);

/**
* Add or update deployed service data to database.
*
* @param deployServiceEntity the model of deployed service.
* @param deployServiceEntity the entity of service.
* @return deployServiceEntity the entity of service.
*/
DeployServiceEntity storeAndFlush(DeployServiceEntity deployServiceEntity);

Expand Down Expand Up @@ -59,5 +53,4 @@ public interface DeployServiceStorage {
*/
void deleteDeployService(DeployServiceEntity deployServiceEntity);

DeployServiceEntity queryRefreshDeployServiceById(UUID id);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.eclipse.xpanse.modules.database.service;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -19,7 +18,6 @@
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.OptimisticLockingFailureException;

@ExtendWith(MockitoExtension.class)
class DatabaseDeployServiceStorageTest {
Expand All @@ -40,44 +38,6 @@ void setUp() {
databaseDeployServiceStorageUnderTest.entityManager = entityManager;
}

@Test
void testStore() {
final DeployServiceEntity deployServiceEntity = new DeployServiceEntity();
deployServiceEntity.setId(id);
deployServiceEntity.setUserId(userId);
deployServiceEntity.setCategory(Category.AI);
deployServiceEntity.setName("name");
deployServiceEntity.setCustomerServiceName("customerServiceName");
databaseDeployServiceStorageUnderTest.store(deployServiceEntity);
final DeployServiceEntity entity = new DeployServiceEntity();
entity.setId(id);
entity.setUserId(userId);
entity.setCategory(Category.AI);
entity.setName("name");
entity.setCustomerServiceName("customerServiceName");
verify(mockDeployServiceRepository).save(entity);
}

@Test
void testStore_DeployServiceRepositoryThrowsOptimisticLockingFailureException() {
final DeployServiceEntity deployServiceEntity = new DeployServiceEntity();
deployServiceEntity.setId(id);
deployServiceEntity.setUserId(userId);
deployServiceEntity.setCategory(Category.AI);
deployServiceEntity.setName("name");
deployServiceEntity.setCustomerServiceName("customerServiceName");
final DeployServiceEntity entity = new DeployServiceEntity();
entity.setId(id);
entity.setUserId(userId);
entity.setCategory(Category.AI);
entity.setName("name");
entity.setCustomerServiceName("customerServiceName");
when(mockDeployServiceRepository.save(entity))
.thenThrow(OptimisticLockingFailureException.class);
assertThatThrownBy(() -> databaseDeployServiceStorageUnderTest.store(
deployServiceEntity)).isInstanceOf(OptimisticLockingFailureException.class);
}

@Test
void testStoreAndFlush() {
final DeployServiceEntity deployServiceEntity = new DeployServiceEntity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,10 +613,16 @@ private void maskSensitiveFields(DeployServiceEntity deployServiceEntity) {

/**
* Deployment service.
*
* @param newId new service id.
* @param userId user id.
* @param deployRequest deploy request.
* @return new deployed service entity.
*/
@Async("taskExecutor")
public CompletableFuture<Void> deployService(UUID newId,
String userId, DeployRequest deployRequest) {
public CompletableFuture<DeployServiceEntity> deployService(UUID newId,
String userId,
DeployRequest deployRequest) {
MDC.put(TASK_ID, newId.toString());
log.info("start deploy service, service id : {}", newId);
DeployTask deployTask = new DeployTask();
Expand All @@ -626,53 +632,28 @@ public CompletableFuture<Void> deployService(UUID newId,
Deployment deployment = getDeployHandler(deployTask);
deployTask.getDeployRequest().setUserId(userId);
deploy(deployment, deployTask);
return CompletableFuture.completedFuture(null);
DeployServiceEntity deployServiceEntity =
deployServiceStorage.findDeployServiceById(newId);
return CompletableFuture.completedFuture(deployServiceEntity);
}

/**
* Destroy service by deployed service id.
*
* @param id deployed service id.
* @return updated service entity.
*/
@Async("taskExecutor")
public CompletableFuture<Void> destroyService(String id) {
public CompletableFuture<DeployServiceEntity> destroyService(String id) {
MDC.put(TASK_ID, id);
log.info("start destroy service, service id : {}", id);
DeployTask deployTask = new DeployTask();
deployTask.setId(UUID.fromString(id));
Deployment deployment = getDestroyHandler(deployTask);
destroy(deployment, deployTask);
return CompletableFuture.completedFuture(null);
}

/**
* Method to determine whether deploy is successful.
*/
public boolean isDeploySuccess(UUID id) {
MDC.put(TASK_ID, id.toString());
log.info(" starting to poll for status update.. , service id : {}", id);
ServiceDeploymentState deployState = null;
while (deployState == ServiceDeploymentState.DEPLOYING || deployState == null) {
deployState = deployServiceStorage.queryRefreshDeployServiceById(id)
.getServiceDeploymentState();
}
log.info("deployment status updated,state:{}", deployState);
return deployState == ServiceDeploymentState.DEPLOY_SUCCESS;
}

/**
* Method to determine whether destroy is successful.
*/
@Async("taskExecutor")
public CompletableFuture<Boolean> isDestroySuccess(UUID id) {
ServiceDeploymentState destroyState = null;
MDC.put(TASK_ID, id.toString());
log.info(" starting to poll for status update.. , service id : {}", id);
while (destroyState == ServiceDeploymentState.DESTROYING || destroyState == null) {
destroyState = deployServiceStorage.findDeployServiceById(id)
.getServiceDeploymentState();
}
log.info("destroy status updated,state:{}", destroyState);
return CompletableFuture.completedFuture(
destroyState == ServiceDeploymentState.DESTROY_SUCCESS);
DeployServiceEntity deployServiceEntity =
deployServiceStorage.findDeployServiceById(UUID.fromString(id));
return CompletableFuture.completedFuture(deployServiceEntity);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.eclipse.xpanse.modules.database.service.DeployServiceEntity;
import org.eclipse.xpanse.modules.deployment.DeployService;
import org.eclipse.xpanse.modules.models.service.deploy.DeployRequest;
import org.eclipse.xpanse.modules.models.service.deploy.MigrateRequest;
import org.eclipse.xpanse.modules.models.service.deploy.enums.ServiceDeploymentState;
import org.eclipse.xpanse.modules.workflow.consts.MigrateConstants;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -30,16 +33,13 @@
@Component
public class MigrateDeployProcess implements Serializable, JavaDelegate {

private static RuntimeService runtimeService;
private static DeployService deployService;
private final RuntimeService runtimeService;
private final DeployService deployService;

@Autowired
public void setRuntimeService(RuntimeService runtimeService) {
public MigrateDeployProcess(RuntimeService runtimeService,
DeployService deployService) {
this.runtimeService = runtimeService;
}

@Autowired
public void setDeployService(DeployService deployService) {
this.deployService = deployService;
}

Expand All @@ -52,7 +52,6 @@ public void execute(DelegateExecution execution) {

String processInstanceId = execution.getProcessInstanceId();
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
UUID id = (UUID) variables.get(MigrateConstants.ID);
UUID newId = (UUID) variables.get(MigrateConstants.NEW_ID);
runtimeService.updateBusinessKey(processInstanceId, newId.toString());
int deployRetryNum = (int) variables.get(MigrateConstants.DEPLOY_RETRY_NUM);
Expand All @@ -68,9 +67,13 @@ public void execute(DelegateExecution execution) {
DeployRequest deployRequest = new DeployRequest();
BeanUtils.copyProperties(migrateRequest, deployRequest);
String userId = (String) variables.get(MigrateConstants.USER_ID);
CompletableFuture<Void> future = deployService.deployService(newId, userId, deployRequest);
future.join();
boolean deploySuccess = deployService.isDeploySuccess(newId);

CompletableFuture<DeployServiceEntity> future =
deployService.deployService(newId, userId, deployRequest);
DeployServiceEntity result = future.get();

boolean deploySuccess = Objects.nonNull(result)
&& ServiceDeploymentState.DEPLOY_SUCCESS == result.getServiceDeploymentState();
if (!deploySuccess && deployRetryNum >= 1) {
runtimeService.setVariable(processInstanceId, MigrateConstants.ASSIGNEE, userId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
import org.eclipse.xpanse.modules.database.service.DeployServiceEntity;
import org.eclipse.xpanse.modules.deployment.DeployService;
import org.eclipse.xpanse.modules.models.service.deploy.enums.ServiceDeploymentState;
import org.eclipse.xpanse.modules.workflow.consts.MigrateConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -27,17 +29,14 @@
@Component
public class MigrateDestroyProcess implements Serializable, JavaDelegate {

private static RuntimeService runtimeService;
private static DeployService deployService;
private final RuntimeService runtimeService;
private final DeployService deployService;

@Autowired
public void setRuntimeService(RuntimeService runtimeService) {
MigrateDestroyProcess.runtimeService = runtimeService;
}

@Autowired
public void setDeployService(DeployService deployService) {
MigrateDestroyProcess.deployService = deployService;
public MigrateDestroyProcess(RuntimeService runtimeService,
DeployService deployService) {
this.runtimeService = runtimeService;
this.deployService = deployService;
}

/**
Expand All @@ -49,19 +48,20 @@ public void execute(DelegateExecution execution) {
String processInstanceId = execution.getProcessInstanceId();
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
String id = variables.get(MigrateConstants.ID).toString();
String userId = (String) variables.get(MigrateConstants.USER_ID);
int destroyRetryNum = (int) variables.get(MigrateConstants.DESTROY_RETRY_NUM);
if (destroyRetryNum > 0) {
log.info("Process instance: {} retry destroy service : {},RetryNum:{}",
processInstanceId, id, destroyRetryNum);
}
runtimeService.setVariable(processInstanceId, MigrateConstants.DESTROY_RETRY_NUM,
destroyRetryNum + 1);
CompletableFuture<Void> future = deployService.destroyService(id);
future.join();
CompletableFuture<Boolean> destroySuccess =
deployService.isDestroySuccess(UUID.fromString(id));
CompletableFuture<DeployServiceEntity> future = deployService.destroyService(id);
DeployServiceEntity deployServiceEntity = future.get();

boolean destroySuccess = Objects.nonNull(deployServiceEntity)
&& ServiceDeploymentState.DEPLOY_SUCCESS
== deployServiceEntity.getServiceDeploymentState();
runtimeService.setVariable(processInstanceId, MigrateConstants.IS_DESTROY_SUCCESS,
destroySuccess.get());
destroySuccess);
}
}

0 comments on commit d5f2f98

Please sign in to comment.