Skip to content

Commit

Permalink
Add first implementation of reconcileWorkers
Browse files Browse the repository at this point in the history
It is quite limited; in particular it leaves superfluous workers
in suspended state that prevents coordinator from successful restarts.
(Probably they should be closed.)
  • Loading branch information
mederly committed Apr 17, 2018
1 parent 389311c commit b4deff7
Show file tree
Hide file tree
Showing 12 changed files with 557 additions and 129 deletions.
Expand Up @@ -38,12 +38,7 @@
import com.evolveum.midpoint.security.api.AuthorizationConstants;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskExecutionStatus;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.web.application.AuthorizationAction;
Expand Down Expand Up @@ -122,6 +117,7 @@ public class PageTasks extends PageAdminTasks implements Refreshable {
private static final String OPERATION_RESUME_TASKS = DOT_CLASS + "resumeTasks";
private static final String OPERATION_RESUME_TASK = DOT_CLASS + "resumeTask";
private static final String OPERATION_DELETE_TASKS = DOT_CLASS + "deleteTasks";
private static final String OPERATION_RECONCILE_WORKERS = DOT_CLASS + "reconcileWorkers";
private static final String OPERATION_DELETE_ALL_CLOSED_TASKS = DOT_CLASS + "deleteAllClosedTasks";
private static final String OPERATION_SCHEDULE_TASKS = DOT_CLASS + "scheduleTasks";
private static final String OPERATION_DELETE_NODES = DOT_CLASS + "deleteNodes";
Expand Down Expand Up @@ -702,15 +698,15 @@ public String getObject() {
}
});

IColumn<TaskDto, String> menuColumn = new InlineMenuButtonColumn<TaskDto>(createTasksInlineMenu(false), 2, PageTasks.this){
IColumn<TaskDto, String> menuColumn = new InlineMenuButtonColumn<TaskDto>(createTasksInlineMenu(false, null), 2, PageTasks.this){
@Override
protected int getHeaderNumberOfButtons() {
return 2;
}

@Override
protected List<InlineMenuItem> getHeaderMenuItems() {
return createTasksInlineMenu(true);
return createTasksInlineMenu(true, null);
}
};
columns.add(menuColumn);
Expand Down Expand Up @@ -741,7 +737,7 @@ public IModel<String> getDataModel(IModel<TaskDto> rowModel) {
};
}

