Skip to content

Commit

Permalink
ARTEMIS-3591: stop paging checkMemory task executing twice
Browse files Browse the repository at this point in the history
  • Loading branch information
gemmellr authored and clebertsuconic committed Mar 14, 2022
1 parent 913a87c commit 36dcb30
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,15 +717,16 @@ public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable
runWhenBlocking.run();
}

onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
AtomicRunnable atomicRunWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailable);
onMemoryFreedRunnables.add(atomicRunWhenAvailable);

// We check again to avoid a race condition where the size can come down just after the element
// has been added, but the check to execute was done before the element was added
// NOTE! We do not fix this race by locking the whole thing, doing this check provides
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) {
// run it now
runWhenAvailable.run();
atomicRunWhenAvailable.run();
} else {
if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
pagingManager.addBlockedStore(this);
Expand All @@ -740,6 +741,7 @@ public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable
blocking = true;
}
}

return true;
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/unit-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@
<version>${netty-tcnative-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.apache.activemq.artemis.logs.AssertionLoggerHandler.findText;

Expand Down Expand Up @@ -1194,4 +1195,61 @@ public ByteBuffer allocateDirectBuffer(final int size) {
public void freeDirectuffer(final ByteBuffer buffer) {
}
}

private static final class CountingRunnable implements Runnable {
final AtomicInteger calls = new AtomicInteger();

@Override
public void run() {
calls.incrementAndGet();
}

public int getCount() {
return calls.get();
}
}

@Test(timeout = 10000)
public void testCheckExecutionIsNotRepeated() throws Exception {
SequentialFileFactory factory = new FakeSequentialFileFactory();

PagingStoreFactory storeFactory = new FakeStoreFactory(factory);

PagingManager mockManager = Mockito.mock(PagingManager.class);

ArtemisExecutor sameThreadExecutor = Runnable::run;
PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100,
mockManager, createStorageManagerMock(), factory, storeFactory,
PagingStoreImplTest.destinationTestName,
new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
sameThreadExecutor, true);

store.start();
try {
store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));

Mockito.when(mockManager.addSize(Mockito.anyInt())).thenReturn(mockManager);
store.addSize(100);

// Do an initial check
final CountingRunnable trackMemoryCheck1 = new CountingRunnable();
assertEquals(0, trackMemoryCheck1.getCount());
store.checkMemory(trackMemoryCheck1);
assertEquals(1, trackMemoryCheck1.getCount());

// Do another check, this time indicate the disk is full during the first couple
// requests, making the task initially be retained for later but then executed.
final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
assertEquals(0, trackMemoryCheck2.getCount());
store.checkMemory(trackMemoryCheck2);
assertEquals(1, trackMemoryCheck2.getCount());

// Now run the released memory checks. The task should NOT execute again, verify it doesnt.
store.checkReleasedMemory();
assertEquals(1, trackMemoryCheck2.getCount());
} finally {
store.stop();
}
}
}

0 comments on commit 36dcb30

Please sign in to comment.