Permalink
Browse files

Port of 3aede8a from v6 to v5:

Multitenant process engine configuration with isolated schema for each tenant (with dynamic tenant registration).
+ Two implementations of multi tenant (isolated schema) async executors: one with an executor/tenant, the other one with a shared executor service but separate acquisition threads
  • Loading branch information...
jbarrez committed Sep 20, 2015
1 parent 87450bc commit 93790fd372bc2aa91e0cb2bc1bc156e953118264
Showing with 1,386 additions and 1 deletion.
  1. +17 −1 ...activiti-engine/src/main/java/org/activiti/engine/impl/asyncexecutor/DefaultAsyncJobExecutor.java
  2. +26 −0 ...viti-engine/src/main/java/org/activiti/engine/impl/asyncexecutor/ExecuteAsyncRunnableFactory.java
  3. +201 −0 .../main/java/org/activiti/engine/impl/asyncexecutor/multitenant/ExecutorPerTenantAsyncExecutor.java
  4. +134 −0 ...n/java/org/activiti/engine/impl/asyncexecutor/multitenant/SharedExecutorServiceAsyncExecutor.java
  5. +46 −0 ...va/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareAcquireAsyncJobsDueRunnable.java
  6. +46 −0 .../java/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareAcquireTimerJobsRunnable.java
  7. +30 −0 ...ne/src/main/java/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareAsyncExecutor.java
  8. +31 −0 ...main/java/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareAsyncExecutorFactory.java
  9. +44 −0 ...main/java/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareExecuteAsyncRunnable.java
  10. +41 −0 ...va/org/activiti/engine/impl/asyncexecutor/multitenant/TenantAwareExecuteAsyncRunnableFactory.java
  11. +61 −0 ...-engine/src/main/java/org/activiti/engine/impl/cfg/multitenant/ExecuteSchemaOperationCommand.java
  12. +165 −0 ...va/org/activiti/engine/impl/cfg/multitenant/MultiSchemaMultiTenantProcessEngineConfiguration.java
  13. +115 −0 ...activiti-engine/src/main/java/org/activiti/engine/impl/cfg/multitenant/TenantAwareDataSource.java
  14. +50 −0 modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/multitenant/TenantInfoHolder.java
  15. +85 −0 ...activiti-engine/src/test/java/org/activiti/engine/test/cfg/multitenant/DummyTenantInfoHolder.java
  16. +208 −0 ...i-engine/src/test/java/org/activiti/engine/test/cfg/multitenant/MultiTenantProcessEngineTest.java
  17. +68 −0 ...es/activiti-engine/src/test/resources/org/activiti/engine/test/cfg/multitenant/jobTest.bpmn20.xml
  18. +18 −0 ...viti-engine/src/test/resources/org/activiti/engine/test/cfg/multitenant/oneTaskProcess.bpmn20.xml
@@ -51,6 +51,8 @@
protected AcquireTimerJobsRunnable timerJobRunnable;
protected AcquireAsyncJobsDueRunnable asyncJobsDueRunnable;

protected ExecuteAsyncRunnableFactory executeAsyncRunnableFactory;

protected boolean isAutoActivate = false;
protected boolean isActive = false;

@@ -71,8 +73,14 @@
protected CommandExecutor commandExecutor;

public void executeAsyncJob(JobEntity job) {
Runnable runnable = null;
if (isActive) {
executorService.execute(new ExecuteAsyncRunnable(job, commandExecutor));
if (executeAsyncRunnableFactory == null) {
runnable = new ExecuteAsyncRunnable(job, commandExecutor);
} else {
runnable = executeAsyncRunnableFactory.createExecuteAsyncRunnable(job, commandExecutor);
}
executorService.execute(runnable);
} else {
temporaryJobQueue.add(job);
}
@@ -337,4 +345,12 @@ public void setRetryWaitTimeInMillis(int retryWaitTimeInMillis) {
this.retryWaitTimeInMillis = retryWaitTimeInMillis;
}

public ExecuteAsyncRunnableFactory getExecuteAsyncRunnableFactory() {
return executeAsyncRunnableFactory;
}

public void setExecuteAsyncRunnableFactory(ExecuteAsyncRunnableFactory executeAsyncRunnableFactory) {
this.executeAsyncRunnableFactory = executeAsyncRunnableFactory;
}

}
@@ -0,0 +1,26 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.activiti.engine.impl.asyncexecutor;

import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.persistence.entity.JobEntity;

/**
* @author Joram Barrez
*/
public interface ExecuteAsyncRunnableFactory {

Runnable createExecuteAsyncRunnable(JobEntity jobEntity, CommandExecutor commandExecutor);

}
@@ -0,0 +1,201 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.activiti.engine.impl.asyncexecutor.multitenant;

import java.util.HashMap;
import java.util.Map;

