Skip to content

Commit

Permalink
ARTEMIS-2216 Use a specific executor for pageSyncTimer
Browse files Browse the repository at this point in the history
  • Loading branch information
qihongxu committed Jan 3, 2019
1 parent 2c7bd56 commit 3c01f25
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 14 deletions.
Expand Up @@ -71,6 +71,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener

boolean isPaging();

boolean isPagingDirtyRead();

/**
* Schedules sync to the file storage.
*/
Expand Down
Expand Up @@ -1350,7 +1350,7 @@ public synchronized boolean hasNext() {
return true;
}

if (!pageStore.isPaging()) {
if (!pageStore.isPagingDirtyRead()) {
return false;
}

Expand Down
Expand Up @@ -152,7 +152,7 @@ public PageCursorProvider newCursorProvider(PagingStore store,
@Override
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {

return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);
}

@Override
Expand Down Expand Up @@ -223,7 +223,7 @@ public List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettin

AddressSettings settings = addressSettingsRepository.getMatch(address.toString());

PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
PagingStore store = new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this, address, settings, executorFactory.getExecutor(), executorFactory.getExecutor(), syncNonTransactional);

storesReturn.add(store);
}
Expand Down
Expand Up @@ -140,6 +140,21 @@ public PagingStoreImpl(final SimpleString address,
final AddressSettings addressSettings,
final ArtemisExecutor executor,
final boolean syncNonTransactional) {
this(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, executor, syncNonTransactional);
}

public PagingStoreImpl(final SimpleString address,
final ScheduledExecutorService scheduledExecutor,
final long syncTimeout,
final PagingManager pagingManager,
final StorageManager storageManager,
final SequentialFileFactory fileFactory,
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
final ArtemisExecutor executor,
final ArtemisExecutor ioExecutor,
final boolean syncNonTransactional) {
if (pagingManager == null) {
throw new IllegalStateException("Paging Manager can't be null");
}
Expand Down Expand Up @@ -172,7 +187,7 @@ public PagingStoreImpl(final SimpleString address,
this.syncNonTransactional = syncNonTransactional;

if (scheduledExecutor != null && syncTimeout > 0) {
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, executor, syncTimeout);
this.syncTimer = new PageSyncTimer(this, scheduledExecutor, ioExecutor, syncTimeout);
} else {
this.syncTimer = null;
}
Expand Down Expand Up @@ -278,21 +293,26 @@ public boolean isPaging() {
lock.readLock().lock();

try {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return false;
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
return isFull();
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
return isFull();
}
return paging;
return isPagingDirtyRead();
} finally {
lock.readLock().unlock();
}
}

@Override
public boolean isPagingDirtyRead() {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return false;
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
return isFull();
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
return isFull();
}
return paging;
}

@Override
public int getNumberOfPages() {
return numberOfPages;
Expand Down
Expand Up @@ -337,6 +337,11 @@ public boolean isPaging() {
return false;
}

@Override
public boolean isPagingDirtyRead() {
return false;
}

@Override
public void sync() throws Exception {

Expand Down

0 comments on commit 3c01f25

Please sign in to comment.