Skip to content

Commit

Permalink
ARTEMIS-2207 Page Showing Log.warns for regular acked messages
Browse files Browse the repository at this point in the history
(cherry picked from commit 40966c7)
  • Loading branch information
clebertsuconic authored and jbertram committed Dec 19, 2018
1 parent 5bfc00c commit ed6eaf1
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 32 deletions.
Expand Up @@ -1284,6 +1284,10 @@ private PagedReference moveNext() {
continue;
}

if (info != null && info.isAck(message.getPosition())) {
continue;
}

// 2nd ... if TX, is it committed?
if (valid && message.getPagedMessage().getTransactionID() >= 0) {
PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage().getTransactionID());
Expand Down
Expand Up @@ -71,6 +71,8 @@
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
Expand Down Expand Up @@ -277,6 +279,96 @@ public void testPageOnLargeMessageMultipleQueues() throws Exception {
}
}

@Test
public void testPageTX() throws Exception {
AssertionLoggerHandler.startCapture();

try {
Configuration config = createDefaultInVMConfig();

final int PAGE_MAX = 20 * 1024;

final int PAGE_SIZE = 10 * 1024;

ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.start();

final int numberOfBytes = 1024;

locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false);

ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));

ClientSession session = sf.createSession(null, null, false, false, false, false, 0);

session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);

server.getPagingManager().getPageStore(ADDRESS).forceAnotherPage();
server.getPagingManager().getPageStore(ADDRESS).disableCleanup();
session.start();

ClientProducer producer = session.createProducer(ADDRESS);

ClientConsumer browserConsumer = session.createConsumer(ADDRESS.concat("-0"), true);

ClientMessage message = null;

for (int i = 0; i < 201; i++) {
message = session.createMessage(true);

message.getBodyBuffer().writerIndex(0);

message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);

for (int j = 1; j <= numberOfBytes; j++) {
message.getBodyBuffer().writeInt(j);
}

producer.send(message);
session.commit();
}

ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-0"));

session.start();

for (int i = 0; i < 201; i++) {
ClientMessage message2 = consumer.receive(10000);

Assert.assertNotNull(message2);

message2.acknowledge();

Assert.assertNotNull(message2);

session.commit();
}

consumer.close();

Queue queue = server.locateQueue(ADDRESS.concat("-0"));

PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
PageCursorProvider provider = store.getCursorProvider();

PageSubscription cursorSubscription = provider.getSubscription(queue.getID());
PageIterator iterator = (PageIterator) cursorSubscription.iterator();

for (int i = 0; i < 5; i++) {
Assert.assertFalse(iterator.hasNext());
Assert.assertNull(browserConsumer.receiveImmediate());
}

session.close();
Assert.assertFalse(AssertionLoggerHandler.findText("Could not locate page"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222029"));
server.getPagingManager().getPageStore(ADDRESS).enableCleanup();
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging);
} finally {
AssertionLoggerHandler.stopCapture();
}
}

@Test
public void testPageCleanup() throws Exception {
clearDataRecreateServerDirs();
Expand Down Expand Up @@ -396,7 +488,6 @@ public void testPageCleanup() throws Exception {
System.out.println("pgComplete = " + pgComplete);
}


@Test
public void testPurge() throws Exception {
clearDataRecreateServerDirs();
Expand All @@ -410,7 +501,7 @@ public void testPurge() throws Exception {
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
QueueImpl purgeQueue = (QueueImpl)server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
Expand Down Expand Up @@ -465,7 +556,6 @@ public void testPurge() throws Exception {

connection.start();


server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000);
Assert.assertNotNull(consumer.receive(5000));
session.commit();
Expand All @@ -478,7 +568,6 @@ public void testPurge() throws Exception {

StorageManager sm = server.getStorageManager();


for (int i = 0; i < 1000; i++) {
long tx = sm.generateID();
PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx);
Expand All @@ -494,7 +583,6 @@ public void testPurge() throws Exception {
Assert.assertEquals(0, server.getPagingManager().getTransactions().size());
}


// First page is complete but it wasn't deleted
@Test
public void testFirstPageCompleteNotDeleted() throws Exception {
Expand Down Expand Up @@ -1741,17 +1829,13 @@ public PageCursorProvider newCursorProvider(PagingStore store,

for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("Before restart - message " + i + " is empty.",message);
Assert.assertNotNull("Before restart - message " + i + " is empty.", message);
message.acknowledge();
}



server.stop();
mainCleanup.set(false);



// Deleting the paging data. Simulating a failure
// a dumb user, or anything that will remove the data
deleteDirectory(new File(getPageDir()));
Expand All @@ -1772,7 +1856,6 @@ public PageCursorProvider newCursorProvider(PagingStore store,

bodyLocal.writeBytes(new byte[MESSAGE_SIZE]);


producer.send(message);
}
session.commit();
Expand All @@ -1788,7 +1871,7 @@ public PageCursorProvider newCursorProvider(PagingStore store,

for (int i = 0; i < 4; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("After restart - message " + i + " is empty.",message);
Assert.assertNotNull("After restart - message " + i + " is empty.", message);
message.acknowledge();
}

Expand Down Expand Up @@ -1852,7 +1935,6 @@ public void testMissingTXEverythingAcked() throws Exception {
session.commit();
session.close();


ArrayList<RecordInfo> records = new ArrayList<>();

List<PreparedTransactionInfo> list = new ArrayList<>();
Expand All @@ -1865,9 +1947,7 @@ public void testMissingTXEverythingAcked() throws Exception {

// Delete everything from the journal
for (RecordInfo info : records) {
if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE &&
info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC &&
info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) {
jrn.appendDeleteRecord(info.id, false);
}
}
Expand Down Expand Up @@ -3502,8 +3582,6 @@ public PageCursorProvider newCursorProvider(PagingStore store,

server.stop();

// Thread.sleep(5000);

}

@Test
Expand Down Expand Up @@ -4775,9 +4853,7 @@ public void write(int b) throws IOException {
} catch (Throwable e) {
log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") +
" with messageID=" +
message.getMessageID());
fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
}

}
Expand Down Expand Up @@ -6090,16 +6166,16 @@ public void testOnlyOnePageOnServerCrash() throws Throwable {
class NonStoppablePagingStoreImpl extends PagingStoreImpl {

NonStoppablePagingStoreImpl(SimpleString address,
ScheduledExecutorService scheduledExecutor,
long syncTimeout,
PagingManager pagingManager,
StorageManager storageManager,
SequentialFileFactory fileFactory,
PagingStoreFactory storeFactory,
SimpleString storeName,
AddressSettings addressSettings,
ArtemisExecutor executor,
boolean syncNonTransactional) {
ScheduledExecutorService scheduledExecutor,
long syncTimeout,
PagingManager pagingManager,
StorageManager storageManager,
SequentialFileFactory fileFactory,
PagingStoreFactory storeFactory,
SimpleString storeName,
AddressSettings addressSettings,
ArtemisExecutor executor,
boolean syncNonTransactional) {
super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
}

Expand Down Expand Up @@ -6452,7 +6528,6 @@ public void testStopPagingWithoutMsgsOnOneQueue() throws Exception {
server.stop();
}


@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);
Expand Down

0 comments on commit ed6eaf1

Please sign in to comment.