Skip to content

Commit

Permalink
HORNETQ-1017 - Prepared transactions are not cleaning properly up on …
Browse files Browse the repository at this point in the history
…paging

and Optimize expiry process on paged messages
  • Loading branch information
clebertsuconic committed Sep 6, 2012
1 parent bbd4c7d commit c15b502
Show file tree
Hide file tree
Showing 14 changed files with 1,055 additions and 307 deletions.
96 changes: 84 additions & 12 deletions src/main/org/hornetq/core/paging/PrintPages.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public static void main(final String arg[])
try
{

Pair<Map<Long, Set<PagePosition>>, Set<Long>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
PageCursorsInfo cursorACKs = PrintPages.loadCursorACKs(arg[1]);

Set<Long> pgTXs = cursorACKs.getB();
Set<Long> pgTXs = cursorACKs.getPgTXs();

ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
Expand Down Expand Up @@ -139,7 +139,7 @@ public Executor getExecutor()

boolean acked = false;

Set<PagePosition> positions = cursorACKs.getA().get(q[i]);
Set<PagePosition> positions = cursorACKs.getCursorRecords().get(q[i]);
if (positions != null)
{
acked = positions.contains(posCheck);
Expand All @@ -149,6 +149,12 @@ public Executor getExecutor()
{
System.out.print(" (ACK)");
}

if (cursorACKs.getCompletePages(q[i]).contains(new Long(pgid)))
{
System.out.println(" (PG-COMPLETE)");
}


if (i + 1 < q.length)
{
Expand All @@ -174,13 +180,67 @@ public Executor getExecutor()
e.printStackTrace();
}
}

private static class PageCursorsInfo
{
private final Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();

private final Set<Long> pgTXs = new HashSet<Long>();

private final Map<Long, Set<Long>> completePages = new HashMap<Long, Set<Long>>();

public PageCursorsInfo()
{
}


/**
* @return the pgTXs
*/
public Set<Long> getPgTXs()
{
return pgTXs;
}


/**
* @return the cursorRecords
*/
public Map<Long, Set<PagePosition>> getCursorRecords()
{
return cursorRecords;
}


/**
* @return the completePages
*/
public Map<Long, Set<Long>> getCompletePages()
{
return completePages;
}

public Set<Long> getCompletePages(Long queueID)
{
Set<Long> completePagesSet = completePages.get(queueID);

if (completePagesSet == null)
{
completePagesSet = new HashSet<Long>();
completePages.put(queueID, completePagesSet);
}

return completePagesSet;
}

}

/**
* @param journalLocation
* @return
* @throws Exception
*/
protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(final String journalLocation) throws Exception
protected static PageCursorsInfo loadCursorACKs(final String journalLocation) throws Exception
{
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation, null);

Expand All @@ -202,10 +262,9 @@ protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(fi
ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();

messagesJournal.load(records, txs, null, false);

Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();

Set<Long> pgTXs = new HashSet<Long>();
PageCursorsInfo cursorInfo = new PageCursorsInfo();


for (RecordInfo record : records)
{
Expand All @@ -218,24 +277,37 @@ protected static Pair<Map<Long, Set<PagePosition>>, Set<Long>> loadCursorACKs(fi
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);

Set<PagePosition> set = cursorRecords.get(encoding.queueID);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);

if (set == null)
{
set = new HashSet<PagePosition>();
cursorRecords.put(encoding.queueID, set);
cursorInfo.getCursorRecords().put(encoding.queueID, set);
}

set.add(encoding.position);
}
else if (record.userRecordType == JournalStorageManager.PAGE_CURSOR_COMPLETE)
{
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);

Long queueID = new Long(encoding.queueID);
Long pageNR = new Long(encoding.position.getPageNr());

if (!cursorInfo.getCompletePages(queueID).add(pageNR))
{
System.err.println("Page " + pageNR + " has been already set as complete on queue " + queueID);
}
}
else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
{
if (record.isUpdate)
{
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();

pageUpdate.decode(buff);
pgTXs.add(pageUpdate.pageTX);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);
}
else
{
Expand All @@ -244,12 +316,12 @@ else if (record.userRecordType == JournalStorageManager.PAGE_TRANSACTION)
pageTransactionInfo.decode(buff);

pageTransactionInfo.setRecordID(record.id);
pgTXs.add(pageTransactionInfo.getTransactionID());
cursorInfo.getPgTXs().add(pageTransactionInfo.getTransactionID());
}
}
}

