Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Worker with a thread pool manager #397

Merged
merged 5 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.comixed.service.file.FileService;
import org.comixed.task.model.DeleteComicsWorkerTask;
import org.comixed.task.model.UndeleteComicsWorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.comixed.utils.FileTypeIdentifier;
import org.comixed.views.View;
import org.comixed.views.View.ComicDetails;
Expand All @@ -75,7 +75,7 @@ public class ComicController {
@Autowired private ScanTypeRepository scanTypeRepository;
@Autowired private ComicFormatRepository comicFormatRepository;
@Autowired private ComicDataAdaptor comicDataAdaptor;
@Autowired private Worker worker;
@Autowired private TaskManager taskManager;
@Autowired private ObjectFactory<DeleteComicsWorkerTask> deleteComicsWorkerTaskFactory;
@Autowired private ObjectFactory<UndeleteComicsWorkerTask> undeleteComicsWorkerTaskObjectFactory;

Expand Down Expand Up @@ -124,7 +124,7 @@ public boolean deleteMultipleComics(@RequestParam("comic_ids") List<Long> comicI
task.setComicIds(comicIds);

log.debug("Queueing the delete task");
this.worker.addTasksToQueue(task);
this.taskManager.runTask(task);

return true;
}
Expand Down Expand Up @@ -418,7 +418,7 @@ public UndeleteMultipleComicsResponse undeleteMultipleComics(

final UndeleteComicsWorkerTask task = this.undeleteComicsWorkerTaskObjectFactory.getObject();
task.setIds(ids);
this.worker.addTasksToQueue(task);
this.taskManager.runTask(task);

return new UndeleteMultipleComicsResponse(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.comixed.service.file.FileService;
import org.comixed.task.model.DeleteComicsWorkerTask;
import org.comixed.task.model.UndeleteComicsWorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.comixed.utils.FileTypeIdentifier;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -78,7 +78,7 @@ public class ComicControllerTest {
@Mock private ComicService comicService;
@Mock private PageCacheService pageCacheService;
@Mock private LastReadDatesRepository lastReadRepository;
@Mock private Worker worker;
@Mock private TaskManager taskManager;
@Mock private List<Comic> comicList;
@Mock private Comic comic;
@Mock private Principal principal;
Expand All @@ -96,7 +96,7 @@ public class ComicControllerTest {
@Mock private Page page;
@Mock private LastReadDate lastReadDate;

private List<Comic> emptyComicList = new ArrayList<>();
private final List<Comic> emptyComicList = new ArrayList<>();

@Test
public void testGetComicsAddedSinceWithComicUpdate() throws ParseException, InterruptedException {
Expand Down Expand Up @@ -320,13 +320,13 @@ public void testRestoreComic() throws ComicException {
public void testDeleteMultipleComics() {
Mockito.when(this.deleteComicsTaskFactory.getObject()).thenReturn(this.deleteComicsTask);
Mockito.doNothing().when(this.deleteComicsTask).setComicIds(Mockito.anyList());
Mockito.doNothing().when(this.worker).addTasksToQueue(Mockito.any());
Mockito.doNothing().when(this.taskManager).runTask(Mockito.any());

assertTrue(controller.deleteMultipleComics(this.comicIds));

Mockito.verify(this.deleteComicsTaskFactory, Mockito.times(1)).getObject();
Mockito.verify(this.deleteComicsTask, Mockito.times(1)).setComicIds(this.comicIds);
Mockito.verify(this.worker, Mockito.times(1)).addTasksToQueue(this.deleteComicsTask);
Mockito.verify(this.taskManager, Mockito.times(1)).runTask(this.deleteComicsTask);
}

@Test
Expand All @@ -341,7 +341,7 @@ public void testUndeleteMultipleComics() {
assertTrue(result.isSuccess());

Mockito.verify(undeleteComicsWorkerTask, Mockito.times(1)).setIds(this.comicIds);
Mockito.verify(worker, Mockito.times(1)).addTasksToQueue(undeleteComicsWorkerTask);
Mockito.verify(taskManager, Mockito.times(1)).runTask(undeleteComicsWorkerTask);
}

@Test
Expand Down Expand Up @@ -540,7 +540,6 @@ public void testGetCoverImageForProcessedComic()
Mockito.verify(page, Mockito.times(1)).getContent();
Mockito.verify(pageCacheService, Mockito.times(1))
.saveByHash(TEST_PAGE_HASH, TEST_PAGE_CONTENT);
;
Mockito.verify(fileTypeIdentifier, Mockito.times(1)).typeFor(inputStreamCaptor.getValue());
Mockito.verify(fileTypeIdentifier, Mockito.times(1)).subtypeFor(inputStreamCaptor.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.comixed.task.TaskException;
import org.comixed.task.adaptors.TaskAdaptor;
import org.comixed.task.encoders.RescanComicTaskEncoder;
import org.comixed.task.runner.Worker;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
Expand All @@ -50,7 +49,6 @@ public class ComicService {
@Autowired private LastReadDatesRepository lastReadDatesRepository;
@Autowired private TaskService taskService;
@Autowired private ComiXedUserRepository userRepository;
@Autowired private Worker worker;
@Autowired private TaskAdaptor taskAdaptor;

public List<Comic> getComicsUpdatedSince(final long timestamp, final int maximumResults) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.comixed.repositories.comic.ComicRepository;
import org.comixed.service.task.TaskService;
import org.comixed.task.model.QueueComicsWorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.comixed.utils.ComicFileUtils;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -46,7 +46,7 @@
public class FileService {
@Autowired private ComicFileHandler comicFileHandler;
@Autowired private ComicRepository comicRepository;
@Autowired private Worker worker;
@Autowired private TaskManager taskManager;
@Autowired private ObjectFactory<QueueComicsWorkerTask> taskFactory;
@Autowired private TaskService taskService;

Expand Down Expand Up @@ -153,7 +153,7 @@ public int importComicFiles(
task.setIgnoreMetadata(ignoreMetadata);

log.debug("Adding import task to queue");
this.worker.addTasksToQueue(task);
this.taskManager.runTask(task);

return filenames.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.comixed.service.user.UserService;
import org.comixed.task.model.ConvertComicsWorkerTask;
import org.comixed.task.model.MoveComicsWorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.comixed.utils.Utils;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -54,7 +54,7 @@ public class LibraryService {
@Autowired private LastReadDatesRepository lastReadDateRepository;
@Autowired private ReadingListService readingListService;
@Autowired private ObjectFactory<ConvertComicsWorkerTask> convertComicsWorkerTaskObjectFactory;
@Autowired private Worker worker;
@Autowired private TaskManager taskManager;
@Autowired private Utils utils;
@Autowired private PageCacheService pageCacheService;
@Autowired private ObjectFactory<MoveComicsWorkerTask> moveComicsTaskObjectFactory;
Expand Down Expand Up @@ -129,7 +129,7 @@ public void convertComics(
task.setRenamePages(renamePages);

log.debug("Queueing save comics worker task");
this.worker.addTasksToQueue(task);
this.taskManager.runTask(task);
}

@Transactional
Expand Down Expand Up @@ -182,6 +182,6 @@ public void moveComics(Boolean deletePhysicalFiles, String directory, String ren
log.debug("Setting renaming rule: {}", renamingRule);
task.setRenamingRule(renamingRule);
log.debug("Enqueuing task");
this.worker.addTasksToQueue(task);
this.taskManager.runTask(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.comixed.task.adaptors.TaskAdaptor;
import org.comixed.task.encoders.RescanComicTaskEncoder;
import org.comixed.task.model.RescanComicWorkerTask;
import org.comixed.task.runner.Worker;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -82,7 +81,6 @@ public class ComicServiceTest {
@Mock private Comic comicListEntry;
@Mock private Comic comic;
@Mock private Comic incomingComic;
@Mock private Worker worker;
@Mock private ComiXedUser user;
@Mock private List<LastReadDate> listLastReadDate;
@Captor private ArgumentCaptor<Date> lastUpdatedDateCaptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.comixed.repositories.comic.ComicRepository;
import org.comixed.service.task.TaskService;
import org.comixed.task.model.QueueComicsWorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
Expand Down Expand Up @@ -65,10 +65,10 @@ public class FileServiceTest {
@Mock private ArchiveAdaptor archiveAdaptor;
@Mock private ComicRepository comicRepository;
@Mock private Comic comic;
@Mock private Worker worker;
@Mock private TaskService taskService;
@Mock private ObjectFactory<QueueComicsWorkerTask> taskFactory;
@Mock private QueueComicsWorkerTask queueComicsWorkerTask;
@Mock private TaskManager taskManager;

@Test
public void testGetImportFileCoverWithNoHandler()
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testImportComicFilesDeleteBlockedPages() throws UnsupportedEncodingE

Mockito.verify(taskFactory, Mockito.times(1)).getObject();
Mockito.verify(queueComicsWorkerTask, Mockito.times(1)).setDeleteBlockedPages(true);
Mockito.verify(worker, Mockito.times(1)).addTasksToQueue(queueComicsWorkerTask);
Mockito.verify(taskManager, Mockito.times(1)).runTask(queueComicsWorkerTask);
}

@Test
Expand All @@ -198,7 +198,7 @@ public void testImportComicFilesIgnoreComicInfoXmlFile() throws UnsupportedEncod

Mockito.verify(taskFactory, Mockito.times(1)).getObject();
Mockito.verify(queueComicsWorkerTask, Mockito.times(1)).setIgnoreMetadata(true);
Mockito.verify(worker, Mockito.timeout(1)).addTasksToQueue(queueComicsWorkerTask);
Mockito.verify(taskManager, Mockito.timeout(1)).runTask(queueComicsWorkerTask);
}

@Test
Expand All @@ -210,6 +210,6 @@ public void testImportComicFiles() throws UnsupportedEncodingException {
assertEquals(TEST_FILENAMES.length, result);

Mockito.verify(taskFactory, Mockito.times(1)).getObject();
Mockito.verify(worker, Mockito.times(1)).addTasksToQueue(queueComicsWorkerTask);
Mockito.verify(taskManager, Mockito.times(1)).runTask(queueComicsWorkerTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.comixed.task.model.ConvertComicsWorkerTask;
import org.comixed.task.model.MoveComicsWorkerTask;
import org.comixed.task.model.WorkerTask;
import org.comixed.task.runner.Worker;
import org.comixed.task.runner.TaskManager;
import org.comixed.utils.Utils;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -62,7 +62,6 @@ public class LibraryServiceTest {
@Mock private ConvertComicsWorkerTask convertComicsWorkerTask;
@Mock private Comic comic;
@Captor private ArgumentCaptor<List<Comic>> comicListArgumentCaptor;
@Mock private Worker worker;
@Mock private File file;
@Mock private Utils utils;
@Mock private List<LastReadDate> lastReadDateList;
Expand All @@ -71,6 +70,7 @@ public class LibraryServiceTest {
@Mock private PageCacheService pageCacheService;
@Mock private ObjectFactory<MoveComicsWorkerTask> moveComicsTaskObjectFactory;
@Mock private MoveComicsWorkerTask moveComicsWorkerTask;
@Mock private TaskManager taskManager;

private List<Comic> comicList = new ArrayList<>();
private Comic comic1 = new Comic();
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testConvertComicArchiving() {
Mockito.verify(convertComicsWorkerTask, Mockito.times(1))
.setTargetArchiveType(TEST_ARCHIVE_TYPE);
Mockito.verify(convertComicsWorkerTask, Mockito.times(1)).setRenamePages(TEST_RENAME_PAGES);
Mockito.verify(worker, Mockito.times(1)).addTasksToQueue(convertComicsWorkerTask);
Mockito.verify(taskManager, Mockito.times(1)).runTask(convertComicsWorkerTask);
}

@Test
Expand Down Expand Up @@ -244,12 +244,12 @@ public void testClearImageCacheError() throws LibraryException, IOException {
@Test
public void testMoveComics() {
Mockito.when(moveComicsTaskObjectFactory.getObject()).thenReturn(moveComicsWorkerTask);
Mockito.doNothing().when(worker).addTasksToQueue(Mockito.any(WorkerTask.class));
Mockito.doNothing().when(taskManager).runTask(Mockito.any(WorkerTask.class));

libraryService.moveComics(TEST_DELETE_PHYSICAL_FILES, TEST_DIRECTORY, TEST_RENAMING_RULES);

Mockito.verify(moveComicsWorkerTask, Mockito.times(1)).setDirectory(TEST_DIRECTORY);
Mockito.verify(moveComicsWorkerTask, Mockito.times(1)).setRenamingRule(TEST_RENAMING_RULES);
Mockito.verify(worker, Mockito.times(1)).addTasksToQueue(moveComicsWorkerTask);
Mockito.verify(taskManager, Mockito.times(1)).runTask(moveComicsWorkerTask);
}
}
6 changes: 0 additions & 6 deletions comixed-tasks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,5 @@
<artifactId>comixed-library</artifactId>
<version>${comixed.version}</version>
</dependency>
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
<version>0.4.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ public String getDescription() {
* @return the description
*/
protected abstract String createDescription();

@Override
public void afterExecution() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* ComiXed - A digital comic book library management application.
* Copyright (C) 2020, The ComiXed Project
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses>
*/

package org.comixed.task.model;

import java.util.List;
import lombok.extern.log4j.Log4j2;
import org.comixed.model.tasks.Task;
import org.comixed.task.TaskException;
import org.comixed.task.adaptors.TaskAdaptor;
import org.comixed.task.encoders.TaskEncoder;
import org.comixed.task.runner.TaskManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* <code>MonitorTaskQueue</code> is a task that continuous runs,looking for other tasks that have
* been queued. When it finds them it loads them, decodes them and adds them to the {@link
* TaskManager} for execution.
*
* @author Darryl L. Pierce
*/
@Component
@Log4j2
public class MonitorTaskQueue extends AbstractWorkerTask implements InitializingBean {
@Autowired private TaskManager taskManager;
@Autowired private TaskAdaptor taskAdaptor;

@Override
protected String createDescription() {
return "Task queue monitor";
}

@Override
public void afterPropertiesSet() throws Exception {
log.debug("Scheduling task queue monitor");
this.taskManager.runTask(this);
}

@Override
public void startTask() throws WorkerTaskException {
log.debug("Checking queue for waiting tasks");
final List<Task> taskQueue = this.taskAdaptor.getNextTask();
for (int index = 0; index < taskQueue.size(); index++) {
Task task = taskQueue.get(index);
log.debug("Rehydrating queued task");
TaskEncoder<?> decoder;
try {
decoder = this.taskAdaptor.getEncoder(task.getTaskType());
} catch (TaskException error) {
throw new WorkerTaskException("failed to get task decoder", error);
}
WorkerTask nextTask = decoder.decode(task);
log.debug("Passing task to task manager");
this.taskManager.runTask(nextTask);
}
try {
Thread.sleep(1000L);
} catch (InterruptedException error) {
log.error("monitor task queue interrupted", error);
Thread.currentThread().interrupt();
}
}

@Override
public void afterExecution() {
log.debug("Rescheduling task queue monitor");
this.taskManager.runTask(this);
}
}