Skip to content

Commit

Permalink
Merge 9bb4450 into dee0592
Browse files Browse the repository at this point in the history
  • Loading branch information
ttiurani committed Dec 22, 2020
2 parents dee0592 + 9bb4450 commit 3fa4e72
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 61 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

**Highlights**

- `nflow-rest-api-spring-web` and `nflow-netty`
- Change REST API calls to use a dedicated thread pool for all blocking database operations to avoid blocking the netty EventLoop thread.

**Details**

- `nflow-engine`
Expand Down Expand Up @@ -31,6 +34,9 @@
- http-proxy 1.18.1
- ini 1.3.7
- bl 1.2.3
- nflow-rest-api-spring-web
- Change deendency from spring-web to spring-webflux to be able to use Project Reactor's types.
- Introduce a thread pool in SchedulerService and wrap all blocking database calls in the REST API to it.

## 7.2.0 (2020-04-27)

Expand Down
28 changes: 24 additions & 4 deletions nflow-netty/src/test/java/io/nflow/netty/StartNflowTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.nflow.netty;

import static io.nflow.rest.v1.ResourcePaths.NFLOW_WORKFLOW_DEFINITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.http.HttpStatus.OK;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -10,6 +13,10 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;

import com.fasterxml.jackson.databind.JsonNode;