return new Pair<Map<Long, Set<PagePosition>>, Set<Long>>(cursorRecords, pgTXs);
return cursorInfo;
}

// Package protected ---------------------------------------------
Expand Down
3 changes: 0 additions & 3 deletions src/main/org/hornetq/core/paging/cursor/PagePosition.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,5 @@ public interface PagePosition extends Comparable<PagePosition>
PagePosition nextMessage();

PagePosition nextPage();

/** This will just test if the current position is the immediate next to the parameter position */
boolean isRightAfter(PagePosition previous);

}
9 changes: 9 additions & 0 deletions src/main/org/hornetq/core/paging/cursor/PageSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.concurrent.Executor;

import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.server.Queue;
Expand Down Expand Up @@ -86,6 +87,8 @@ public interface PageSubscription
* @param position
*/
void reloadACK(PagePosition position);

void reloadPageCompletion(PagePosition position);

/**
* To be called when the cursor decided to ignore a position.
Expand Down Expand Up @@ -138,4 +141,10 @@ public interface PageSubscription
* @return
*/
Executor getExecutor();

/**
* @param deletedPage
* @throws Exception
*/
void onDeletePage(Page deletedPage) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,12 @@ public void cleanup()
log.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
}

ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
ArrayList<PageSubscription> cursorList = cloneSubscriptions();

long minPage = checkMinPage(cursorList);

// if the current page is being written...
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
Expand All @@ -372,6 +373,8 @@ public void cleanup()
}
}


// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
if (complete)
{

Expand All @@ -385,7 +388,7 @@ public void cleanup()

Page currentPage = pagingStore.getCurrentPage();

storePositions(cursorList, currentPage);
storeBookmark(cursorList, currentPage);

pagingStore.stopPaging();

Expand Down Expand Up @@ -486,6 +489,7 @@ public void cleanup()
}

depagedPage.delete(pgdMessages);
onDeletePage(depagedPage);

synchronized (softCache)
{
Expand All @@ -501,12 +505,31 @@ public void cleanup()

}

/**
* @return
*/
private synchronized ArrayList<PageSubscription> cloneSubscriptions()
{
ArrayList<PageSubscription> cursorList = new ArrayList<PageSubscription>();
cursorList.addAll(activeCursors.values());
return cursorList;
}

protected void onDeletePage(Page deletedPage) throws Exception
{
List<PageSubscription> subscriptions = cloneSubscriptions();
for (PageSubscription subs: subscriptions)
{
subs.onDeletePage(deletedPage);
}
}

/**
* @param cursorList
* @param currentPage
* @throws Exception
*/
protected void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
protected void storeBookmark(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@

package org.hornetq.core.paging.cursor.impl;

import java.lang.ref.WeakReference;

import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PagePosition;

/**
Expand All @@ -29,6 +26,13 @@ public class PagePositionImpl implements PagePosition
{
private long pageNr;

/**
* The index of the message on the page file.
*
* This can be used as -1 in cases where the message is irrelevant,
* for instance when a cursor is storing the next message to be received
* or when a page is marked as fully complete (as the ACKs are removed)
*/
private int messageNr;

/** ID used for storage */
Expand Down Expand Up @@ -85,11 +89,6 @@ public int getMessageNr()
{
return messageNr;
}

public boolean isRightAfter(final PagePosition previous)
{
return this.pageNr == previous.getPageNr() && this.messageNr == previous.getMessageNr() + 1;
}

/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
Expand Down
Loading

0 comments on commit c15b502

Please sign in to comment.