Skip to content

Commit

Permalink
Replace table ANALYSIS_REPORTS by CE_QUEUE and CE_ACTIVITY
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Brandhof committed Sep 18, 2015
1 parent faf4df4 commit 024672b
Show file tree
Hide file tree
Showing 129 changed files with 8,690 additions and 3,033 deletions.
Expand Up @@ -20,7 +20,6 @@
package org.sonar.server.batch; package org.sonar.server.batch;


import org.sonar.core.platform.Module; import org.sonar.core.platform.Module;
import org.sonar.server.computation.ws.SubmitReportAction;


public class BatchWsModule extends Module { public class BatchWsModule extends Module {
@Override @Override
Expand All @@ -30,7 +29,6 @@ protected void configureModule() {
GlobalAction.class, GlobalAction.class,
ProjectAction.class, ProjectAction.class,
ProjectRepositoryLoader.class, ProjectRepositoryLoader.class,
SubmitReportAction.class,
IssuesAction.class, IssuesAction.class,
UsersAction.class, UsersAction.class,
BatchWs.class); BatchWs.class);
Expand Down
Expand Up @@ -144,7 +144,7 @@ public void bulkUpdateKey(String projectKey, String stringToReplace, String repl
} }
} }


public String create(NewComponent newComponent) { public ComponentDto create(NewComponent newComponent) {
userSession.checkGlobalPermission(GlobalPermissions.PROVISIONING); userSession.checkGlobalPermission(GlobalPermissions.PROVISIONING);


DbSession session = dbClient.openSession(false); DbSession session = dbClient.openSession(false);
Expand Down Expand Up @@ -175,9 +175,9 @@ public String create(NewComponent newComponent) {
dbClient.componentIndexDao().indexResource(session, component.getId()); dbClient.componentIndexDao().indexResource(session, component.getId());
session.commit(); session.commit();


return component.key(); return component;
} finally { } finally {
session.close(); dbClient.closeSession(session);
} }
} }


