1- /**
1+ /*
22 * Licensed to the Apache Software Foundation (ASF) under one or more
33 * contributor license agreements. See the NOTICE file distributed with
44 * this work for additional information regarding copyright ownership.
1717package org .apache .activemq .broker .region .cursors ;
1818
1919import java .io .IOException ;
20+ import java .util .ArrayList ;
2021import java .util .Iterator ;
2122import java .util .LinkedList ;
23+ import java .util .List ;
2224import java .util .concurrent .atomic .AtomicBoolean ;
2325import java .util .concurrent .atomic .AtomicLong ;
26+
2427import org .apache .activemq .broker .Broker ;
2528import org .apache .activemq .broker .ConnectionContext ;
2629import org .apache .activemq .broker .region .Destination ;
3134import org .apache .activemq .filter .NonCachedMessageEvaluationContext ;
3235import org .apache .activemq .openwire .OpenWireFormat ;
3336import org .apache .activemq .store .PList ;
34- import org .apache .activemq .store .PListStore ;
3537import org .apache .activemq .store .PListEntry ;
38+ import org .apache .activemq .store .PListStore ;
3639import org .apache .activemq .usage .SystemUsage ;
3740import org .apache .activemq .usage .Usage ;
3841import org .apache .activemq .usage .UsageListener ;
42+ import org .apache .activemq .util .ByteSequence ;
3943import org .apache .activemq .wireformat .WireFormat ;
4044import org .slf4j .Logger ;
4145import org .slf4j .LoggerFactory ;
42- import org .apache .activemq .util .ByteSequence ;
4346
4447/**
4548 * persist pending messages pending message (messages awaiting dispatch to a
4649 * consumer) cursor
47- *
48- *
4950 */
5051public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
52+
5153 static final Logger LOG = LoggerFactory .getLogger (FilePendingMessageCursor .class );
54+
5255 private static final AtomicLong NAME_COUNT = new AtomicLong ();
56+
5357 protected Broker broker ;
5458 private final PListStore store ;
5559 private final String name ;
@@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
6165 private boolean flushRequired ;
6266 private final AtomicBoolean started = new AtomicBoolean ();
6367 private final WireFormat wireFormat = new OpenWireFormat ();
68+
6469 /**
6570 * @param broker
6671 * @param name
@@ -374,9 +379,7 @@ public synchronized void clear() {
374379
375380 @ Override
376381 public synchronized boolean isFull () {
377-
378382 return super .isFull () || (!isDiskListEmpty () && systemUsage != null && systemUsage .getTempUsage ().isFull ());
379-
380383 }
381384
382385 @ Override
@@ -392,18 +395,25 @@ public void setSystemUsage(SystemUsage usageManager) {
392395 @ Override
393396 public void onUsageChanged (Usage usage , int oldPercentUsage , int newPercentUsage ) {
394397 if (newPercentUsage >= getMemoryUsageHighWaterMark ()) {
398+ List <MessageReference > expiredMessages = null ;
395399 synchronized (this ) {
396400 if (!flushRequired && size () != 0 ) {
397401 flushRequired =true ;
398402 if (!iterating ) {
399- expireOldMessages ();
403+ expiredMessages = expireOldMessages ();
400404 if (!hasSpace ()) {
401405 flushToDisk ();
402406 flushRequired = false ;
403407 }
404408 }
405409 }
406410 }
411+
412+ if (expiredMessages != null ) {
413+ for (MessageReference node : expiredMessages ) {
414+ discardExpiredMessage (node );
415+ }
416+ }
407417 }
408418 }
409419
@@ -412,26 +422,30 @@ public boolean isTransient() {
412422 return true ;
413423 }
414424
415- protected synchronized void expireOldMessages () {
425+ private synchronized List <MessageReference > expireOldMessages () {
426+ List <MessageReference > expired = new ArrayList <MessageReference >();
416427 if (!memoryList .isEmpty ()) {
417428 for (Iterator <MessageReference > iterator = memoryList .iterator (); iterator .hasNext ();) {
418429 MessageReference node = iterator .next ();
419430 if (node .isExpired ()) {
420431 node .decrementReferenceCount ();
421- discardExpiredMessage (node );
432+ expired . add (node );
422433 iterator .remove ();
423434 }
424435 }
425436 }
437+
438+ return expired ;
426439 }
427440
428441 protected synchronized void flushToDisk () {
429442 if (!memoryList .isEmpty () && store != null ) {
430443 long start = 0 ;
431- if (LOG .isTraceEnabled ()) {
444+ if (LOG .isTraceEnabled ()) {
432445 start = System .currentTimeMillis ();
433- LOG .trace ("{}, flushToDisk() mem list size: {} {}" , new Object []{ name , memoryList .size (), (systemUsage != null ? systemUsage .getMemoryUsage () : "" ) });
434- }
446+ LOG .trace ("{}, flushToDisk() mem list size: {} {}" , new Object [] { name , memoryList .size (),
447+ (systemUsage != null ? systemUsage .getMemoryUsage () : "" ) });
448+ }
435449 for (Iterator <MessageReference > iterator = memoryList .iterator (); iterator .hasNext ();) {
436450 MessageReference node = iterator .next ();
437451 node .decrementReferenceCount ();
0 commit comments