Skip to content

Commit

Permalink
basic task queue and consumer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gmlove committed Jul 1, 2022
1 parent d261bb1 commit 47c332f
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 67 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Expand Up @@ -15,6 +15,8 @@ repositories {
dependencies {
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
compileOnly 'org.mapstruct:mapstruct:1.5.1.Final'
annotationProcessor 'org.mapstruct:mapstruct-processor:1.5.1.Final'

implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/com/brightliao/taskqueue/JpaTaskRepository.java
@@ -0,0 +1,103 @@
package com.brightliao.taskqueue;

import com.brightliao.taskqueue.Task.TaskStatus;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.mapstruct.Mapper;
import org.mapstruct.MappingConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.LockModeType;
import javax.persistence.PersistenceContext;

@RequiredArgsConstructor
@Service
public class JpaTaskRepository implements TaskRepository {

@Autowired
private final TaskEntityMapper entityMapper;
@Autowired
private final InnerJpaTaskRepository taskRepository;

private EntityManager entityManager;

@PersistenceContext
public final void setEntityManager(EntityManager entityManager) {
this.entityManager = entityManager;
}

@Override
public List<Task> findNewTasks(int maxCount) {
var query = entityManager.createQuery(
"from JpaTaskRepository$TaskEntity "
+ "where status = :status "
+ "order by createdAt asc",
TaskEntity.class);
query.setParameter("status", TaskStatus.PENDING);
query.setLockMode(LockModeType.PESSIMISTIC_WRITE);
query.setMaxResults(maxCount);
return query.getResultList().stream()
.map(entityMapper::toTask)
.collect(Collectors.toList());
}

@Override
public Task save(Task task) {
return entityMapper.toTask(taskRepository.save(entityMapper.fromTask(task)));
}

@Override
public List<Task> saveAll(List<Task> tasks) {
final List<TaskEntity> taskEntities = tasks.stream().map(entityMapper::fromTask).collect(Collectors.toList());
return taskRepository.saveAll(taskEntities).stream()
.map(entityMapper::toTask)
.collect(Collectors.toList());
}

@Repository
public interface InnerJpaTaskRepository extends JpaRepository<TaskEntity, Long> {
}

@Mapper(componentModel = MappingConstants.ComponentModel.SPRING)
public interface TaskEntityMapper {

TaskEntity fromTask(Task task);

Task toTask(TaskEntity task);

}

@Entity
@Data
public static class TaskEntity {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String taskType;
@Column(columnDefinition = "TEXT")
private String taskArg;
@Enumerated(EnumType.STRING)
private TaskStatus status;
@Column(columnDefinition = "TEXT")
private String message;
private LocalDateTime createdAt;
private LocalDateTime startedAt;
private LocalDateTime runAt;
private LocalDateTime endedAt;
}
}
78 changes: 77 additions & 1 deletion src/main/java/com/brightliao/taskqueue/Task.java
@@ -1,8 +1,84 @@
package com.brightliao.taskqueue;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.time.LocalDateTime;

@EqualsAndHashCode
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class Task {

private Long id;
private String taskType;
private String taskArg;
private TaskStatus status;
private String message;
private LocalDateTime createdAt;
private LocalDateTime startedAt;
private LocalDateTime runAt;
private LocalDateTime endedAt;

public Task(String taskType, String taskArg) {
this(null, taskType, taskArg, TaskStatus.PENDING);
}

public Task(Long id, String taskType, String taskArg, TaskStatus status) {
this.id = id;
this.taskType = taskType;
this.taskArg = taskArg;
this.status = status;
this.createdAt = LocalDateTime.now();
}

public boolean isSucceeded() {
return false;
return status == TaskStatus.SUCCEEDED;
}

public String getType() {
return taskType;
}

public String getArg() {
return taskArg;
}

public void markRunning() {
this.status = TaskStatus.RUNNING;
this.runAt = LocalDateTime.now();
}

public void markStarted() {
this.status = TaskStatus.STARTED;
this.startedAt = LocalDateTime.now();
}

public void markSucceeded() {
this.status = TaskStatus.SUCCEEDED;
this.endedAt = LocalDateTime.now();
}

public void markFailed(Exception e) {
this.status = TaskStatus.FAILED;
this.endedAt = LocalDateTime.now();
this.message = this.message == null ? e.getMessage() : this.message + "\n" + e.getMessage();
}

public Long getId() {
return id;
}

public enum TaskStatus {
PENDING,
STARTED,
RUNNING,
SUCCEEDED,
FAILED
}
}
69 changes: 67 additions & 2 deletions src/main/java/com/brightliao/taskqueue/TaskQueue.java
@@ -1,11 +1,76 @@
package com.brightliao.taskqueue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

@Slf4j
@Component
@RequiredArgsConstructor
public class TaskQueue {

public TaskQueue(TaskRepository taskRepository, int tasksToFetchPerTime) {
private final TaskRepository taskRepository;
private final TransactionTemplate transactionTemplate;
private final ObjectMapper objectMapper;
private ConcurrentLinkedDeque<Runnable> newTaskListeners = new ConcurrentLinkedDeque<>();

public <T> void addTask(String taskType, T taskArg) {
transactionTemplate.executeWithoutResult(status -> {
try {
taskRepository.save(new Task(taskType, objectMapper.writer().writeValueAsString(taskArg)));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
newTaskListeners.forEach(listener -> {
try {
listener.run();
} catch (Exception e) {
log.error("run new task listener failed.", e);
}
});
}

public List<Task> popTasks(int tasksToFetchPerTime) {
return transactionTemplate.execute(status -> {
var tasks = taskRepository.findNewTasks(tasksToFetchPerTime);
if (tasks.isEmpty()) {
return tasks;
}
tasks.forEach(Task::markStarted);
tasks = taskRepository.saveAll(tasks);
return tasks;
});
}

public <T> void addTask(String taskType, T taskArg) {
public void markSucceeded(Task task) {
transactionTemplate.executeWithoutResult(status -> {
task.markSucceeded();
taskRepository.save(task);
});
}

public void markFailed(Task task, Exception e) {
transactionTemplate.executeWithoutResult(status -> {
task.markFailed(e);
taskRepository.save(task);
});
}

public void onNewTask(Runnable listener) {
this.newTaskListeners.add(listener);
}

public void markStarted(Task task) {
transactionTemplate.executeWithoutResult(status -> {
task.markStarted();
taskRepository.save(task);
});
}
}
Expand Up @@ -2,8 +2,12 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableJpaRepositories(considerNestedRepositories = true)
@EnableScheduling
public class TaskQueueApplication {

public static void main(String[] args) {
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/com/brightliao/taskqueue/TaskQueueConsumer.java
@@ -0,0 +1,84 @@
package com.brightliao.taskqueue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Component
public class TaskQueueConsumer implements InitializingBean {

private final TaskQueue queue;
private final int tasksToFetchPerTime;
private final Map<String, TaskRunnable> registeredTasks = new HashMap<>();
private final Object consumerThreadCoordinator = new Object();
private boolean isStopping = false;
private Thread consumerThread;

public TaskQueueConsumer(TaskQueue queue, @Value("${task.tasksToFetchPerTime}") int tasksToFetchPerTime) {
this.queue = queue;
this.tasksToFetchPerTime = tasksToFetchPerTime;
queue.onNewTask(this::notifyNewTask);
}

public void registerTask(String taskType, TaskRunnable taskRunnable) {
if (registeredTasks.containsKey(taskType)) {
throw new RuntimeException("task has been registered already: " + taskType);
}
registeredTasks.put(taskType, taskRunnable);
}

public void start() {
consumerThread = new Thread(() -> {
while (!isStopping) {
log.info("start to find new tasks");
var tasks = queue.popTasks(tasksToFetchPerTime);
if (tasks.isEmpty()) {
try {
log.info("no new tasks found, will wait for next round to fetch tasks.");
synchronized (consumerThreadCoordinator) {
consumerThreadCoordinator.wait();
}
continue;
} catch (InterruptedException e) {
log.warn("Thread interrupted unexpectedly, will continue to run new tasks.", e);
continue;
}
}
log.info("found {} tasks.", tasks.size());
for (Task task : tasks) {
try {
log.info("start to run task {}(id={}).", task.getType(), task.getId());
queue.markStarted(task);
registeredTasks.get(task.getType()).run(task.getArg());
queue.markSucceeded(task);
log.info("run task {}(id={}) succeeded.", task.getType(), task.getId());
} catch (Exception e) {
queue.markFailed(task, e);
log.warn("run task {}(id={}) failed.", task.getType(), task.getId(), e);
}
}
}
});
consumerThread.setDaemon(false);
consumerThread.start();
}

@Scheduled(fixedRate = 10 * 1000)
public void notifyNewTask() {
synchronized (consumerThreadCoordinator) {
log.info("notify consumer of new tasks");
consumerThreadCoordinator.notifyAll();
}
}

@Override
public void afterPropertiesSet() throws Exception {
start();
}
}

0 comments on commit 47c332f

Please sign in to comment.