diff --git a/.gitignore b/.gitignore
index e231e612f..0c54f9fb7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,4 +7,5 @@ target
.idea
*.iml
**/.DS_Store
-
+db2jcc4.jar
+ojdbc7.jar
diff --git a/.travis.yml b/.travis.yml
index 3f40195c1..d5ca1079b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,6 +11,7 @@ env:
- DB=postgresql
- DB=mysql
- DB=sqlserver
+ - DB=db2
addons:
postgresql: "9.6"
services:
@@ -18,6 +19,8 @@ services:
before_script:
- export SPRING_PROFILES_ACTIVE=nflow.db.$DB
- ./travis/setup-db-$DB.sh
+script:
+ - mvn test -B -P $DB
after_script:
- for i in nflow-*/target/surefire-reports/*.txt; do echo ">>>>>>>>>>>>>>>>>>>"; echo $i; echo "<<<<<<<<<<<<<<<<<<<<<"; cat $i; done
notifications:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b8668ed14..9f1e6be02 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
-## 5.2.1 (yyyy-MM-dd)
+## 5.3.0 (yyyy-MM-dd)
**Highlights**
+- Add experimental DB2 support
+
+**Breaking changes**
+- nFlow `Datasource` uses now underlying database specific `Driver`s instead of `DataSource`s.
+ Make a corresponding change, if you have customized `nflow.db.*.driver` parameters.
**Details**
- Upgraded Spring to version 5.1.3.RELEASE
diff --git a/README.md b/README.md
index 3bc37247d..f9cd0fcbd 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ nFlow is a battle-proven solution for orchestrating business processes. Dependin
* High availability — the same workflows can be processed by multiple deployments
* Fault tolerant — automatic recovery if runtime environment crashes
* Atomic state updates — uses and requires a relational database for atomic state updates and locking
-* Multiple databases supported — PostgreSQL, MySQL, Oracle, Microsoft SQL Server, H2
+* Multiple databases supported — PostgreSQL, MySQL, Oracle, Microsoft SQL Server, DB2, H2
* Open Source under EUPL
# Getting Started
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/Profiles.java b/nflow-engine/src/main/java/io/nflow/engine/config/Profiles.java
index 160977926..898363b53 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/Profiles.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/Profiles.java
@@ -5,6 +5,11 @@
*/
public abstract class Profiles {
+ /**
+ * Profile to enable DB2 database.
+ */
+ public static final String DB2 = "nflow.db.db2";
+
/**
* Profile to enable H2 database.
*/
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java
index d546b5744..eeaddd58b 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/DatabaseConfiguration.java
@@ -58,24 +58,24 @@ public DataSource nflowDatasource(Environment env, BeanFactory appCtx) {
logger.info("Database connection to {} using {}", dbType, url);
HikariConfig config = new HikariConfig();
config.setPoolName("nflow");
- config.setDataSourceClassName(property(env, "driver"));
- config.addDataSourceProperty("url", url);
+ config.setDriverClassName(property(env, "driver"));
+ config.setJdbcUrl(url);
config.setUsername(property(env, "user"));
config.setPassword(property(env, "password"));
config.setMaximumPoolSize(property(env, "max_pool_size", Integer.class));
config.setIdleTimeout(property(env, "idle_timeout_seconds", Long.class) * 1000);
config.setAutoCommit(true);
setMetricRegistryIfBeanFoundOnClassPath(config, appCtx);
- return new HikariDataSource(config);
+ DataSource nflowDataSource = new HikariDataSource(config);
+ checkDatabaseConfiguration(env, nflowDataSource);
+ return nflowDataSource;
}
private void setMetricRegistryIfBeanFoundOnClassPath(HikariConfig config, BeanFactory appCtx) {
try {
Class> metricClass = Class.forName("com.codahale.metrics.MetricRegistry");
Object metricRegistry = appCtx.getBean(metricClass);
- if (metricRegistry != null) {
- config.setMetricRegistry(metricRegistry);
- }
+ config.setMetricRegistry(metricRegistry);
} catch (@SuppressWarnings("unused") ClassNotFoundException | NoSuchBeanDefinitionException e) {
// ignored - metrics is an optional dependency
}
@@ -161,4 +161,12 @@ public DatabaseInitializer nflowDatabaseInitializer(@NFlow DataSource dataSource
return new DatabaseInitializer(dbType, dataSource, env);
}
+ /**
+ * Checks that the database is configured as nFlow expects.
+ * @param env The Spring environment.
+ * @param dataSource The nFlow datasource.
+ */
+ protected void checkDatabaseConfiguration(Environment env, DataSource dataSource) {
+ // no common checks for all databases
+ }
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/Db2DatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/Db2DatabaseConfiguration.java
new file mode 100644
index 000000000..730c599af
--- /dev/null
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/Db2DatabaseConfiguration.java
@@ -0,0 +1,201 @@
+package io.nflow.engine.config.db;
+
+import static io.nflow.engine.config.Profiles.DB2;
+import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp;
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.sql.*;
+import java.time.ZoneId;
+import java.util.Calendar;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import javax.sql.DataSource;
+
+import org.joda.time.DateTime;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import io.nflow.engine.internal.storage.db.SQLVariants;
+import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
+
+/**
+ * Configuration for DB2 database. Note: tested only using DB2 Express-C (Docker: ibmcom/db2express-c).
+ */
+@Profile(DB2)
+@Configuration
+public class Db2DatabaseConfiguration extends DatabaseConfiguration {
+
+ /**
+ * Create a new instance.
+ */
+ public Db2DatabaseConfiguration() {
+ super("db2");
+ }
+
+ /**
+ * Creates the SQL variants for DB2.
+ * @param env The Spring environment for getting the configuration property values.
+ * @return SQL variants optimized for DB2.
+ */
+ @Bean
+ public SQLVariants sqlVariants(Environment env) {
+ return new Db2SQLVariants(property(env, "timezone"));
+ }
+
+ @Override
+ protected void checkDatabaseConfiguration(Environment env, DataSource dataSource) {
+ JdbcTemplate jdbc = new JdbcTemplate(dataSource);
+ Long dbTimeZoneOffsetHours = jdbc.queryForObject("select current timezone from sysibm.sysdummy1", Long.class);
+ Long propsTimeZoneOffsetHours = HOURS.convert(
+ TimeZone.getTimeZone(property(env, "timezone")).getOffset(currentTimeMillis()), MILLISECONDS);
+ if (!Objects.equals(dbTimeZoneOffsetHours, propsTimeZoneOffsetHours)) {
+ throw new RuntimeException("Database has unexpected time zone - hour offset in DB2 is " + dbTimeZoneOffsetHours +
+ " but the expected hour offset based on timezone-property is " + propsTimeZoneOffsetHours +
+ ". Change the timezone-property to match with your DB2 time zone.");
+ }
+ }
+
+ /**
+ * SQL variants optimized for DB2.
+ */
+ public static class Db2SQLVariants implements SQLVariants {
+
+ private final ZoneId dbTimeZoneId;
+
+ public Db2SQLVariants(String dbTimeZoneIdStr) {
+ dbTimeZoneId = ZoneId.of(dbTimeZoneIdStr);
+ }
+
+ /**
+ * Returns SQL representing the current database time plus given amount of seconds.
+ */
+ @Override
+ public String currentTimePlusSeconds(int seconds) {
+ return "current_timestamp + " + seconds + " SECONDS";
+ }
+
+ /**
+ * Returns false as DB2 does not support update returning clause.
+ */
+ @Override
+ public boolean hasUpdateReturning() {
+ return false;
+ }
+
+ /**
+ * Returns false as DB2 does not support updateable CTEs.
+ */
+ @Override
+ public boolean hasUpdateableCTE() {
+ return false;
+ }
+
+ @Override
+ public String nextActivationUpdate() {
+ return "(case " //
+ + "when ? is null then null " //
+ + "when external_next_activation is null then ? " //
+ + "else least(?, external_next_activation) end)";
+ }
+
+ /**
+ * Returns the SQL representation for given workflow instance status.
+ */
+ @Override
+ public String workflowStatus(WorkflowInstanceStatus status) {
+ return "'" + status.name() + "'";
+ }
+
+ /**
+ * Returns SQL representing the workflow instance status parameter.
+ */
+ @Override
+ public String workflowStatus() {
+ return "?";
+ }
+
+ /**
+ * Returns SQL representing the action type parameter.
+ */
+ @Override
+ public String actionType() {
+ return "?";
+ }
+
+ /**
+ * Returns string for casting value to text.
+ */
+ @Override
+ public String castToText() {
+ return "";
+ }
+
+ /**
+ * Returns SQL for a query with a limit of results.
+ */
+ @Override
+ public String limit(String query, long limit) {
+ // note: limit must be a number, because NamedJdbcTemplate does not set variables (e.g. :limit) here
+ return query + " fetch first " + limit + " rows only";
+ }
+
+ /**
+ * Returns the SQL type for long text.
+ */
+ @Override
+ public int longTextType() {
+ return Types.VARCHAR;
+ }
+
+ /**
+ * Returns true as DB2 Express-C supports batch updates.
+ */
+ @Override
+ public boolean useBatchUpdate() {
+ return true;
+ }
+
+ @Override
+ public Object getTimestamp(ResultSet rs, String columnName) throws SQLException {
+ return Optional.ofNullable(rs.getTimestamp(columnName))
+ .map(ts -> new Timestamp(ts.getTime() + timeZoneMismatchInMillis()))
+ .orElse(null);
+ }
+
+ @Override
+ public DateTime getDateTime(ResultSet rs, String columnName) throws SQLException {
+ return Optional.ofNullable(rs.getTimestamp(columnName))
+ .map(ts -> new DateTime(ts.getTime() + timeZoneMismatchInMillis()))
+ .orElse(null);
+ }
+
+ @Override
+ public void setDateTime(PreparedStatement ps, int columnNumber, DateTime timestamp) throws SQLException {
+ ps.setTimestamp(columnNumber, toTimestamp(timestamp), Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+ }
+
+ @Override
+ public Object toTimestampObject(DateTime timestamp) {
+ return Optional.ofNullable(timestamp)
+ .map(ts -> new Timestamp(timestamp.getMillis() - timeZoneMismatchInMillis()))
+ .orElse(null);
+ }
+
+ @Override
+ public Object tuneTimestampForDb(Object timestamp) {
+ return new Timestamp(((Timestamp)timestamp).getTime() - timeZoneMismatchInMillis());
+ }
+
+ private long timeZoneMismatchInMillis() {
+ long now = currentTimeMillis();
+ return TimeZone.getDefault().getOffset(now) - TimeZone.getTimeZone(dbTimeZoneId).getOffset(now);
+ }
+ }
+}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/H2DatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/H2DatabaseConfiguration.java
index 1b4eb6fc3..102439ef6 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/H2DatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/H2DatabaseConfiguration.java
@@ -144,7 +144,7 @@ public String castToText() {
* Returns SQL for a query with a limit of results.
*/
@Override
- public String limit(String query, String limit) {
+ public String limit(String query, long limit) {
return query + " limit " + limit;
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/MysqlDatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/MysqlDatabaseConfiguration.java
index 6a7bd113a..7e849f576 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/MysqlDatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/MysqlDatabaseConfiguration.java
@@ -160,7 +160,7 @@ public String castToText() {
* Returns SQL for a query with a limit of results.
*/
@Override
- public String limit(String query, String limit) {
+ public String limit(String query, long limit) {
return query + " limit " + limit;
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/OracleDatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/OracleDatabaseConfiguration.java
index cca452a9d..a52ca252e 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/OracleDatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/OracleDatabaseConfiguration.java
@@ -160,7 +160,7 @@ public String castToText() {
* Returns SQL for a query with a limit of results.
*/
@Override
- public String limit(String query, String limit) {
+ public String limit(String query, long limit) {
return "select * from (" + query + ") where rownum <= " + limit;
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/PgDatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/PgDatabaseConfiguration.java
index 10b289bdd..619c8e7a3 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/PgDatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/PgDatabaseConfiguration.java
@@ -110,7 +110,7 @@ public String castToText() {
* Returns SQL for a query with a limit of results.
*/
@Override
- public String limit(String query, String limit) {
+ public String limit(String query, long limit) {
return query + " limit " + limit;
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/config/db/SqlServerDatabaseConfiguration.java b/nflow-engine/src/main/java/io/nflow/engine/config/db/SqlServerDatabaseConfiguration.java
index 3a8735513..73aca0d99 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/config/db/SqlServerDatabaseConfiguration.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/config/db/SqlServerDatabaseConfiguration.java
@@ -190,7 +190,7 @@ public String castToText() {
* Returns SQL for a query with a limit of results.
*/
@Override
- public String limit(String query, String limit) {
+ public String limit(String query, long limit) {
int idx = query.indexOf("select ");
return query.substring(0, idx + 7) + "top(" + limit + ") " + query.substring(idx + 7);
}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java
index 92fe65c80..e18b80845 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java
@@ -58,7 +58,7 @@ public List listArchivableWorkflows(DateTime before, int maxRows) {
" select 1 from nflow_workflow child where child.root_workflow_id = parent.id " +
" and (" + sqlVariants.dateLtEqDiff("?", "child.modified") + " or child.next_activation is not null)" +
" )" +
- " order by modified asc ", String.valueOf(maxRows)) +
+ " order by modified asc ", maxRows) +
") as archivable_parent " +
"where archivable_parent.id = w.id or archivable_parent.id = w.root_workflow_id",
new ArchivableWorkflowsRowMapper(), sqlVariants.toTimestampObject(before), sqlVariants.toTimestampObject(before));
diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java
index efe896c19..8e01409a4 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java
@@ -548,7 +548,7 @@ String whereConditionForInstanceUpdate() {
private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) {
String sql = updateInstanceForExecutionQuery() + " where id in ("
- + sqlVariants.limit("select id from nflow_workflow " + whereConditionForInstanceUpdate(), Integer.toString(batchSize))
+ + sqlVariants.limit("select id from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize)
+ ") and executor_id is null returning id";
return jdbc.queryForList(sql, Integer.class);
}
@@ -557,14 +557,9 @@ private List pollNextWorkflowInstanceIdsWithTransaction(final int batch
return transaction.execute(new TransactionCallback>() {
@Override
public List doInTransaction(TransactionStatus transactionStatus) {
- String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(),
- Integer.toString(batchSize));
- List instances = jdbc.query(sql, new RowMapper() {
- @Override
- public OptimisticLockKey mapRow(ResultSet rs, int rowNum) throws SQLException {
- return new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"));
- }
- });
+ String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize);
+ List instances = jdbc.query(sql, (rs, rowNum) ->
+ new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified")));
if (instances.isEmpty()) {
return emptyList();
}
@@ -582,7 +577,7 @@ private void updateNextWorkflowInstancesWithMultipleUpdates(List instances, List ids) {
List