Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…duled messages that match the filter.
  • Loading branch information
andytaylor committed Jan 11, 2010
1 parent 0f28f7f commit 1c2f8fc
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 43 deletions.
Expand Up @@ -12,6 +12,8 @@
*/
package org.hornetq.core.server;

import org.hornetq.core.filter.Filter;

import java.util.List;

/**
Expand All @@ -21,13 +23,11 @@ public interface ScheduledDeliveryHandler
{
boolean checkAndSchedule(MessageReference ref);

void reSchedule();

int getScheduledCount();

List<MessageReference> getScheduledReferences();

List<MessageReference> cancel();
List<MessageReference> cancel(Filter filter);

MessageReference removeReferenceWithID(long id);
}
25 changes: 9 additions & 16 deletions src/main/org/hornetq/core/server/impl/QueueImpl.java
Expand Up @@ -54,7 +54,6 @@
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ConcurrentSet;

/**
* Implementation of a Queue
Expand Down Expand Up @@ -646,15 +645,12 @@ public int deleteMatchingReferences(final Filter filter) throws Exception
}
}

List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference messageReference : cancelled)
{
if (filter == null || filter.match(messageReference.getMessage()))
{
deliveringCount.incrementAndGet();
acknowledge(tx, messageReference);
count++;
}
deliveringCount.incrementAndGet();
acknowledge(tx, messageReference);
count++;
}

tx.commit();
Expand Down Expand Up @@ -801,16 +797,13 @@ public int moveReferences(final Filter filter, final SimpleString toAddress) thr
}
}

List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter);
for (MessageReference ref : cancelled)
{
if (filter == null || filter.match(ref.getMessage()))
{
deliveringCount.incrementAndGet();
move(toAddress, tx, ref, false);
acknowledge(tx, ref);
count++;
}
deliveringCount.incrementAndGet();
move(toAddress, tx, ref, false);
acknowledge(tx, ref);
count++;
}

tx.commit();
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ScheduledDeliveryHandler;
Expand All @@ -42,8 +43,6 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler

private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();

private boolean rescheduled;

public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
{
this.scheduledExecutor = scheduledExecutor;
Expand Down Expand Up @@ -74,22 +73,6 @@ public boolean checkAndSchedule(final MessageReference ref)
return false;
}

public void reSchedule()
{
synchronized (scheduledRunnables)
{
if (!rescheduled)
{
for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
{
scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
}

rescheduled = true;
}
}
}

public int getScheduledCount()
{
return scheduledRunnables.size();
Expand All @@ -109,20 +92,26 @@ public List<MessageReference> getScheduledReferences()
return refs;
}

public List<MessageReference> cancel()
public List<MessageReference> cancel(final Filter filter)
{
List<MessageReference> refs = new ArrayList<MessageReference>();

synchronized (scheduledRunnables)
{
for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
for (ScheduledDeliveryRunnable runnable : copy.values())
{
runnable.cancel();
if (filter == null || filter.match(runnable.getReference().getMessage()))
{
runnable.cancel();

refs.add(runnable.getReference());
refs.add(runnable.getReference());
}
}
for (MessageReference ref : refs)
{
scheduledRunnables.remove(ref.getMessage().getMessageID());
}

scheduledRunnables.clear();
}
return refs;
}
Expand Down

0 comments on commit 1c2f8fc

Please sign in to comment.