Skip to content

Commit

Permalink
issue #22 - redesign of scheduler service
Browse files Browse the repository at this point in the history
  • Loading branch information
rsoika committed Feb 20, 2014
1 parent 404c9ba commit 93cd3e3
Showing 1 changed file with 111 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@

import org.imixs.workflow.ItemCollection;
import org.imixs.workflow.exceptions.AccessDeniedException;
import org.imixs.workflow.exceptions.PluginException;
import org.imixs.workflow.exceptions.ProcessingErrorException;
import org.imixs.workflow.jee.ejb.EntityService;
import org.imixs.workflow.jee.ejb.ModelService;
import org.imixs.workflow.jee.ejb.WorkflowService;
Expand Down Expand Up @@ -102,8 +104,7 @@ public class MartyWorkflowSchedulerService {
SessionContext ctx;

int iProcessWorkItems = 0;
int iScheduledWorkItems = 0;
List<String> processedIDs=null;
List<String> unprocessedIDs = null;

/**
* This method loads the current scheduler configuration. If no
Expand Down Expand Up @@ -379,7 +380,7 @@ public static boolean workItemInDue(ItemCollection doc,
if ("3".equals(sDelayUnit))
sDelayUnit = "days";

logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ " delay =" + iActivityDelay + " " + sDelayUnit);

}
Expand All @@ -405,14 +406,14 @@ public static boolean workItemInDue(ItemCollection doc,
switch (iCompareType) {
// last process -
case 1: {
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": CompareType = last process");

if (!doc.hasItem("timWorkflowLastAccess"))
return false;

dateTimeCompare = doc.getItemValueDate("timWorkflowLastAccess");
System.out.println("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": timWorkflowLastAccess=" + dateTimeCompare);

// scheduled time
Expand All @@ -424,12 +425,12 @@ public static boolean workItemInDue(ItemCollection doc,
// last modification - es erfolgt kein Vergleich mit last
// Event, da dieses ja selbst der auslöser der Zeit ist
case 2: {
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": CompareType = last modify");

dateTimeCompare = doc.getItemValueDate("$modified");

logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": modified=" + dateTimeCompare);

dateTimeCompare = adjustSecond(dateTimeCompare, iActivityDelay);
Expand All @@ -439,7 +440,7 @@ public static boolean workItemInDue(ItemCollection doc,

// creation
case 3: {
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": CompareType = creation");

dateTimeCompare = doc.getItemValueDate("$created");
Expand All @@ -456,29 +457,29 @@ public static boolean workItemInDue(ItemCollection doc,
case 4: {
String sNameOfField = docActivity
.getItemValueString("keyTimeCompareField");
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": CompareType = field: '" + sNameOfField + "'");

if (!doc.hasItem(sNameOfField)) {
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": CompareType =" + sNameOfField
+ " no value found!");
return false;
}

dateTimeCompare = doc.getItemValueDate(sNameOfField);

logger.fine("[WorkflowSchedulerService] " + suniqueid + ": "
logger.finest("[WorkflowSchedulerService] " + suniqueid + ": "
+ sNameOfField + "=" + dateTimeCompare);

dateTimeCompare = adjustSecond(dateTimeCompare, iActivityDelay);

logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ ": Compare " + dateTimeCompare + " <-> "
+ dateTimeNow);

if (dateTimeCompare.before(dateTimeNow)) {
logger.fine("[WorkflowSchedulerService] " + suniqueid
logger.finest("[WorkflowSchedulerService] " + suniqueid
+ " isInDue!");
}
return dateTimeCompare.before(dateTimeNow);
Expand Down Expand Up @@ -508,21 +509,61 @@ public static boolean workItemInDue(ItemCollection doc,
@Timeout
void runTimer(javax.ejb.Timer timer) throws AccessDeniedException {


processedIDs=new ArrayList<String>();

ItemCollection configItemCollection = loadConfiguration();
logger.info("[MartyWorkflowSchedulerService] runTimer started....");
logger.info("[WorkflowSchedulerService] started....");

// test if imixsDayOfWeek is provided
// https://java.net/jira/browse/GLASSFISH-20673
if (!isImixsDayOfWeek(configItemCollection)) {
logger.info("[MartyWorkflowSchedulerService] runTimer skipped because today is no imixsDayOfWeek");
logger.info("[WorkflowSchedulerService] runTimer skipped because today is no imixsDayOfWeek");
return;
}

configItemCollection.replaceItemValue("datLastRun", new Date());
processWorkItems();

/*
* Now we process all scheduled worktitems for each model
*/
iProcessWorkItems = 0;
unprocessedIDs = new ArrayList<String>();
try {
// get all model versions...
List<String> modelVersions = modelService.getAllModelVersions();
for (String version : modelVersions) {
logger.info("[WorkflowSchedulerService] processing ModelVersion: "
+ version);
// find scheduled Activities
Collection<ItemCollection> colScheduledActivities = findScheduledActivities(version);
logger.info("[WorkflowSchedulerService] "
+ colScheduledActivities.size()
+ " scheduled activityEntities found in ModelVersion: "
+ version);
// process all workitems for coresponding activities
for (ItemCollection aactivityEntity : colScheduledActivities) {
processWorkListByActivityEntity(aactivityEntity);
}
}

} catch (Exception e) {
logger.severe("[WorkflowSchedulerService] error processing worklist: " + e.getMessage());
if (logger.isLoggable(Level.FINE)) {
e.printStackTrace();
}
}

logger.info("[WorkflowSchedulerService] finished successfull");

logger.info("[WorkflowSchedulerService] " + iProcessWorkItems
+ " workitems processed");

if (unprocessedIDs.size()>0) {
logger.warning("[WorkflowSchedulerService] " + unprocessedIDs.size()
+ " workitems could be processed!");
for (String aid: unprocessedIDs) {
logger.warning("[WorkflowSchedulerService] " + aid);
}

}

Date endDate = configItemCollection.getItemValueDate("datstop");
String sTimerID = configItemCollection.getItemValueString("$uniqueid");
Expand All @@ -531,8 +572,8 @@ void runTimer(javax.ejb.Timer timer) throws AccessDeniedException {

configItemCollection.replaceItemValue("numWorkItemsProcessed",
iProcessWorkItems);
configItemCollection.replaceItemValue("numWorkItemsScheduled",
iScheduledWorkItems);
configItemCollection.replaceItemValue("numWorkItemsUnprocessed",
unprocessedIDs.size());

/*
* Check if Timer should be canceld now? - only by interval
Expand Down Expand Up @@ -566,50 +607,6 @@ void runTimer(javax.ejb.Timer timer) throws AccessDeniedException {

}

/**
* This is the method which processed scheuduled workitems when the timer is
* called.
*
* @param timer
*/
void processWorkItems() {

iProcessWorkItems = 0;
iScheduledWorkItems = 0;

logger.info("[WorkflowSchedulerService] processing workitems...");

try {

List<String> modelVersions = modelService.getAllModelVersions();

for (String version : modelVersions) {
logger.info("[WorkflowSchedulerService] ModelVersion="
+ version);
// find scheduled Activities

Collection<ItemCollection> colScheduledActivities = findScheduledActivities(version);
logger.info("[WorkflowSchedulerService] "
+ colScheduledActivities.size()
+ " scheduled activityEntities found");
// process all workitems for coresponding activities
for (ItemCollection aactivityEntity : colScheduledActivities) {
processWorkList(aactivityEntity);
}
}

} catch (Exception e) {
e.printStackTrace();
}

logger.info("[MartyWorkflowSchedulerService] finished successfull");
logger.info("[MartyWorkflowSchedulerService] " + iScheduledWorkItems
+ " scheduled workitems ");
logger.info("[MartyWorkflowSchedulerService] " + iProcessWorkItems
+ " workitems processed");

}

/**
* Create an interval timer whose first expiration occurs at a given point
* in time and whose subsequent expirations occur after a specified
Expand Down Expand Up @@ -806,91 +803,77 @@ Timer findTimer(String id) {
* @param aProcessID
* @throws Exception
*/
void processWorkList(ItemCollection activityEntity) throws Exception {

logger.info("Marty WorkflowScheduler - select only type='workitem'");
void processWorkListByActivityEntity(ItemCollection activityEntity)
throws Exception {

// get processID
int iProcessID = activityEntity.getItemValueInteger("numprocessid");
int iActivityID = activityEntity.getItemValueInteger("numActivityID");
// get Modelversion
String sModelVersion = activityEntity
.getItemValueString("$modelversion");

// if a query is defined in the activityEntity then use the EQL
// statement
// to query the items. Otherwise use standard method
// getWorklistByProcessID()
String sQuery = activityEntity.getItemValueString("txtscheduledview");

// get all workitems...
Collection<ItemCollection> worklist = null;
if (sQuery != null && !"".equals(sQuery)) {
logger.fine("[WorkflowSchedulerService] Query=" + sQuery);
worklist = entityService.findAllEntities(sQuery, 0, -1);
} else {
logger.fine("[WorkflowSchedulerService] get WorkList for ProcessID:"
+ iProcessID);
logger.info("[WorkflowSchedulerService] processing " + iProcessID + "."
+ iActivityID + " (" + sModelVersion + ") ...");

// now we need to select by type, $ProcessID and by $modelVersion!
String sQuery = "SELECT wi FROM Entity as wi "
+ " JOIN wi.integerItems AS i " + " JOIN wi.textItems as t "
+ " WHERE wi.type='workitem' ";
sQuery += " AND i.itemName = '$processid' AND i.itemValue = '"
+ iProcessID + "'"
+ " AND t.itemName = '$modelversion' AND t.itemValue = '"
+ sModelVersion + "'";

logger.fine("[WorkflowSchedulerService] select: " + sQuery);

Collection<ItemCollection> worklist = entityService.findAllEntities(
sQuery, 0, -1);

// worklist = workflowService.getWorkListByProcessID(iProcessID, 0,
// -1, null, 0);
worklist = workflowService.getWorkListByProcessID(iProcessID, 0,
-1, "workitem", 0);
}
logger.fine("[WorkflowSchedulerService] " + worklist.size()
+ " workitems found");
iScheduledWorkItems += worklist.size();
for (ItemCollection workitem : worklist) {
// verify processID
if (iProcessID == workitem.getItemValueInteger("$processid")) {
// verify modelversion
if (sModelVersion.equals(workitem
.getItemValueString("$modelversion"))) {
// verify due date
if (workItemInDue(workitem, activityEntity)) {

int iActivityID = activityEntity
.getItemValueInteger("numActivityID");
workitem.replaceItemValue("$activityid", iActivityID);
processWorkitem(workitem);
iProcessWorkItems++;

// verify due date
if (workItemInDue(workitem, activityEntity)) {
String sID = workitem
.getItemValueString(EntityService.UNIQUEID);
logger.fine("[WorkflowSchedulerService] workitem " + sID
+ "is in due");
workitem.replaceItemValue("$activityid", iActivityID);
try {
logger.finest("[WorkflowSchedulerService] getBusinessObject.....");
// call from new instance because of transaction new...
// see: https://www.java.net/node/705304
ctx.getBusinessObject(MartyWorkflowSchedulerService.class)
.processSingelWorkitem(workitem);
iProcessWorkItems++;
} catch (Exception e) {
logger.warning("[WorkflowSchedulerService] error processing workitem: "
+ sID);
if (logger.isLoggable(Level.FINEST)) {
e.printStackTrace();
}
unprocessedIDs.add(sID);
}
}

}
}

/**
* start new Transaction for each process step
* This method process a single workIten in a new transaction. The method is
* called by processWorklist()
*
* @param aWorkitem
* @param aID
* @throws PluginException
* @throws ProcessingErrorException
* @throws AccessDeniedException
*/
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
void processWorkitem(ItemCollection aWorkitem) {

String sID = aWorkitem.getItemValueString(EntityService.UNIQUEID);


if (processedIDs.indexOf(sID)>-1) {

logger.warning("[MartyWorkflowScheduler] ERROR - processing twice : " +sID);
}

processedIDs.add(sID);

logger.fine("[MartyWorkflowScheduler] processing: " + sID);
try {

workflowService.processWorkItem(aWorkitem);
} catch (Exception e) {
logger.warning("[MartyWorkflowScheduler] error processing workitem: "
+ sID);

if (logger.isLoggable(Level.FINEST)) {
e.printStackTrace();
}
}
public void processSingelWorkitem(ItemCollection aWorkitem)
throws AccessDeniedException, ProcessingErrorException,
PluginException {
workflowService.processWorkItem(aWorkitem);
}

/**
Expand Down

0 comments on commit 93cd3e3

Please sign in to comment.