ARTEMIS-3761 Improve page cleanup to remove messages in the middle of the stream as well

Paging only removes files at the beginning of the stream...
Say you have paged files 1 through 1000...
If all the messages are acked but one message on file 1 is missing an ack, the 999 subsequent files will not be removed until every message on file 1 is acked.
This was working as engineered, but sometimes devs don't have complete control over their app.
With this improvement we will now remove messages in the middle of the stream as well.

There are also some improvements to how browsing and paging work with this change.
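
For illustration, here is a minimal self-contained sketch (hypothetical names, not the broker's actual classes) of the counting idea behind the mid-stream cleanup: a page away from the head of the stream becomes removable once every subscription reports it fully consumed, even while an earlier page is still incomplete.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MidStreamCleanupSketch {

   public static void main(String[] args) {
      // Subscription A still has an unacked message on page 1;
      // pages 2 and 3 are fully acked by every subscription.
      List<Set<Long>> donePagesPerSubscription = List.of(
         Set.of(2L, 3L),        // subscription A (page 1 incomplete)
         Set.of(1L, 2L, 3L));   // subscription B

      long firstPage = 1;
      long currentWritingPage = 4;
      int subscriptions = donePagesPerSubscription.size();

      // Count, per page, how many subscriptions are done with it.
      Map<Long, Integer> doneCounts = new HashMap<>();
      for (Set<Long> done : donePagesPerSubscription) {
         for (long pageId : done) {
            doneCounts.merge(pageId, 1, Integer::sum);
         }
      }

      // A page is removable mid-stream when every subscription is done with it,
      // skipping the head of the stream (regular depaging owns it) and the
      // page currently being written.
      doneCounts.forEach((pageId, count) -> {
         if (pageId > firstPage && pageId != currentWritingPage && count >= subscriptions) {
            System.out.println("page " + pageId + " can be removed mid-stream");
         }
      });
      // prints pages 2 and 3, while page 1 stays until its last ack arrives
   }
}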
clebertsuconic committed Apr 7, 2022
1 parent 1da68b3 commit cfdb710a085b7d5994bfeb1d65357388b0d643b5
Showing 21 changed files with 779 additions and 130 deletions.
@@ -280,7 +280,7 @@ private static void printPages(DescribeJournal describeJournal,
}
out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
-page.open();
+page.open(false);
List<PagedMessage> msgs = page.read(sm);
page.close(false, false);

@@ -401,7 +401,7 @@ private void printPagedMessagesAsXML() {
for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
Page page = pageStore.createPage(pageId);
-page.open();
+page.open(false);
List<PagedMessage> messages = page.read(storageManager);
page.close(false, false);

@@ -117,6 +117,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
*/
Page depage() throws Exception;

+Page removePage(int pageId);
+
void forceAnotherPage() throws Exception;

Page getCurrentPage();
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.activemq.artemis.core.paging.cursor;
+
+// this is to expose PageSubscriptionImpl::PageCursorInfo
+public interface ConsumedPage {
+
+long getPageId();
+
+boolean isDone();
+
+}
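
As a quick usage illustration (a hypothetical in-memory stand-in, not broker code), a cleaner passed over consumed pages only treats fully-consumed ones as candidates for removal:

import java.util.List;
import java.util.function.Consumer;

public class ConsumedPageSketch {

   // stand-in for the ConsumedPage interface above
   record PageInfo(long pageId, boolean done) {
   }

   public static void main(String[] args) {
      List<PageInfo> consumedPages = List.of(new PageInfo(1, false), new PageInfo(2, true));

      // the "pageCleaner" callback: only fully-consumed pages qualify for removal
      Consumer<PageInfo> cleaner = info -> {
         if (info.done()) {
            System.out.println("page " + info.pageId() + " is fully consumed");
         }
      };

      consumedPages.forEach(cleaner);  // prints only page 2
   }
}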
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.paging.cursor;

import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import java.util.function.ToIntFunction;

import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -149,6 +150,8 @@ public interface PageSubscription {
*/
boolean isComplete(long page);

+void forEachConsumedPage(Consumer<ConsumedPage> pageCleaner);
+
/**
* wait all the scheduled runnables to finish their current execution
*/
@@ -47,14 +47,9 @@
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.jboss.logging.Logger;

-/**
-* A PageProviderIMpl
-*
-* TODO: this may be moved entirely into PagingStore as there's an one-to-one relationship here
-* However I want to keep this isolated as much as possible during development
-*/
public class PageCursorProviderImpl implements PageCursorProvider {


@@ -119,7 +114,7 @@ public PageCursorProviderImpl(final PagingStore pagingStore,
@Override
public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) {
if (logger.isTraceEnabled()) {
-logger.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception("trace"));
+logger.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter);
}

if (activeCursors.containsKey(cursorID)) {
@@ -228,7 +223,7 @@ private PageCache readPage(long pageId,
pageId, pagingStore.getAddress(), elapsedMillis);
}
}
-page.open();
+page.open(false);
final long startedReadPage = System.nanoTime();
List<PagedMessage> pgdMessages = page.read(storageManager);
final long elapsedReadPage = System.nanoTime() - startedReadPage;
@@ -356,9 +351,6 @@ public void close(PageSubscription cursor) {
@Override
public void scheduleCleanup() {

-if (logger.isTraceEnabled()) {
-logger.trace("scheduling cleanup", new Exception("trace"));
-}
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
// Scheduled cleanup was already scheduled before.. never mind!
// or we have cleanup disabled
@@ -423,9 +415,8 @@ public void resumeCleanup() {
@Override
public void cleanup() {

logger.tracef("performing page cleanup %s", this);

ArrayList<Page> depagedPages = new ArrayList<>();
+LongHashSet depagedPagesSet = new LongHashSet();

// This read lock is required
// because in case of a replicated configuration
@@ -445,7 +436,7 @@ public void cleanup() {
return;
}

logger.tracef("%s locked", this);
logger.tracef(">>>> Cleanup %s", this.pagingStore.getAddress());

synchronized (this) {
try {
@@ -460,36 +451,16 @@ public void cleanup() {
ArrayList<PageSubscription> cursorList = cloneSubscriptions();

long minPage = checkMinPage(cursorList);
+final long firstPage = pagingStore.getFirstPage();
deliverIfNecessary(cursorList, minPage);

logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);

// if the current page is being written...
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
boolean complete = checkPageCompletion(cursorList, minPage);
logger.tracef("firstPage=%s, minPage=%s, currentWritingPage=%s", firstPage, minPage, pagingStore.getCurrentWritingPage());

if (!pagingStore.isStarted()) {
return;
}
+// First we clean up the regular stream, at the beginning of the set of files
+cleanupRegularStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);

-// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
-if (complete) {
-
-cleanupComplete(cursorList);
-}
-}
-
-for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
-if (!checkPageCompletion(cursorList, i)) {
-break;
-}
-Page page = pagingStore.depage();
-if (page == null) {
-break;
-}
-depagedPages.add(page);
-}
+// Then we check for pages that can already be removed even though they are away from the head of the stream
+cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);

if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
pagingStore.stopPaging();
@@ -503,6 +474,7 @@ public void cleanup() {
logger.warn(ex.getMessage(), ex);
return;
} finally {
logger.tracef("<<<< Cleanup end on %s", pagingStore.getAddress());
pagingStore.unlock();
}
}
@@ -511,6 +483,102 @@ public void cleanup() {

}

+/**
+* This cleanup process will calculate the min page for every cursor
+* and then remove the pages based on that.
+* If we knew ahead of time all the queues belonging to every page, we could remove this process.
+* @param depagedPages
+* @param depagedPagesSet
+* @param cursorList
+* @param minPage
+* @param firstPage
+* @throws Exception
+*/
+private void cleanupRegularStream(ArrayList<Page> depagedPages,
+LongHashSet depagedPagesSet,
+ArrayList<PageSubscription> cursorList,
+long minPage,
+long firstPage) throws Exception {
+// if the current page is being written...
+// in that case we need to verify it in a different way
+Page currentPage = pagingStore.getCurrentPage();
+if (minPage == pagingStore.getCurrentWritingPage() && currentPage != null && currentPage.getNumberOfMessages() > 0) {
+boolean complete = checkPageCompletion(cursorList, minPage);
+
+// All the pages on the cursor are complete... so we will clean up everything and store a bookmark
+if (complete) {
+cleanupComplete(cursorList);
+}
+}
+
+for (long i = firstPage; i <= minPage; i++) {
+if (!checkPageCompletion(cursorList, i)) {
+break;
+}
+Page page = pagingStore.depage();
+if (page == null) {
+break;
+}
+if (logger.isDebugEnabled()) {
+logger.debug("Depaging page " + page.getPageId());
+}
+depagedPagesSet.add(page.getPageId());
+depagedPages.add(page);
+}
+}

+/** The regular depaging will take care of removing messages in regular streaming.
+*
+* If we had a list of all the cursors that belong to each page, this cleanup would be enough in every situation (with some adjustment to currentPages).
+* So, this routine is to optimize removing pages when all the acks are made on every cursor.
+* We still need regular depaging in a streamed manner as it will check the min page for all the existing cursors.
+*/
+private void cleanupMiddleStream(ArrayList<Page> depagedPages,
+LongHashSet depagedPagesSet,
+ArrayList<PageSubscription> cursorList,
+long minPage,
+long firstPage) {
+
+final long currentPageId = pagingStore.getCurrentWritingPage();
+LongObjectHashMap<AtomicInteger> counts = new LongObjectHashMap<>();
+
+int subscriptions = cursorList.size();
+
+cursorList.forEach(sub -> {
+sub.forEachConsumedPage(consumedPage -> {
+if (consumedPage.isDone()) {
+AtomicInteger count = counts.get(consumedPage.getPageId());
+if (count == null) {
+count = new AtomicInteger(0);
+counts.put(consumedPage.getPageId(), count);
+}
+count.incrementAndGet();
+}
+});
+});
+
+counts.forEach((pageID, counter) -> {
+try {
+// This check is to make sure we are not removing what has already been removed by depaging
+if (pageID > minPage && pageID > firstPage && pageID != currentPageId) {
+if (counter.get() >= subscriptions) {
+if (!depagedPagesSet.contains(pageID.longValue())) {
+Page page = pagingStore.removePage(pageID.intValue());
+logger.debugf("Removing page %s", pageID);
+if (page != null) {
+depagedPages.add(page);
+depagedPagesSet.add(page.getPageId());
+}
+}
+}
+}
+} catch (Throwable e) {
+logger.warn("Error while Issuing cleanupMiddlePages with " + pageID + ", counter = " + counter, e);
+depagedPages.forEach(p -> logger.warn("page " + p));
+}
+});
+}

// Protected as a way to inject testing
protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
if (logger.isDebugEnabled()) {
@@ -550,7 +618,7 @@ protected void finishCleanup(ArrayList<Page> depagedPages) {

List<PagedMessage> pgdMessagesList = null;
try {
-depagedPage.open();
+depagedPage.open(false);
pgdMessagesList = depagedPage.read(storageManager, true);
} finally {
try {
@@ -583,23 +651,29 @@ protected void finishCleanup(ArrayList<Page> depagedPages) {

}

-private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) {
+private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) throws Exception {

logger.tracef("checkPageCompletion(%d)", minPage);

boolean complete = true;

+Page page = pagingStore.createPage((int)minPage);
+if (!page.getFile().exists()) {
+logger.tracef("store %s did not have an existing file, considering it a complete file then", pagingStore.getAddress());
+return true;
+}
+
for (PageSubscription cursor : cursorList) {
if (!cursor.isComplete(minPage)) {
-if (logger.isDebugEnabled()) {
-logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
+if (logger.isTraceEnabled()) {
+logger.trace("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
}

complete = false;
break;
} else {
-if (logger.isDebugEnabled()) {
-logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
+if (logger.isTraceEnabled()) {
+logger.trace("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
}
}
}
@@ -662,8 +736,8 @@ private long checkMinPage(Collection<PageSubscription> cursorList) {

for (PageSubscription cursor : cursorList) {
long firstPage = cursor.getFirstPage();
-if (logger.isDebugEnabled()) {
-logger.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
+if (logger.isTraceEnabled()) {
+logger.trace(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + firstPage);
}

// the cursor will return -1 if the cursor is empty
@@ -672,9 +746,7 @@ private long checkMinPage(Collection<PageSubscription> cursorList) {
}
}

-if (logger.isDebugEnabled()) {
-logger.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
-}
+logger.tracef("checkMinPage(%s) will have minPage=%s", pagingStore.getAddress(), minPage);

return minPage;
