Skip to content

Commit

Permalink
Fixing an issue with pipeline state read/save order (#3497)
Browse files Browse the repository at this point in the history
* Fixing an issue with pipeline state read/save order

While trying to lock/unlock a pipeline, we used to clear the cache and update the db entry within a synchronized block.
When an unlock thread exits the synchronized block but hasn't executed a transaction.commit, immediately followed by a read thread ie. pipelineStateFor, the (eh)cache would have been updated with a stale entry from the query-cache (since cacheable was set to true). This cache entry is not cleared thereafter, ie. even after the transaction.commit of unlock happens. This used to lead to a bug wherein some of the pipelines would show up as locked on dashboard even when they were unlocked.
Now, we clear off the ehcache entry only after a transaction commit. This means some of the reads could potentially get seemingly stale data even after the unlock thread exits the synchronized block and before the transaction.commit happens, but that should be ok.
  • Loading branch information
jyotisingh authored and maheshp committed May 12, 2017
1 parent 354576d commit 2b06534
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 24 deletions.
59 changes: 40 additions & 19 deletions server/src/com/thoughtworks/go/server/dao/PipelineStateDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.rits.cloning.Cloner;
import com.thoughtworks.go.config.GoConfigDao;
import com.thoughtworks.go.database.Database;
import com.thoughtworks.go.domain.*;
import com.thoughtworks.go.domain.Pipeline;
import com.thoughtworks.go.domain.PipelineState;
import com.thoughtworks.go.domain.Stage;
import com.thoughtworks.go.domain.StageIdentifier;
import com.thoughtworks.go.server.cache.GoCache;
import com.thoughtworks.go.server.domain.StageStatusListener;
import com.thoughtworks.go.server.persistence.MaterialRepository;
Expand All @@ -39,8 +42,9 @@
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;

import java.util.*;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -89,17 +93,25 @@ public void lockPipeline(final Pipeline pipeline) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
transactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
clearLockedPipelineStateCache(pipeline.getName());
}
});
final String pipelineName = pipeline.getName();
PipelineState state = pipelineStateFor(pipelineName);
if (state != null && state.isLocked() && !pipeline.getIdentifier().equals(state.getLockedBy().pipelineIdentifier())) {
throw new RuntimeException(String.format("Pipeline '%s' is already locked (counter = %s)", pipelineName, state.getLockedBy().getPipelineCounter()));
PipelineState fromCache = pipelineStateFor(pipelineName);
if (fromCache != null && fromCache.isLocked() && !pipeline.getIdentifier().equals(fromCache.getLockedBy().pipelineIdentifier())) {
throw new RuntimeException(String.format("Pipeline '%s' is already locked (counter = %s)", pipelineName, fromCache.getLockedBy().getPipelineCounter()));
}
if (state == null) {
state = new PipelineState(pipelineName);
PipelineState toBeSaved = null;
if (fromCache == null) {
toBeSaved = new PipelineState(pipelineName);
} else {
toBeSaved = (PipelineState) sessionFactory.getCurrentSession().load(PipelineState.class, fromCache.getId());
}
clearLockedPipelineStateCache(pipelineName);
state.lock(pipeline.getId());
sessionFactory.getCurrentSession().saveOrUpdate(state);
toBeSaved.lock(pipeline.getId());
sessionFactory.getCurrentSession().saveOrUpdate(toBeSaved);
}
});
}
Expand All @@ -114,14 +126,23 @@ public void unlockPipeline(final String pipelineName) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
transactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
clearLockedPipelineStateCache(pipelineName);
}
});

final String cacheKey = pipelineLockStateCacheKey(pipelineName);
PipelineState state = pipelineStateFor(pipelineName);
if (state == null) {
state = new PipelineState(pipelineName);
PipelineState fromCache = pipelineStateFor(pipelineName);
PipelineState toBeSaved = null;
if (fromCache == null) {
toBeSaved = new PipelineState(pipelineName);
} else {
toBeSaved = (PipelineState) sessionFactory.getCurrentSession().load(PipelineState.class, fromCache.getId());
}
clearLockedPipelineStateCache(pipelineName);
state.unlock();
sessionFactory.getCurrentSession().saveOrUpdate(state);
toBeSaved.unlock();
sessionFactory.getCurrentSession().saveOrUpdate(toBeSaved);
}
});
}
Expand All @@ -145,7 +166,7 @@ public Object doInTransaction(TransactionStatus transactionStatus) {
return sessionFactory.getCurrentSession()
.createCriteria(PipelineState.class)
.add(Restrictions.eq("pipelineName", pipelineName))
.setCacheable(true).uniqueResult();
.setCacheable(false).uniqueResult();
}
});

