Skip to content

Commit

Permalink
3156 multiple tag race condition (#3297)
Browse files Browse the repository at this point in the history
* 3156 fix race condition that results in multiple tags being created

* 3156 added changelog

* cleanup

* added smile jira ticket

* 3156 review fixes

* review fixes

* 3156 added an additional integratoin test

* cleanup

* review fix and master merge

* threadsafe stuff

* small change

* upping min threads for tests to pass

* changing test

* test check

* fix

* test testing

* one more update

* using threadpoolutil

* temporary measure

* git push

* all builds

Co-authored-by: leif stawnyczy <leifstawnyczy@leifs-MacBook-Pro.local>
  • Loading branch information
2 people authored and michaelabuckley committed Jan 31, 2022
1 parent d7daf77 commit 2bb272b
Show file tree
Hide file tree
Showing 9 changed files with 578 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
issue: 3156
type: fix
jira: smile-3382
title: "Fixed race condition in BaseHapiFhirDao that resulted in exceptions
being thrown when multiple resources in a single bundle transaction
had the same tags."
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.storage.TransactionDetails;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException;
Expand Down Expand Up @@ -118,8 +119,12 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -148,6 +153,7 @@
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.commons.lang3.StringUtils.defaultString;
Expand Down Expand Up @@ -180,6 +186,9 @@
@Repository
public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStorageDao implements IDao, IJpaDao<T>, ApplicationContextAware {

// total attempts to do a tag transaction
private static final int TOTAL_TAG_READ_ATTEMPTS = 10;

public static final long INDEX_STATUS_INDEXED = 1L;
public static final long INDEX_STATUS_INDEXING_FAILED = 2L;
public static final String NS_JPA_PROFILE = "https://github.com/hapifhir/hapi-fhir/ns/jpa/profile";
Expand Down Expand Up @@ -247,6 +256,9 @@ public abstract class BaseHapiFhirDao<T extends IBaseResource> extends BaseStora
@Autowired(required = false)
private IFulltextSearchSvc myFulltextSearchSvc;

@Autowired
private PlatformTransactionManager myTransactionManager;

@VisibleForTesting
public void setSearchParamPresenceSvc(ISearchParamPresenceSvc theSearchParamPresenceSvc) {
mySearchParamPresenceSvc = theSearchParamPresenceSvc;
Expand Down Expand Up @@ -398,40 +410,15 @@ protected TagDefinition getTagOrNull(TransactionDetails theTransactionDetails, T

MemoryCacheService.TagDefinitionCacheKey key = toTagDefinitionMemoryCacheKey(theTagType, theScheme, theTerm);


TagDefinition retVal = myMemoryCacheService.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, key);


if (retVal == null) {
HashMap<MemoryCacheService.TagDefinitionCacheKey, TagDefinition> resolvedTagDefinitions = theTransactionDetails.getOrCreateUserData(HapiTransactionService.XACT_USERDATA_KEY_RESOLVED_TAG_DEFINITIONS, () -> new HashMap<>());
retVal = resolvedTagDefinitions.get(key);

if (retVal == null) {
CriteriaBuilder builder = myEntityManager.getCriteriaBuilder();
CriteriaQuery<TagDefinition> cq = builder.createQuery(TagDefinition.class);
Root<TagDefinition> from = cq.from(TagDefinition.class);

if (isNotBlank(theScheme)) {
cq.where(
builder.and(
builder.equal(from.get("myTagType"), theTagType),
builder.equal(from.get("mySystem"), theScheme),
builder.equal(from.get("myCode"), theTerm)));
} else {
cq.where(
builder.and(
builder.equal(from.get("myTagType"), theTagType),
builder.isNull(from.get("mySystem")),
builder.equal(from.get("myCode"), theTerm)));
}

TypedQuery<TagDefinition> q = myEntityManager.createQuery(cq);
try {
retVal = q.getSingleResult();
} catch (NoResultException e) {
retVal = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
myEntityManager.persist(retVal);
}
// actual DB hit(s) happen here
retVal = getOrCreateTag(theTagType, theScheme, theTerm, theLabel);

TransactionSynchronization sync = new AddTagDefinitionToCacheAfterCommitSynchronization(key, retVal);
TransactionSynchronizationManager.registerSynchronization(sync);
Expand All @@ -443,6 +430,109 @@ protected TagDefinition getTagOrNull(TransactionDetails theTransactionDetails, T
return retVal;
}

/**
* Gets the tag defined by the fed in values, or saves it if it does not
* exist.
*
* Can also throw an InternalErrorException if something bad happens.
*/
private TagDefinition getOrCreateTag(TagTypeEnum theTagType, String theScheme, String theTerm, String theLabel) {
CriteriaBuilder builder = myEntityManager.getCriteriaBuilder();
CriteriaQuery<TagDefinition> cq = builder.createQuery(TagDefinition.class);
Root<TagDefinition> from = cq.from(TagDefinition.class);

if (isNotBlank(theScheme)) {
cq.where(
builder.and(
builder.equal(from.get("myTagType"), theTagType),
builder.equal(from.get("mySystem"), theScheme),
builder.equal(from.get("myCode"), theTerm)));
} else {
cq.where(
builder.and(
builder.equal(from.get("myTagType"), theTagType),
builder.isNull(from.get("mySystem")),
builder.equal(from.get("myCode"), theTerm)));
}

TypedQuery<TagDefinition> q = myEntityManager.createQuery(cq);

TransactionTemplate template = new TransactionTemplate(myTransactionManager);
template.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

// this transaction will attempt to get or create the tag,
// repeating (on any failure) 10 times.
// if it fails more than this, we will throw exceptions
TagDefinition retVal;
int count = 0;
HashSet<Throwable> throwables = new HashSet<>();
do {
try {
retVal = template.execute(new TransactionCallback<TagDefinition>() {

// do the actual DB call(s) to read and/or write the values
private TagDefinition readOrCreate() {
TagDefinition val;
try {
val = q.getSingleResult();
} catch (NoResultException e) {
val = new TagDefinition(theTagType, theScheme, theTerm, theLabel);
myEntityManager.persist(val);
}
return val;
}

@Override
public TagDefinition doInTransaction(TransactionStatus status) {
TagDefinition tag = null;

try {
tag = readOrCreate();
} catch (Exception ex) {
// log any exceptions - just in case
// they may be signs of things to come...
ourLog.warn(
"Tag read/write failed: "
+ ex.getMessage() + ". "
+ "This is not a failure on its own, "
+ "but could be useful information in the result of an actual failure."
);
throwables.add(ex);
}

return tag;
}
});
} catch (Exception ex) {
// transaction template can fail if connections to db are exhausted
// and/or timeout
ourLog.warn("Transaction failed with: "
+ ex.getMessage() + ". "
+ "Transaction will rollback and be reattempted."
);
retVal = null;
}
count++;
} while (retVal == null && count < TOTAL_TAG_READ_ATTEMPTS);

if (retVal == null) {
// if tag is still null,
// something bad must be happening
// - throw
String msg = throwables.stream()
.map(Throwable::getMessage)
.collect(Collectors.joining(", "));
throw new InternalErrorException(
"Tag get/create failed after "
+ TOTAL_TAG_READ_ATTEMPTS
+ " attempts with error(s): "
+ msg
);
}

return retVal;
}

protected IBundleProvider history(RequestDetails theRequest, String theResourceType, Long theResourcePid, Date theRangeStartInclusive, Date theRangeEndInclusive, Integer theOffset) {

String resourceName = defaultIfBlank(theResourceType, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ private DaoMethodOutcome doCreateForPostOrPut(T theResource, String theIfNoneExi
}

// Perform actual DB update
// this call will also update the metadata
ResourceTable updatedEntity = updateEntity(theRequest, theResource, entity, null, thePerformIndexing, false, theTransactionDetails, false, thePerformIndexing);

// Store the resource forced ID if necessary
Expand Down Expand Up @@ -366,6 +367,8 @@ private DaoMethodOutcome doCreateForPostOrPut(T theResource, String theIfNoneExi

// Update the version/last updated in the resource so that interceptors get
// the correct version
// TODO - the above updateEntity calls updateResourceMetadata
// Maybe we don't need this call here?
updateResourceMetadata(entity, theResource);

// Populate the PID in the resource so it is available to hooks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class TestDstu2Config extends BaseJavaConfigDstu2 {
* We use a randomized number of maximum threads in order to try
* and catch any potential deadlocks caused by database connection
* starvation
*
* A minimum of 2 is required for most transactions.
*/
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = (int) (Math.random() * 6.0) + 2;

if ("true".equals(System.getProperty("single_db_connection"))) {
ourMaxThreads = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ private void logGetConnectionStackTrace() {
/*
* We use a randomized number of maximum threads in order to try
* and catch any potential deadlocks caused by database connection
* starvation
* starvation.
*
* We need a minimum of 2 for most transactions, so 2 is added
*/
int maxThreads = (int) (Math.random() * 6.0) + 1;
int maxThreads = (int) (Math.random() * 6.0) + 2;

if ("true".equals(System.getProperty("single_db_connection"))) {
maxThreads = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public class TestR5Config extends BaseJavaConfigR5 {
* We use a randomized number of maximum threads in order to try
* and catch any potential deadlocks caused by database connection
* starvation
*
* A minimum of 2 is necessary for most transactions,
* so 2 will be our limit
*/
if (ourMaxThreads == null) {
ourMaxThreads = (int) (Math.random() * 6.0) + 1;
ourMaxThreads = (int) (Math.random() * 6.0) + 2;

if ("true".equals(System.getProperty("single_db_connection"))) {
ourMaxThreads = 1;
Expand Down

0 comments on commit 2bb272b

Please sign in to comment.