Skip to content

Commit

Permalink
Add support for bulk updates
Browse files Browse the repository at this point in the history
  • Loading branch information
filiphr committed May 15, 2024
1 parent f0111e0 commit c542b1f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.common.engine.impl.db;

import org.apache.ibatis.session.SqlSession;

/**
* Use this to execute a dedicated update statement. It is important to note there won't be any optimistic locking checks done for these kind of update operations!
*
* @author Filip Hrisafov
*/
public class BulkUpdateOperation {

protected String statement;
protected Object parameter;

public BulkUpdateOperation(String statement, Object parameter) {
this.statement = statement;
this.parameter = parameter;
}

public void execute(SqlSession sqlSession) {
sqlSession.update(statement, parameter);
}

public String getStatement() {
return statement;
}

public void setStatement(String statement) {
this.statement = statement;
}

public Object getParameter() {
return parameter;
}

public void setParameter(Object parameter) {
this.parameter = parameter;
}

@Override
public String toString() {
return "bulk update: " + statement + "(" + parameter + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class DbSqlSession implements Session {
protected Map<Class<? extends Entity>, Map<String, Entity>> deletedObjects = new HashMap<>();
protected Map<Class<? extends Entity>, List<BulkDeleteOperation>> bulkDeleteOperations = new HashMap<>();
protected List<Entity> updatedObjects = new ArrayList<>();
protected List<BulkUpdateOperation> bulkUpdateOperations = new ArrayList<>();

public DbSqlSession(DbSqlSessionFactory dbSqlSessionFactory, EntityCache entityCache) {
this.dbSqlSessionFactory = dbSqlSessionFactory;
Expand Down Expand Up @@ -107,6 +108,13 @@ public void update(Entity entity) {
entity.setUpdated(true);
}

/**
* Executes a {@link BulkUpdateOperation}, with the sql in the statement parameter.
*/
public void update(String statement, Object parameter) {
bulkUpdateOperations.add(new BulkUpdateOperation(statement, parameter));
}

public int directUpdate(String statement, Object parameters) {
String updateStatement = dbSqlSessionFactory.mapStatement(statement);
return getSqlSession().update(updateStatement, parameters);
Expand Down Expand Up @@ -567,28 +575,49 @@ protected void incrementRevision(Entity insertedObject) {
}

protected void flushUpdates() {
for (Entity updatedObject : updatedObjects) {
String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject);
updateStatement = dbSqlSessionFactory.mapStatement(updateStatement);
if (updatedObjects.isEmpty() && bulkUpdateOperations.isEmpty()) {
return;
}

if (updateStatement == null) {
throw new FlowableException("no update statement for " + updatedObject.getClass() + " in the ibatis mapping files");
}
// Unlike bulk deletes, bulk updates are executed before the regular updates.
// The reason for that, is due to the fact that regular updates might change something that would lead to an invalid bulk update.

if (!bulkUpdateOperations.isEmpty()) {
bulkUpdateOperations.forEach(this::flushBulkUpdate);
}

LOGGER.debug("updating: {}", updatedObject);
if (!updatedObjects.isEmpty()) {
updatedObjects.forEach(this::flushUpdateEntity);
}

int updatedRecords = sqlSession.update(updateStatement, updatedObject);
if (updatedRecords == 0) {
throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently");
}
updatedObjects.clear();
bulkUpdateOperations.clear();
}

// See https://activiti.atlassian.net/browse/ACT-1290
if (updatedObject instanceof HasRevision) {
((HasRevision) updatedObject).setRevision(((HasRevision) updatedObject).getRevisionNext());
}
protected void flushUpdateEntity(Entity updatedObject) {
String updateStatement = dbSqlSessionFactory.getUpdateStatement(updatedObject);
updateStatement = dbSqlSessionFactory.mapStatement(updateStatement);

if (updateStatement == null) {
throw new FlowableException("no update statement for " + updatedObject.getClass() + " in the ibatis mapping files");
}
updatedObjects.clear();

LOGGER.debug("updating: {}", updatedObject);

int updatedRecords = sqlSession.update(updateStatement, updatedObject);
if (updatedRecords == 0) {
throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently");
}

// See https://activiti.atlassian.net/browse/ACT-1290
if (updatedObject instanceof HasRevision) {
((HasRevision) updatedObject).setRevision(((HasRevision) updatedObject).getRevisionNext());
}
}

protected void flushBulkUpdate(BulkUpdateOperation bulkUpdateOperation) {
// Bulk update
bulkUpdateOperation.execute(sqlSession);
}

protected void flushDeletes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;

import org.flowable.common.engine.impl.db.BulkDeleteOperation;
import org.flowable.common.engine.impl.db.BulkUpdateOperation;
import org.flowable.common.engine.impl.db.DbSqlSession;
import org.flowable.common.engine.impl.db.DbSqlSessionFactory;
import org.flowable.common.engine.impl.persistence.cache.EntityCache;
Expand Down Expand Up @@ -119,14 +120,20 @@ protected void flushBulkInsert(Collection<Entity> entities, Class<? extends Enti
// UPDATES

@Override
protected void flushUpdates() {
protected void flushUpdateEntity(Entity updatedObject) {
if (getCurrentCommandExecution() != null) {
for (Entity persistentObject : updatedObjects) {
getCurrentCommandExecution().addDbUpdate(persistentObject.getClass().getName());
}
getCurrentCommandExecution().addDbUpdate(updatedObject.getClass().getName());
}
super.flushUpdateEntity(updatedObject);
}

super.flushUpdates();
@Override
protected void flushBulkUpdate(BulkUpdateOperation bulkUpdateOperation) {
// Bulk update
if (getCurrentCommandExecution() != null) {
getCurrentCommandExecution().addDbUpdate("Bulk-update-" + bulkUpdateOperation.getStatement());
}
super.flushBulkUpdate(bulkUpdateOperation);
}

// DELETES
Expand Down

0 comments on commit c542b1f

Please sign in to comment.