Expand All @@ -170,10 +191,10 @@ public Object doInTransaction(TransactionStatus status) {
PropertyProjection pipelineName = Projections.property("pipelineName");
Criteria criteria = sessionFactory.getCurrentSession().createCriteria(PipelineState.class).setProjection(pipelineName).add(
Restrictions.eq("locked", true));
criteria.setCacheable(true);
criteria.setCacheable(false);
List<String> list = criteria.list();
return list;
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.springframework.orm.ibatis.SqlMapClientTemplate;
import org.springframework.transaction.support.SimpleTransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;

import java.util.UUID;

Expand Down Expand Up @@ -116,16 +118,30 @@ public void lockedPipeline_shouldReturnNullIfPipelineIsNotLocked() throws Except

@Test
public void lockPipeline_ShouldSavePipelineStateAndInvalidateCache() throws Exception {
final TransactionSynchronizationAdapter[] transactionSynchronizationAdapter = {null};
when(transactionTemplate.execute(any(org.springframework.transaction.support.TransactionCallbackWithoutResult.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
org.springframework.transaction.support.TransactionCallbackWithoutResult callback = (org.springframework.transaction.support.TransactionCallbackWithoutResult) invocation.getArguments()[0];
callback.doInTransaction(new SimpleTransactionStatus());
transactionSynchronizationAdapter[0].afterCommit();
return null;
}
});

doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
TransactionSynchronizationAdapter adapter= (TransactionSynchronizationAdapter) invocation.getArguments()[0];
transactionSynchronizationAdapter[0] = adapter;
return null;
}
}).when(transactionSynchronizationManager).registerSynchronization(any(TransactionSynchronization.class));

final Pipeline pipeline = PipelineMother.pipeline("mingle");
goCache.put(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName()), new PipelineState(pipeline.getName(), pipeline.getFirstStage().getIdentifier()));
PipelineState pipelineState = new PipelineState(pipeline.getName(), pipeline.getFirstStage().getIdentifier());
when(session.load(PipelineState.class, pipeline.getId())).thenReturn(pipelineState);
goCache.put(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName()), pipelineState);
pipelineStateDao.lockPipeline(pipeline);

assertThat(goCache.get(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName())), is(nullValue()));
Expand All @@ -147,16 +163,29 @@ public void lockPipeline_shouldHandleSerializationProperly() throws Exception {

@Test
public void unlockPipeline_shouldSavePipelineStateAndInvalidateCache() throws Exception {
final TransactionSynchronizationAdapter[] transactionSynchronizationAdapter = {null};
when(transactionTemplate.execute(any(org.springframework.transaction.support.TransactionCallbackWithoutResult.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
org.springframework.transaction.support.TransactionCallbackWithoutResult callback = (org.springframework.transaction.support.TransactionCallbackWithoutResult) invocation.getArguments()[0];
callback.doInTransaction(new SimpleTransactionStatus());
transactionSynchronizationAdapter[0].afterCommit();
return null;
}
});
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
TransactionSynchronizationAdapter adapter= (TransactionSynchronizationAdapter) invocation.getArguments()[0];
transactionSynchronizationAdapter[0] = adapter;
return null;
}
}).when(transactionSynchronizationManager).registerSynchronization(any(TransactionSynchronization.class));

final Pipeline pipeline = PipelineMother.pipeline("mingle");
goCache.put(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName()), new PipelineState(pipeline.getName(), pipeline.getFirstStage().getIdentifier()));
PipelineState pipelineState = new PipelineState(pipeline.getName(), pipeline.getFirstStage().getIdentifier());
goCache.put(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName()), pipelineState);
when(session.load(PipelineState.class, pipeline.getId())).thenReturn(pipelineState);
pipelineStateDao.unlockPipeline(pipeline.getName());

assertThat(goCache.get(pipelineStateDao.pipelineLockStateCacheKey(pipeline.getName())), is(nullValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,18 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
fail("save should have thrown an exception!");
} catch (Exception e) {
PipelineState stateFromCache = (PipelineState) goCache.get(pipelineStateDao.pipelineLockStateCacheKey(pipelineName));
assertThat(stateFromCache, is(nullValue()));
assertThat(stateFromCache.isLocked(), is(false));
assertThat(stateFromCache.getLockedByPipelineId(), is(0L));
assertThat(stateFromCache.getLockedBy(), is(nullValue()));
}
}

@Test
public void shouldNotCorruptCacheIfSaveFailsWhileUnLocking() {
String pipelineName = UUID.randomUUID().toString();
PipelineState pipelineState = new PipelineState(pipelineName);
pipelineState.lock(1);
long lockedByPipelineId = 1;
pipelineState.lock(lockedByPipelineId);
goCache.put(pipelineStateDao.pipelineLockStateCacheKey(pipelineName), pipelineState);

when(transactionTemplate.execute(any(org.springframework.transaction.support.TransactionCallbackWithoutResult.class))).thenAnswer(new Answer<Object>() {
Expand All @@ -110,7 +113,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
fail("save should have thrown an exception!");
} catch (Exception e) {
PipelineState stateFromCache = (PipelineState) goCache.get(pipelineStateDao.pipelineLockStateCacheKey(pipelineName));
assertThat(stateFromCache, is(nullValue()));
assertThat(stateFromCache.isLocked(), is(true));
assertThat(stateFromCache.getLockedByPipelineId(), is(lockedByPipelineId));
}
}
}

0 comments on commit 2b06534

Please sign in to comment.