Skip to content
Permalink
Browse files

Improved: Remove unwanted error log by ‘ServiceSemaphore’ waiting pro…

…cess

(OFBIZ-11204)

This commit rewrites the error handling of the semaphore release/acquire methods
to return a boolean value indicating the result of the action instead of
throwing when waiting timeout is exceeded or if other issue is met. This with
the removal of confusing error log, that are normal and intended ones.

Javadoc has been added with some refactoring in regards to checkstyle rules.

git-svn-id: https://svn.apache.org/repos/asf/ofbiz/ofbiz-framework/trunk@1868942 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
gilPts committed Oct 25, 2019
1 parent 1965e3a commit 41618b1208f94e8cefef23dabf449f20763d9220
@@ -592,11 +592,7 @@ public void runSyncIgnore(String localName, ModelService service, Map<String, ?
} finally {
if (lock != null) {
// release the semaphore lock
try {
lock.release();
} catch (GenericServiceException e) {
Debug.logWarning(e, "Exception thrown while unlocking semaphore: ", module);
}
lock.release();
}

// resume the parent transaction
@@ -36,30 +36,38 @@
/**
* ServiceSemaphore
*/
public class ServiceSemaphore {
// TODO: add something to make sure semaphores are cleaned up on failures and when the thread somehow goes away without cleaning it up
// TODO: write service engine test cases to make sure semaphore both blocking and timing out (use config to set sleep and wait to low values so it times out quickly)
public final class ServiceSemaphore {
// TODO: add something to make sure semaphores are cleaned up on failures
// and when the thread somehow goes away without cleaning it up
// TODO: write service engine test cases to make sure semaphore both blocking
// and timing out (use config to set sleep and wait to low values so it times out quickly)

public static final String module = ServiceSemaphore.class.getName();
public static final int SEMAPHORE_MODE_FAIL = 0;
public static final int SEMAPHORE_MODE_WAIT = 1;
public static final int SEMAPHORE_MODE_NONE = 2;
private static final String MODULE = ServiceSemaphore.class.getName();
private static final int SEMAPHORE_MODE_FAIL = 0;
private static final int SEMAPHORE_MODE_WAIT = 1;
private static final int SEMAPHORE_MODE_NONE = 2;

protected Delegator delegator;
protected GenericValue lock;
protected ModelService model;
private Delegator delegator;
private GenericValue lock;
private ModelService model;

protected int wait = 0;
protected int mode = SEMAPHORE_MODE_NONE;
protected Timestamp lockTime = null;
private int wait = 0;
private int mode;
private Timestamp lockTime = null;

public ServiceSemaphore(Delegator delegator, ModelService model) {
this.delegator = delegator;
this.mode = "wait".equals(model.semaphore) ? SEMAPHORE_MODE_WAIT : ("fail".equals(model.semaphore) ? SEMAPHORE_MODE_FAIL : SEMAPHORE_MODE_NONE);
this.mode = "wait".equals(model.semaphore) ? SEMAPHORE_MODE_WAIT
: ("fail".equals(model.semaphore) ? SEMAPHORE_MODE_FAIL : SEMAPHORE_MODE_NONE);
this.model = model;
this.lock = null;
}

/**
* Try to acquire semaphore lock
* @throws SemaphoreWaitException @link SemaphoreWaitException
* @throws SemaphoreFailException @link SemaphoreFailException
*/
public void acquire() throws SemaphoreWaitException, SemaphoreFailException {
if (mode == SEMAPHORE_MODE_NONE) {
return;
@@ -72,17 +80,25 @@ public void acquire() throws SemaphoreWaitException, SemaphoreFailException {
}
}

public synchronized void release() throws SemaphoreFailException {
if (mode == SEMAPHORE_MODE_NONE) {
return;
}

/**
* Release semaphore locks
* @return {@code true} if release is success
*/
public synchronized boolean release() {
// remove the lock file
if (lock != null) {
dbWrite(lock, true);
if (mode != SEMAPHORE_MODE_NONE && lock != null) {
return dbWrite(lock, true);
}
return true;
}

/**
* Try to get lock ownership corresponding to semaphore type
* Throw exception when failure.
*
* @throws SemaphoreWaitException @link SemaphoreWaitException
* @throws SemaphoreFailException @link SemaphoreFailException
*/
private void waitOrFail() throws SemaphoreWaitException, SemaphoreFailException {
if (SEMAPHORE_MODE_FAIL == mode) {
// fail
@@ -98,7 +114,7 @@ private void waitOrFail() throws SemaphoreWaitException, SemaphoreFailException
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
Debug.logInfo(e, "Sleep interrupted: ServiceSemaphone.waitOrFail()", module);
Debug.logInfo(e, "Sleep interrupted: ServiceSemaphore.waitOrFail()", MODULE);
}

// try again
@@ -109,47 +125,57 @@ private void waitOrFail() throws SemaphoreWaitException, SemaphoreFailException
}
if (timedOut) {
double waitTimeSec = ((System.currentTimeMillis() - lockTime.getTime()) / 1000.0);
String errMsg = "Service [" + model.name + "] with wait semaphore exceeded wait timeout, waited [" + waitTimeSec + "], wait started at " + lockTime;
String errMsg = "Service [" + model.name + "] with wait semaphore exceeded wait timeout, waited ["
+ waitTimeSec + "], wait started at " + lockTime;
throw new SemaphoreWaitException(errMsg);
}
} else if (SEMAPHORE_MODE_NONE == mode) {
Debug.logWarning("Semaphore mode [none] attempted to aquire a lock; but should not have!", module);
Debug.logWarning("Semaphore mode [none] attempted to aquire a lock; but should not have!", MODULE);
} else {
throw new SemaphoreFailException("Found invalid Semaphore mode [" + mode + "]");
}
}

/**
* Check the absence of the lock, if true, try to insert the lock in the synchronized way.
* Return {@code true} if lock already in place or failed during insertion.
* @return boolean indicating if more wait is needed
* @throws SemaphoreFailException @link SemaphoreFailException
*/
private boolean checkLockNeedToWait() throws SemaphoreFailException {
String threadName = Thread.currentThread().getName();
GenericValue semaphore;

try {
semaphore = EntityQuery.use(delegator).from("ServiceSemaphore").where("serviceName", model.name).queryOne();
if (EntityQuery.use(delegator).from("ServiceSemaphore")
.where("serviceName", model.name).queryCount() == 0) {
semaphore = delegator.makeValue("ServiceSemaphore", "serviceName", model.name,
"lockedByInstanceId", JobManager.instanceId, "lockThread", threadName, "lockTime", lockTime);

// use the special method below so we can reuse the unique tx functions
// if semaphore successfully owned no need to wait anymore.
return !dbWrite(semaphore, false);
}
// found a semaphore, need to wait
return true;
} catch (GenericEntityException e) {
throw new SemaphoreFailException(e);
}

if (semaphore == null) {
semaphore = delegator.makeValue("ServiceSemaphore", "serviceName", model.name, "lockedByInstanceId", JobManager.instanceId, "lockThread", threadName, "lockTime", lockTime);

// use the special method below so we can reuse the unqiue tx functions
try {
dbWrite(semaphore, false);
} catch (SemaphoreFailException e) {
// can't write a new semaphore, need to wait
return true;
}

// we own the lock, no waiting
return false;
}
// found a semaphore, need to wait
return true;
}

private synchronized void dbWrite(GenericValue value, boolean delete) throws SemaphoreFailException {
/**
* Operates synchronized jdbc access (create/remove) method to ensure unique semaphore token management
* The same method is used for creating or removing the lock.
*
* @param value the value that will be operated
* @param delete specify the action
* {@code true} for removal
* {@code false} for insertion
* @return boolean if operation is success
*/
private synchronized boolean dbWrite(GenericValue value, boolean delete) {
Transaction parent = null;
boolean beganTx = false;
boolean beganTx;
boolean isError = false;

try {
@@ -159,7 +185,8 @@ private synchronized void dbWrite(GenericValue value, boolean delete) throws Sem
}
beganTx = TransactionUtil.begin();
if (!beganTx) {
throw new SemaphoreFailException("Cannot obtain unique transaction for semaphore logging");
Debug.logError("Cannot obtain unique transaction for semaphore logging", MODULE);
return false;
}

// store the value
@@ -169,38 +196,38 @@ private synchronized void dbWrite(GenericValue value, boolean delete) throws Sem
value.remove();
lock = null;
} else {
lock = value.create();
// Last check before inserting data in this transaction to avoid error log
isError = EntityQuery.use(delegator).from("ServiceSemaphore")
.where("serviceName", model.name).queryCount() != 0;
if (!isError) {
lock = value.create();
}
}
} catch (GenericEntityException e) {
Debug.logError(e, module);
Debug.logError("Cannot obtain unique transaction for semaphore logging", MODULE);
isError = true;
throw new SemaphoreFailException("Cannot obtain unique transaction for semaphore logging");
} finally {
if (isError) {
try {
TransactionUtil.rollback(beganTx, "ServiceSemaphore: dbWrite()", new Exception());
} catch (GenericTransactionException e) {
Debug.logError(e, module);
}
}
if (!isError) {
try {
TransactionUtil.commit(beganTx);
} catch (GenericTransactionException e) {
Debug.logError(e, module);
try {
if (isError) {
TransactionUtil.rollback();
} else {
TransactionUtil.commit();
}
} catch (GenericTransactionException e) {
Debug.logError(e, MODULE);
}
}
} catch (GenericTransactionException e) {
Debug.logError(e, module);
Debug.logError(e, MODULE);
} finally {
if (parent != null) {
try {
TransactionUtil.resume(parent);
} catch (GenericTransactionException e) {
Debug.logError(e, module);
Debug.logError(e, MODULE);
}
}
}
return !isError;
}
}

0 comments on commit 41618b1

Please sign in to comment.
You can’t perform that action at this time.