public class StartNflowTest {

Expand All @@ -28,18 +35,31 @@ public void startNflowNetty() throws Exception {
.registerSpringClasspathPropertySource("external.properties")
.registerSpringApplicationListener(testListener);
Map<String, Object> properties = new HashMap<>();
properties.put("nflow.db.create_on_startup", false);
String restApiPrefix = "nflow/api";
properties.put("nflow.db.create_on_startup", true);
properties.put("nflow.autostart", false);
properties.put("nflow.autoinit", false);
properties.put("nflow.autoinit", true);
properties.put("nflow.rest.path.prefix", restApiPrefix);

ApplicationContext ctx = startNflow.startNetty(7500, "local", "", properties);

assertNotNull(testListener.applicationContextEvent);
assertEquals("7500", ctx.getEnvironment().getProperty("port"));
assertEquals("local", ctx.getEnvironment().getProperty("env"));
assertEquals("externallyDefinedExecutorGroup", ctx.getEnvironment().getProperty("nflow.executor.group"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.db.create_on_startup"));
assertEquals("true", ctx.getEnvironment().getProperty("nflow.db.create_on_startup"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autostart"));
assertEquals("false", ctx.getEnvironment().getProperty("nflow.autoinit"));
assertEquals("true", ctx.getEnvironment().getProperty("nflow.autoinit"));

smokeTestRestApi(restApiPrefix);
}

private void smokeTestRestApi(String restApiPrefix) {
WebClient client = WebClient.builder().baseUrl("http://localhost:7500").build();
ClientResponse response = client.get().uri(restApiPrefix + NFLOW_WORKFLOW_DEFINITION_PATH).exchange().block();
assertEquals(OK, response.statusCode());
JsonNode responseBody = response.bodyToMono(JsonNode.class).block();
assertTrue(responseBody.isArray());
}

}
2 changes: 1 addition & 1 deletion nflow-rest-api-spring-web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<artifactId>spring-webflux</artifactId>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.nflow.rest.config.springweb;

import static java.lang.Math.max;
import static org.slf4j.LoggerFactory.getLogger;
import static reactor.core.publisher.Mono.fromCallable;
import static reactor.core.scheduler.Schedulers.fromExecutor;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import io.nflow.engine.internal.executor.WorkflowInstanceExecutor;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
* Service to hold a Webflux Scheduler in order to make blocking calls.
*/
@Service
public class SchedulerService {

private static final Logger logger = getLogger(SchedulerService.class);
private final Scheduler scheduler;

@Inject
public SchedulerService(WorkflowInstanceExecutor workflowInstanceExecutor, Environment env) {
int dbPoolSize = env.getProperty("nflow.db.max_pool_size", Integer.class);
int dispatcherCount = workflowInstanceExecutor.getThreadCount();
int threadPoolSize = max(dbPoolSize - dispatcherCount, 2);
logger.info("Initializing REST API thread pool size to {}", threadPoolSize);
this.scheduler = fromExecutor(Executors.newFixedThreadPool(threadPoolSize));
}

public <T> Mono<T> callAsync(Callable<T> callable) {
return fromCallable(callable).subscribeOn(this.scheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
import static org.springframework.http.ResponseEntity.ok;

import org.springframework.beans.factory.annotation.Autowired;
import javax.inject.Inject;

import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -16,33 +17,40 @@
import io.nflow.engine.service.MaintenanceConfiguration;
import io.nflow.engine.service.MaintenanceResults;
import io.nflow.engine.service.MaintenanceService;
import io.nflow.rest.config.springweb.SchedulerService;
import io.nflow.rest.v1.converter.MaintenanceConverter;
import io.nflow.rest.v1.msg.MaintenanceRequest;
import io.nflow.rest.v1.msg.MaintenanceResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = NFLOW_SPRING_WEB_PATH_PREFIX + NFLOW_MAINTENANCE_PATH, produces = APPLICATION_JSON_VALUE)
@Api("nFlow maintenance")
@Component
public class MaintenanceResource extends SpringWebResource {

@Autowired
private MaintenanceService maintenanceService;
private final MaintenanceService maintenanceService;
private final MaintenanceConverter converter;

@Autowired
private MaintenanceConverter converter;
@Inject
public MaintenanceResource(SchedulerService scheduler, MaintenanceService maintenanceService, MaintenanceConverter converter) {
super(scheduler);
this.maintenanceService = maintenanceService;
this.converter = converter;
}

@PostMapping(consumes = APPLICATION_JSON_VALUE)
@ApiOperation(value = "Do maintenance on old workflow instances synchronously", response = MaintenanceResponse.class)
public ResponseEntity<?> cleanupWorkflows(
public Mono<ResponseEntity<?>> cleanupWorkflows(
@RequestBody @ApiParam(value = "Parameters for the maintenance process", required = true) MaintenanceRequest request) {
return handleExceptions(() -> {
return handleExceptions(() -> wrapBlocking(() -> {
MaintenanceConfiguration configuration = converter.convert(request);
MaintenanceResults results = maintenanceService.cleanupWorkflows(configuration);
return ok(converter.convert(results));
});
}));
}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
package io.nflow.rest.v1.springweb;

import static org.springframework.http.ResponseEntity.status;
import static reactor.core.publisher.Mono.just;

import java.util.concurrent.Callable;
import java.util.function.Supplier;

import org.springframework.http.ResponseEntity;

import io.nflow.rest.config.springweb.SchedulerService;
import io.nflow.rest.v1.ResourceBase;
import io.nflow.rest.v1.msg.ErrorResponse;
import reactor.core.publisher.Mono;

public abstract class SpringWebResource extends ResourceBase {

protected ResponseEntity<?> handleExceptions(Supplier<ResponseEntity<?>> response) {
private final SchedulerService scheduler;

protected SpringWebResource(SchedulerService scheduler) {
this.scheduler = scheduler;
}

protected Mono<ResponseEntity<?>> wrapBlocking(Callable<ResponseEntity<?>> callable) {
return scheduler.callAsync(callable);
}

protected Mono<ResponseEntity<?>> handleExceptions(Supplier<Mono<ResponseEntity<?>>> response) {
return handleExceptions(response::get, this::toErrorResponse);
}

private ResponseEntity<?> toErrorResponse(int statusCode, ErrorResponse body) {
return status(statusCode).body(body);
private Mono<ResponseEntity<?>> toErrorResponse(int statusCode, ErrorResponse body) {
return just(status(statusCode).body(body));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,47 @@
import org.springframework.web.bind.annotation.RestController;

import io.nflow.engine.service.StatisticsService;
import io.nflow.rest.config.springweb.SchedulerService;
import io.nflow.rest.v1.converter.StatisticsConverter;
import io.nflow.rest.v1.msg.StatisticsResponse;
import io.nflow.rest.v1.msg.WorkflowDefinitionStatisticsResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = NFLOW_SPRING_WEB_PATH_PREFIX + NFLOW_STATISTICS_PATH, produces = APPLICATION_JSON_VALUE)
@Api("nFlow statistics")
@Component
public class StatisticsResource extends SpringWebResource {

private final StatisticsService statisticsService;
private final StatisticsConverter statisticsConverter;

@Inject
private StatisticsService statisticsService;
@Inject
private StatisticsConverter statisticsConverter;
public StatisticsResource(SchedulerService scheduler, StatisticsConverter statisticsConverter,
StatisticsService statisticsService) {
super(scheduler);
this.statisticsService = statisticsService;
this.statisticsConverter = statisticsConverter;
}

@GetMapping
@ApiOperation(value = "Get executor group statistics", response = StatisticsResponse.class, notes = "Returns counts of queued and executing workflow instances.")
public ResponseEntity<?> queryStatistics() {
return handleExceptions(() -> ok(statisticsConverter.convert(statisticsService.getStatistics())));
public Mono<ResponseEntity<?>> queryStatistics() {
return handleExceptions(() -> wrapBlocking(() -> ok(statisticsConverter.convert(statisticsService.getStatistics()))));
}

@GetMapping(path = "/workflow/{type}")
@ApiOperation(value = "Get workflow definition statistics", response = WorkflowDefinitionStatisticsResponse.class)
public ResponseEntity<?> getStatistics(
public Mono<ResponseEntity<?>> getStatistics(
@PathVariable("type") @ApiParam(value = "Workflow definition type", required = true) String type,
@RequestParam(value = "createdAfter", required = false) @ApiParam("Include only workflow instances created after given time") DateTime createdAfter,
@RequestParam(value = "createdBefore", required = false) @ApiParam("Include only workflow instances created before given time") DateTime createdBefore,
@RequestParam(value = "modifiedAfter", required = false) @ApiParam("Include only workflow instances modified after given time") DateTime modifiedAfter,
@RequestParam(value = "modifiedBefore", required = false) @ApiParam("Include only workflow instances modified before given time") DateTime modifiedBefore) {
return handleExceptions(() -> ok(statisticsConverter.convert(
statisticsService.getWorkflowDefinitionStatistics(type, createdAfter, createdBefore, modifiedAfter, modifiedBefore))));
return handleExceptions(() -> wrapBlocking(() -> ok(statisticsConverter.convert(
statisticsService.getWorkflowDefinitionStatistics(type, createdAfter, createdBefore, modifiedAfter, modifiedBefore)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import javax.inject.Inject;

import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -17,11 +18,13 @@

import io.nflow.engine.internal.dao.WorkflowDefinitionDao;
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.rest.config.springweb.SchedulerService;
import io.nflow.rest.v1.converter.ListWorkflowDefinitionConverter;
import io.nflow.rest.v1.msg.ListWorkflowDefinitionResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = NFLOW_SPRING_WEB_PATH_PREFIX + NFLOW_WORKFLOW_DEFINITION_PATH, produces = APPLICATION_JSON_VALUE)
Expand All @@ -33,19 +36,20 @@ public class WorkflowDefinitionResource extends SpringWebResource {
private final ListWorkflowDefinitionConverter converter;
private final WorkflowDefinitionDao workflowDefinitionDao;

@Autowired
public WorkflowDefinitionResource(WorkflowDefinitionService workflowDefinitions, ListWorkflowDefinitionConverter converter,
WorkflowDefinitionDao workflowDefinitionDao) {
@Inject
public WorkflowDefinitionResource(SchedulerService scheduler, WorkflowDefinitionService workflowDefinitions,
ListWorkflowDefinitionConverter converter, WorkflowDefinitionDao workflowDefinitionDao) {
super(scheduler);
this.workflowDefinitions = workflowDefinitions;
this.converter = converter;
this.workflowDefinitionDao = workflowDefinitionDao;
}

@GetMapping
@ApiOperation(value = "List workflow definitions", response = ListWorkflowDefinitionResponse.class, responseContainer = "List", notes = "Returns workflow definition(s): all possible states, transitions between states and other setting metadata. The workflow definition can deployed in nFlow engine or historical workflow definition stored in the database.")
public ResponseEntity<?> listWorkflowDefinitions(
public Mono<ResponseEntity<?>> listWorkflowDefinitions(
@RequestParam(value = "type", defaultValue = "") @ApiParam("Included workflow types") List<String> types) {
return handleExceptions(
() -> ok(super.listWorkflowDefinitions(types, workflowDefinitions, converter, workflowDefinitionDao)));
return handleExceptions(() -> wrapBlocking(
() -> ok(super.listWorkflowDefinitions(types, workflowDefinitions, converter, workflowDefinitionDao))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
import static org.springframework.http.ResponseEntity.ok;

import org.springframework.beans.factory.annotation.Autowired;
import javax.inject.Inject;

import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import io.nflow.engine.service.WorkflowExecutorService;
import io.nflow.rest.config.springweb.SchedulerService;
import io.nflow.rest.v1.converter.ListWorkflowExecutorConverter;
import io.nflow.rest.v1.msg.ListWorkflowExecutorResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping(value = NFLOW_SPRING_WEB_PATH_PREFIX + NFLOW_WORKFLOW_EXECUTOR_PATH, produces = APPLICATION_JSON_VALUE)
Expand All @@ -28,16 +31,18 @@ public class WorkflowExecutorResource extends SpringWebResource {
private final WorkflowExecutorService workflowExecutors;
private final ListWorkflowExecutorConverter converter;

@Autowired
public WorkflowExecutorResource(WorkflowExecutorService workflowExecutors, ListWorkflowExecutorConverter converter) {
@Inject
public WorkflowExecutorResource(SchedulerService scheduler, WorkflowExecutorService workflowExecutors,
ListWorkflowExecutorConverter converter) {
super(scheduler);
this.workflowExecutors = workflowExecutors;
this.converter = converter;
}

@GetMapping
@ApiOperation(value = "List workflow executors", response = ListWorkflowExecutorResponse.class, responseContainer = "List")
public ResponseEntity<?> listWorkflowExecutors() {
return handleExceptions(
() -> ok(workflowExecutors.getWorkflowExecutors().stream().map(converter::convert).collect(toList())));
public Mono<ResponseEntity<?>> listWorkflowExecutors() {
return handleExceptions(() -> wrapBlocking(
() -> ok(workflowExecutors.getWorkflowExecutors().stream().map(converter::convert).collect(toList()))));
}
}
Loading

0 comments on commit 3fa4e72

Please sign in to comment.