private List<InlineMenuItem> createTasksInlineMenu(boolean isHeader) {
private List<InlineMenuItem> createTasksInlineMenu(boolean isHeader, TaskDto dto) {
List<InlineMenuItem> items = new ArrayList<>();
items.add(new InlineMenuItem(createStringResource("pageTasks.button.suspendTask"),
new Model<>(false),
Expand Down Expand Up @@ -863,6 +859,37 @@ public IModel<String> getConfirmationMessageModel(){
}

});
if (!isHeader && dto != null) {
if (dto.getTaskType().getWorkManagement() != null && dto.getTaskType().getWorkManagement().getTaskKind() == TaskKindType.COORDINATOR) {
items.add(new InlineMenuItem(createStringResource("pageTasks.button.reconcileWorkers"), false,
new ColumnMenuAction<TaskDto>() {

@Override
public void onClick(AjaxRequestTarget target) {
if (getRowModel() == null) {
throw new UnsupportedOperationException();
} else {
TaskDto rowDto = getRowModel().getObject();
reconcileWorkersConfirmedPerformed(target, rowDto);
}
}
}) {

private static final long serialVersionUID = 1L;

@Override
public boolean isShowConfirmationDialog() {
return PageTasks.this.isTaskShowConfirmationDialog((ColumnMenuAction) getAction());
}

@Override
public IModel<String> getConfirmationMessageModel() {
String actionName = createStringResource("pageTasks.message.reconcileWorkersAction").getString();
return PageTasks.this.getTaskConfirmationMessageModel((ColumnMenuAction) getAction(), actionName);
}
});
}
}
if (isHeader) {
items.add(new InlineMenuItem(createStringResource("pageTasks.button.deleteAllClosedTasks"), false,
new ColumnMenuAction<TaskDto>() {
Expand Down Expand Up @@ -1608,6 +1635,24 @@ private void deleteTaskConfirmedPerformed(AjaxRequestTarget target, TaskDto task
refreshTables(target);
}

private void reconcileWorkersConfirmedPerformed(AjaxRequestTarget target, @NotNull TaskDto task) {
Task opTask = createSimpleTask(OPERATION_RECONCILE_WORKERS);
OperationResult result = opTask.getResult();
try {
getTaskService().reconcileWorkers(task.getOid(), opTask, result);
result.computeStatus();
} catch (ObjectAlreadyExistsException | ObjectNotFoundException | SchemaException | SecurityViolationException | ExpressionEvaluationException | RuntimeException | CommunicationException | ConfigurationException e) {
result.recordFatalError("Couldn't reconcile the workers", e); // todo i18n
}
showResult(result);

TaskDtoProvider provider = (TaskDtoProvider) getTaskTable().getDataTable().getDataProvider();
provider.clearCache();

//refresh feedback and table
refreshTables(target);
}

private static class SearchFragment extends Fragment {

public SearchFragment(String id, String markupId, MarkupContainer markupProvider,
Expand Down Expand Up @@ -1767,7 +1812,7 @@ private void addInlineMenuToTaskDto(final TaskDto dto) {
return;
}

items.addAll(createTasksInlineMenu(false));
items.addAll(createTasksInlineMenu(false, dto));
}

private void addInlineMenuToNodeRow(final NodeDto dto) {
Expand Down
Expand Up @@ -2138,6 +2138,7 @@ pageTasks.alreadyPassedForNotRunningTasks=(already passed)
pageTasks.button.deactivateServiceThreads=Stop all threads
pageTasks.button.deleteNode=Delete
pageTasks.button.deleteTask=Delete
pageTasks.button.reconcileWorkers=Reconcile workers
pageTasks.button.deleteAllClosedTasks=Delete all closed tasks
pageTasks.button.reactivateServiceThreads=Start all threads
pageTasks.button.refreshTasks=Refresh tasks
Expand Down Expand Up @@ -2172,6 +2173,7 @@ pageTasks.message.suspendAction=suspend
pageTasks.message.resumeAction=resume
pageTasks.message.runNowAction=run now
pageTasks.message.deleteAction=delete
pageTasks.message.reconcileWorkersAction=reconcile workers of
pageTasks.message.deleteAllClosedTasksAction=delete all closed tasks
pageTasks.message.startAction=start
pageTasks.message.scheduleTaskAction=schedule task
Expand Down
Expand Up @@ -20,12 +20,7 @@
import com.evolveum.midpoint.schema.SelectorOptions;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

import java.util.Collection;
Expand Down Expand Up @@ -196,5 +191,9 @@ public interface TaskService {
* @return
*/
String getHandlerUriForCategory(String category);
//endregion

void reconcileWorkers(String oid, Task opTask, OperationResult result)
throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException,
ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException;
//endregion
}
Expand Up @@ -1881,6 +1881,14 @@ public void synchronizeWorkflowRequests(Task operationTask, OperationResult pare
workflowManager.synchronizeWorkflowRequests(parentResult);
}

@Override
public void reconcileWorkers(String oid, Task opTask, OperationResult result)
throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException,
ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException {
securityEnforcer.authorize(AuthorizationConstants.AUTZ_ALL_URL, null, AuthorizationParameters.EMPTY, null, opTask, result);
taskManager.reconcileWorkers(oid, result);
}

@Override
public List<String> getAllTaskCategories() {
return taskManager.getAllTaskCategories();
Expand Down
Expand Up @@ -449,6 +449,11 @@ public String getNode() {
throw new UnsupportedOperationException("not implemented yet.");
}

@Override
public String getNodeAsObserved() {
return null;
}

@Override
public OperationResultStatusType getResultStatus() {
throw new UnsupportedOperationException("not implemented yet.");
Expand Down Expand Up @@ -904,4 +909,14 @@ public TaskExecutionStatusType getStateBeforeSuspend() {
public boolean isPartitionedMaster() {
return false;
}

@Override
public TaskKindType getKind() {
return null;
}

@Override
public String getExecutionGroup() {
return null;
}
}
Expand Up @@ -214,6 +214,8 @@ void setNameImmediate(PolyStringType value, OperationResult parentResult)
*/
String getNode();

String getNodeAsObserved();

/**
* Returns true if the task can run (was not interrupted).
*
Expand Down Expand Up @@ -1035,9 +1037,13 @@ void savePendingModifications(OperationResult parentResult) throws ObjectNotFoun

TaskWorkStateType getWorkState();

TaskKindType getKind();

TaskUnpauseActionType getUnpauseAction();

TaskExecutionStatusType getStateBeforeSuspend();

boolean isPartitionedMaster();

String getExecutionGroup();
}
Expand Up @@ -410,6 +410,9 @@ boolean suspendTaskTree(String coordinatorOid, long waitTime, OperationResult pa
void resumeTaskTree(String coordinatorOid, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException;

void reconcileWorkers(String coordinatorOid, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException;

/**
* TODO is this method really necessary?
*/
Expand Down
Expand Up @@ -39,6 +39,7 @@
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.task.quartzimpl.handlers.PartitioningTaskHandler;
import com.evolveum.midpoint.task.quartzimpl.work.WorkStateManager;
import com.evolveum.midpoint.task.quartzimpl.work.workers.WorkersManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
Expand Down Expand Up @@ -164,6 +165,7 @@ public class TaskManagerQuartzImpl implements TaskManager, BeanFactoryAware {
@Autowired private LightweightIdentifierGenerator lightweightIdentifierGenerator;
@Autowired private PrismContext prismContext;
@Autowired private WorkStateManager workStateManager;
@Autowired private WorkersManager workersManager;

@Autowired
@Qualifier("securityContextManager")
Expand Down Expand Up @@ -408,6 +410,22 @@ public void resumeTaskTree(String rootTaskOid, OperationResult parentResult) thr
}
}

@Override
public void reconcileWorkers(String coordinatorTaskOid, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException {
OperationResult result = parentResult.createSubresult(DOT_INTERFACE + "reconcileWorkers");
result.addParam("coordinatorTaskOid", coordinatorTaskOid);

try {
workersManager.reconcileWorkers(coordinatorTaskOid, result);
} catch (Throwable t) {
result.recordFatalError("Couldn't reconcile workers", t);
throw t;
} finally {
result.computeStatusIfUnknown();
}
}

@Override
public void scheduleCoordinatorAndWorkersNow(String coordinatorOid, OperationResult parentResult) throws SchemaException, ObjectNotFoundException {
OperationResult result = parentResult.createSubresult(DOT_INTERFACE + "scheduleCoordinatorAndWorkersNow");
Expand Down
Expand Up @@ -2241,6 +2241,11 @@ public String getNode() {
return taskPrism.asObjectable().getNode();
}

@Override
public String getNodeAsObserved() {
return taskPrism.asObjectable().getNodeAsObserved();
}

public void setNode(String value) {
processModificationBatched(setNodeAndPrepareDelta(value));
}
Expand Down Expand Up @@ -3196,6 +3201,12 @@ public TaskWorkStateType getWorkState() {
return taskPrism.asObjectable().getWorkState();
}

@Override
public TaskKindType getKind() {
TaskWorkManagementType workManagement = getWorkManagement();
return workManagement != null ? workManagement.getTaskKind() : null;
}

@Override
public TaskUnpauseActionType getUnpauseAction() {
return taskPrism.asObjectable().getUnpauseAction();
Expand All @@ -3220,4 +3231,10 @@ public boolean isPartitionedMaster() {
TaskWorkManagementType workManagement = getWorkManagement();
return workManagement != null && workManagement.getTaskKind() == TaskKindType.PARTITIONED_MASTER;
}

@Override
public String getExecutionGroup() {
TaskExecutionConstraintsType executionConstraints = getExecutionConstraints();
return executionConstraints != null ? executionConstraints.getGroup() : null;
}
}

0 comments on commit b4deff7

Please sign in to comment.