Expand Down
Expand Up @@ -73,12 +73,15 @@ public Long createComponent(String key, String name, String qualifier) {
public Long createComponent(String key, @Nullable String branch, String name, @Nullable String qualifier) { public Long createComponent(String key, @Nullable String branch, String name, @Nullable String qualifier) {
// Sub view should not be created with provisioning. Will be fixed by http://jira.sonarsource.com/browse/VIEWS-296 // Sub view should not be created with provisioning. Will be fixed by http://jira.sonarsource.com/browse/VIEWS-296
if (!Qualifiers.SUBVIEW.equals(qualifier)) { if (!Qualifiers.SUBVIEW.equals(qualifier)) {
String createdKey = componentService.create(NewComponent.create(key, name).setQualifier(qualifier).setBranch(branch)); ComponentDto componentDto = componentService.create(NewComponent.create(key, name).setQualifier(qualifier).setBranch(branch));
ComponentDto component = (ComponentDto) resourceDao.selectByKey(createdKey); if (componentDto == null) {
throw new BadRequestException(String.format("Component not created: %s", key));
}
ComponentDto component = (ComponentDto) resourceDao.selectByKey(componentDto.getKey());
if (component == null) { if (component == null) {
throw new BadRequestException(String.format("Component not created: %s", createdKey)); throw new BadRequestException(String.format("Component not created: %s", key));
} }
permissionService.applyDefaultPermissionTemplate(createdKey); permissionService.applyDefaultPermissionTemplate(component.getKey());
return component.getId(); return component.getId();
} }
return null; return null;
Expand Down
@@ -0,0 +1,230 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import com.google.common.base.Optional;
import org.sonar.api.server.ServerSide;
import org.sonar.api.utils.System2;
import org.sonar.core.util.UuidFactory;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeActivityDto;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.server.computation.monitoring.CEQueueStatus;

import static java.lang.String.format;

/**
* Queue of pending Compute Engine tasks. Both producer and consumer actions
* are implemented.
* <p>
* This class is decoupled from the regular task type {@link org.sonar.db.ce.CeTaskTypes#REPORT}.
* </p>
*/
@ServerSide
public class CeQueue {

private final System2 system2;
private final DbClient dbClient;
private final UuidFactory uuidFactory;
private final CEQueueStatus queueStatus;
private final CeQueueListener[] listeners;

// state
private boolean submitPaused = false;
private boolean peekPaused = false;

public CeQueue(System2 system2, DbClient dbClient, UuidFactory uuidFactory,
CEQueueStatus queueStatus, CeQueueListener[] listeners) {
this.system2 = system2;
this.dbClient = dbClient;
this.uuidFactory = uuidFactory;
this.queueStatus = queueStatus;
this.listeners = listeners;
}

public CeTaskSubmit prepareSubmit() {
return new CeTaskSubmit(uuidFactory.create());
}

public CeTask submit(CeTaskSubmit submit) {
if (submitPaused) {
throw new IllegalStateException("Compute Engine does not currently accept new tasks");
}
CeTask task = new CeTask(submit);
DbSession dbSession = dbClient.openSession(false);
try {
CeQueueDto dto = new CeQueueDto();
dto.setUuid(task.getUuid());
dto.setTaskType(task.getType());
dto.setComponentUuid(task.getComponentUuid());
dto.setStatus(CeQueueDto.Status.PENDING);
dto.setSubmitterLogin(task.getSubmitterLogin());
dto.setStartedAt(null);
dbClient.ceQueueDao().insert(dbSession, dto);
dbSession.commit();
queueStatus.addReceived();
return task;
} finally {
dbClient.closeSession(dbSession);
}
}

public Optional<CeTask> peek() {
if (peekPaused) {
return Optional.absent();
}
DbSession dbSession = dbClient.openSession(false);
try {
Optional<CeQueueDto> dto = dbClient.ceQueueDao().peek(dbSession);
if (!dto.isPresent()) {
return Optional.absent();
}
queueStatus.addInProgress();
return Optional.of(new CeTask(dto.get()));

} finally {
dbClient.closeSession(dbSession);
}
}

public boolean cancel(String taskUuid) {
DbSession dbSession = dbClient.openSession(false);
try {
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, taskUuid);
if (queueDto.isPresent()) {
if (!queueDto.get().getStatus().equals(CeQueueDto.Status.PENDING)) {
throw new IllegalStateException(String.format("Task is in progress and can't be cancelled [uuid=%s]", taskUuid));
}
cancel(dbSession, queueDto.get());
return true;
}
return false;
} finally {
dbClient.closeSession(dbSession);
}
}

void cancel(DbSession dbSession, CeQueueDto q) {
CeActivityDto activityDto = new CeActivityDto(q);
activityDto.setStatus(CeActivityDto.Status.CANCELED);
remove(dbSession, new CeTask(q), q, activityDto);
}


/**
* Removes all the tasks from the queue, whatever their status. They are marked
* as {@link org.sonar.db.ce.CeActivityDto.Status#CANCELED} in past activity.
* This method can NOT be called when workers are being executed, as in progress
* tasks can't be killed.
*
* @return the number of canceled tasks
*/
public int clear() {
return cancelAll(true);
}

/**
* Similar as {@link #clear()}, except that the tasks with status
* {@link org.sonar.db.ce.CeQueueDto.Status#IN_PROGRESS} are ignored. This method
* can be called at runtime, even if workers are being executed.
*
* @return the number of canceled tasks
*/
public int cancelAll() {
return cancelAll(false);
}

private int cancelAll(boolean includeInProgress) {
int count = 0;
DbSession dbSession = dbClient.openSession(false);
try {
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
if (includeInProgress || !queueDto.getStatus().equals(CeQueueDto.Status.IN_PROGRESS)) {
cancel(dbSession, queueDto);
count++;
}
}
return count;
} finally {
dbClient.closeSession(dbSession);
}
}

public void remove(CeTask task, CeActivityDto.Status status) {
DbSession dbSession = dbClient.openSession(false);
try {
Optional<CeQueueDto> queueDto = dbClient.ceQueueDao().selectByUuid(dbSession, task.getUuid());
if (!queueDto.isPresent()) {
throw new IllegalStateException(format("Task does not exist anymore: %s", task));
}
CeActivityDto activityDto = new CeActivityDto(queueDto.get());
activityDto.setStatus(status);
Long startedAt = activityDto.getStartedAt();
if (startedAt != null) {
activityDto.setFinishedAt(system2.now());
long executionTime = activityDto.getFinishedAt() - startedAt;
activityDto.setExecutionTimeMs(executionTime);
if (status == CeActivityDto.Status.SUCCESS) {
queueStatus.addSuccess(executionTime);
} else {
queueStatus.addError(executionTime);
}
}
remove(dbSession, task, queueDto.get(), activityDto);

} finally {
dbClient.closeSession(dbSession);
}
}

private void remove(DbSession dbSession, CeTask task, CeQueueDto queueDto, CeActivityDto activityDto) {
dbClient.ceActivityDao().insert(dbSession, activityDto);
dbClient.ceQueueDao().deleteByUuid(dbSession, queueDto.getUuid());
dbSession.commit();
for (CeQueueListener listener : listeners) {
listener.onRemoved(task, activityDto.getStatus());
}
}

public void pauseSubmit() {
this.submitPaused = true;
}

public void resumeSubmit() {
this.submitPaused = false;
}

public boolean isSubmitPaused() {
return submitPaused;
}

public void pausePeek() {
this.peekPaused = true;
}

public void resumePeek() {
this.peekPaused = false;
}

public boolean isPeekPaused() {
return peekPaused;
}
}
@@ -0,0 +1,110 @@
/*
* SonarQube, open source software quality management tool.
* Copyright (C) 2008-2014 SonarSource
* mailto:contact AT sonarsource DOT com
*
* SonarQube is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* SonarQube is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package org.sonar.server.computation;

import java.util.HashSet;
import java.util.Set;
import org.sonar.api.platform.ServerUpgradeStatus;
import org.sonar.api.server.ServerSide;
import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers;
import org.sonar.db.DbClient;
import org.sonar.db.DbSession;
import org.sonar.db.ce.CeQueueDto;
import org.sonar.db.ce.CeTaskTypes;
import org.sonar.server.computation.monitoring.CEQueueStatus;

/**
* Cleans-up the Compute Engine queue and resets the JMX counters.
* CE workers must not be started before execution of this class.
*/
@ServerSide
public class CeQueueInitializer {

private static final Logger LOGGER = Loggers.get(CeQueueInitializer.class);

private final DbClient dbClient;
private final ServerUpgradeStatus serverUpgradeStatus;
private final ReportFiles reportFiles;
private final CeQueue queue;
private final CEQueueStatus queueStatus;

public CeQueueInitializer(DbClient dbClient, ServerUpgradeStatus serverUpgradeStatus, ReportFiles reportFiles,
CeQueue queue, CEQueueStatus queueStatus) {
this.dbClient = dbClient;
this.serverUpgradeStatus = serverUpgradeStatus;
this.reportFiles = reportFiles;
this.queue = queue;
this.queueStatus = queueStatus;
}

/**
* Do not rename. Used at server startup.
*/
public void start() {
DbSession dbSession = dbClient.openSession(false);
try {
initJmxCounters(dbSession);

if (serverUpgradeStatus.isUpgraded()) {
cleanOnUpgrade();
} else {
verifyConsistency(dbSession);
}

} finally {
dbClient.closeSession(dbSession);
}
}

private void cleanOnUpgrade() {
// we assume that pending tasks are not compatible with the new version
// and can't be processed
LOGGER.info("Cancel all pending tasks (due to upgrade)");
queue.clear();
}

private void verifyConsistency(DbSession dbSession) {
// server is not being upgraded
dbClient.ceQueueDao().resetAllToPendingStatus(dbSession);
dbSession.commit();

// verify that the report files are available for the tasks in queue
Set<String> uuidsInQueue = new HashSet<>();
for (CeQueueDto queueDto : dbClient.ceQueueDao().selectAllInAscOrder(dbSession)) {
uuidsInQueue.add(queueDto.getUuid());
if (CeTaskTypes.REPORT.equals(queueDto.getTaskType()) && !reportFiles.fileForUuid(queueDto.getUuid()).exists()) {
// the report is not available on file system
queue.cancel(dbSession, queueDto);
}
}

// clean-up filesystem
for (String uuid : reportFiles.listUuids()) {
if (!uuidsInQueue.contains(uuid)) {
reportFiles.deleteIfExists(uuid);
}
}
}

private void initJmxCounters(DbSession dbSession) {
queueStatus.initPendingCount(dbClient.ceQueueDao().countAll(dbSession));
}
}

0 comments on commit 024672b

Please sign in to comment.