import org.activiti.engine.impl.asyncexecutor.AsyncExecutor;
import org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.activiti.engine.impl.cfg.multitenant.TenantInfoHolder;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link AsyncExecutor} that has one {@link AsyncExecutor} per tenant.
* So each tenant has its own acquiring threads and it's own threadpool for executing jobs.
*
* @author Joram Barrez
*/
public class ExecutorPerTenantAsyncExecutor implements TenantAwareAsyncExecutor {

private static final Logger logger = LoggerFactory.getLogger(ExecutorPerTenantAsyncExecutor.class);

protected TenantInfoHolder tenantInfoHolder;
protected TenantAwareAsyncExecutorFactory tenantAwareAyncExecutorFactory;

protected Map<String, AsyncExecutor> tenantExecutors = new HashMap<String, AsyncExecutor>();

protected CommandExecutor commandExecutor;
protected boolean active;
protected boolean autoActivate;

public ExecutorPerTenantAsyncExecutor(TenantInfoHolder tenantInfoHolder) {
this(tenantInfoHolder, null);
}

public ExecutorPerTenantAsyncExecutor(TenantInfoHolder tenantInfoHolder, TenantAwareAsyncExecutorFactory tenantAwareAyncExecutorFactory) {
this.tenantInfoHolder = tenantInfoHolder;
this.tenantAwareAyncExecutorFactory = tenantAwareAyncExecutorFactory;
}

public void addTenantAsyncExecutor(String tenantId, boolean startExecutor) {
AsyncExecutor tenantExecutor = null;

if (tenantAwareAyncExecutorFactory == null) {
tenantExecutor = new DefaultAsyncJobExecutor();
} else {
tenantExecutor = tenantAwareAyncExecutorFactory.createAsyncExecutor(tenantId);
}

if (tenantExecutor instanceof DefaultAsyncJobExecutor) {
DefaultAsyncJobExecutor defaultAsyncJobExecutor = (DefaultAsyncJobExecutor) tenantExecutor;
defaultAsyncJobExecutor.setAsyncJobsDueRunnable(new TenantAwareAcquireAsyncJobsDueRunnable(defaultAsyncJobExecutor, tenantInfoHolder, tenantId));
defaultAsyncJobExecutor.setTimerJobRunnable(new TenantAwareAcquireTimerJobsRunnable(defaultAsyncJobExecutor, tenantInfoHolder, tenantId));
defaultAsyncJobExecutor.setExecuteAsyncRunnableFactory(new TenantAwareExecuteAsyncRunnableFactory(tenantInfoHolder, tenantId));
}

tenantExecutor.setCommandExecutor(commandExecutor); // Needs to be done for job executors created after boot. Doesn't hurt on boot.

tenantExecutors.put(tenantId, tenantExecutor);

if (startExecutor) {
tenantExecutor.start();
}
}

protected AsyncExecutor determineAsyncExecutor() {
return tenantExecutors.get(tenantInfoHolder.getCurrentTenantId());
}

public void executeAsyncJob(JobEntity job) {
determineAsyncExecutor().executeAsyncJob(job);
}

public void setCommandExecutor(CommandExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setCommandExecutor(commandExecutor);
}
}

public CommandExecutor getCommandExecutor() {
// Should never be accessed on this class, should be accessed on the actual AsyncExecutor
throw new UnsupportedOperationException();
}

public boolean isAutoActivate() {
return autoActivate;
}

public void setAutoActivate(boolean isAutoActivate) {
autoActivate = isAutoActivate;
}

public boolean isActive() {
return active;
}

public void start() {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.start();
}
active = true;
}

public synchronized void shutdown() {
for (String tenantId : tenantExecutors.keySet()) {
logger.info("Shutting down async executor for tenant " + tenantId);
tenantExecutors.get(tenantId).shutdown();
}
active = false;
}

public String getLockOwner() {
return determineAsyncExecutor().getLockOwner();
}

public int getTimerLockTimeInMillis() {
return determineAsyncExecutor().getTimerLockTimeInMillis();
}

public void setTimerLockTimeInMillis(int lockTimeInMillis) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setTimerLockTimeInMillis(lockTimeInMillis);
}
}

public int getAsyncJobLockTimeInMillis() {
return determineAsyncExecutor().getAsyncJobLockTimeInMillis();
}

public void setAsyncJobLockTimeInMillis(int lockTimeInMillis) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setAsyncJobLockTimeInMillis(lockTimeInMillis);
}
}

public int getDefaultTimerJobAcquireWaitTimeInMillis() {
return determineAsyncExecutor().getDefaultTimerJobAcquireWaitTimeInMillis();
}

public void setDefaultTimerJobAcquireWaitTimeInMillis(int waitTimeInMillis) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(waitTimeInMillis);
}
}

public int getDefaultAsyncJobAcquireWaitTimeInMillis() {
return determineAsyncExecutor().getDefaultAsyncJobAcquireWaitTimeInMillis();
}

public void setDefaultAsyncJobAcquireWaitTimeInMillis(int waitTimeInMillis) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(waitTimeInMillis);
}
}

public int getMaxAsyncJobsDuePerAcquisition() {
return determineAsyncExecutor().getMaxAsyncJobsDuePerAcquisition();
}

public void setMaxAsyncJobsDuePerAcquisition(int maxJobs) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setMaxAsyncJobsDuePerAcquisition(maxJobs);
}
}

public int getMaxTimerJobsPerAcquisition() {
return determineAsyncExecutor().getMaxTimerJobsPerAcquisition();
}

public void setMaxTimerJobsPerAcquisition(int maxJobs) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setMaxTimerJobsPerAcquisition(maxJobs);
}
}

public int getRetryWaitTimeInMillis() {
return determineAsyncExecutor().getRetryWaitTimeInMillis();
}

public void setRetryWaitTimeInMillis(int retryWaitTimeInMillis) {
for (AsyncExecutor asyncExecutor : tenantExecutors.values()) {
asyncExecutor.setRetryWaitTimeInMillis(retryWaitTimeInMillis);
}
}

}
Oops, something went wrong.

0 comments on commit 93790fd

Please sign in to comment.