Skip to content

Commit

Permalink
QPID-5580 : [Java Broker] Introduce explicit type hierarchy for queue…
Browse files Browse the repository at this point in the history
…s in the ConfiguredObject model

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588234 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
rgodfrey committed Apr 17, 2014
1 parent 90a9dc3 commit 672f3f7
Show file tree
Hide file tree
Showing 42 changed files with 454 additions and 346 deletions.
Expand Up @@ -24,8 +24,7 @@

import org.apache.qpid.server.queue.QueueEntryVisitor;

@ManagedObject

@ManagedObject( defaultType = "standard" )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
{

Expand All @@ -39,22 +38,15 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String MESSAGE_GROUP_KEY = "messageGroupKey";
String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
String LVQ_KEY = "lvqKey";
String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
String NO_LOCAL = "noLocal";
String OWNER = "owner";
String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
String SORT_KEY = "sortKey";
String QUEUE_TYPE = "queueType";
String PRIORITIES = "priorities";

String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change

@ManagedAttribute
String getQueueType();

@ManagedAttribute
Exchange getAlternateExchange();

Expand All @@ -67,11 +59,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
boolean getNoLocal();

@ManagedAttribute
String getLvqKey();

@ManagedAttribute
String getSortKey();

@ManagedAttribute
String getMessageGroupKey();
Expand Down Expand Up @@ -135,8 +122,6 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( automate = true, defaultValue = "${queue.alertRepeatGap}")
long getAlertRepeatGap();

@ManagedAttribute
int getPriorities();

//children
Collection<? extends Binding> getBindings();
Expand Down
Expand Up @@ -82,21 +82,21 @@ private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean cr

AMQQueue queue;

if(attributes.containsKey(Queue.SORT_KEY))
if(attributes.containsKey(SortedQueue.SORT_KEY))
{
queue = new SortedQueue(_virtualHost, attributes);
queue = new SortedQueueImpl(_virtualHost, attributes);
}
else if(attributes.containsKey(Queue.LVQ_KEY))
else if(attributes.containsKey(LastValueQueue.LVQ_KEY))
{
queue = new ConflationQueue(_virtualHost, attributes);
queue = new LastValueQueueImpl(_virtualHost, attributes);
}
else if(attributes.containsKey(Queue.PRIORITIES))
else if(attributes.containsKey(PriorityQueue.PRIORITIES))
{
queue = new PriorityQueue(_virtualHost, attributes);
queue = new PriorityQueueImpl(_virtualHost, attributes);
}
else
{
queue = new StandardQueue(_virtualHost, attributes);
queue = new StandardQueueImpl(_virtualHost, attributes);
}
queue.open();
//Register the new queue
Expand Down
Expand Up @@ -89,9 +89,9 @@
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;

public abstract class AbstractQueue
extends AbstractConfiguredObject<AbstractQueue>
implements AMQQueue<AbstractQueue>,
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
implements AMQQueue<X>,
StateChangeListener<QueueConsumer<?>, State>,
MessageGroupManager.ConsumerResetHelper
{
Expand Down Expand Up @@ -543,40 +543,10 @@ else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
//We only return the boolean value if message groups are actually in use
return _arguments.get(MESSAGE_GROUP_KEY) == null ? null : _arguments.get(MESSAGE_GROUP_SHARED_GROUPS);
}
else if(LVQ_KEY.equals(name))
{
if(this instanceof ConflationQueue)
{
return ((ConflationQueue)this).getConflationKey();
}
}
else if(QUEUE_FLOW_STOPPED.equals(name))
{
return isOverfull();
}
else if(SORT_KEY.equals(name))
{
if(this instanceof SortedQueue)
{
return ((SortedQueue)this).getSortedPropertyName();
}
}
else if(QUEUE_TYPE.equals(name))
{
if(this instanceof SortedQueue)
{
return "sorted";
}
if(this instanceof ConflationQueue)
{
return "lvq";
}
if(this instanceof PriorityQueue)
{
return "priority";
}
return "standard";
}
else if(STATE.equals(name))
{
return State.ACTIVE; // TODO
Expand All @@ -585,13 +555,6 @@ else if (DESCRIPTION.equals(name))
{
return getDescription();
}
else if(PRIORITIES.equals(name))
{
if(this instanceof PriorityQueue)
{
return ((PriorityQueue)this).getPriorities();
}
}

return super.getAttribute(name);
}
Expand Down Expand Up @@ -2678,12 +2641,6 @@ protected boolean setState(final State currentState, final State desiredState)
return false;
}

@Override
public String getQueueType()
{
return null;
}

@Override
public ExclusivityPolicy getExclusive()
{
Expand All @@ -2696,18 +2653,6 @@ public boolean getNoLocal()
return _noLocal;
}

@Override
public String getLvqKey()
{
return null;
}

@Override
public String getSortKey()
{
return null;
}

@Override
public String getMessageGroupKey()
{
Expand All @@ -2727,13 +2672,6 @@ public boolean isQueueFlowStopped()
return false;
}

@Override
public int getPriorities()
{
return 0;
}


@Override
public State getState()
{
Expand Down
@@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;

import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;

@ManagedObject( category = false, type="lvq" )
public interface LastValueQueue<X extends LastValueQueue<X>> extends AMQQueue<X>
{
String LVQ_KEY = "lvqKey";

@ManagedAttribute
String getLvqKey();
}
Expand Up @@ -23,35 +23,52 @@

import java.util.Map;

import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;

public class ConflationQueue extends AbstractQueue
public class LastValueQueueImpl extends AbstractQueue<LastValueQueueImpl> implements LastValueQueue<LastValueQueueImpl>
{
public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";


protected ConflationQueue(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
protected LastValueQueueImpl(VirtualHostImpl virtualHost,
Map<String, Object> attributes)
{
super(virtualHost, attributes, entryList(attributes));
}

private static ConflationQueueList.Factory entryList(final Map<String, Object> attributes)
private static LastValueQueueList.Factory entryList(final Map<String, Object> attributes)
{

String conflationKey = MapValueConverter.getStringAttribute(Queue.LVQ_KEY,
String conflationKey = MapValueConverter.getStringAttribute(LVQ_KEY,
attributes,
DEFAULT_LVQ_KEY);

// conflation key can still be null if it was present in the map with a null value
return new ConflationQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey);
return new LastValueQueueList.Factory(conflationKey == null ? DEFAULT_LVQ_KEY : conflationKey);
}

public String getConflationKey()
{
return ((ConflationQueueList)getEntries()).getConflationKey();
return ((LastValueQueueList)getEntries()).getConflationKey();
}

@Override
public Object getAttribute(final String name)
{
if(LVQ_KEY.equals(name))
{
if(this instanceof LastValueQueueImpl)
{
return getConflationKey();
}
}
return super.getAttribute(name);
}

@Override
public String getLvqKey()
{
return getConflationKey();
}
}
Expand Up @@ -21,28 +21,29 @@

package org.apache.qpid.server.queue;

import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class ConflationQueueList extends OrderedQueueEntryList
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;

public class LastValueQueueList extends OrderedQueueEntryList
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueList.class);

private static final HeadCreator HEAD_CREATOR = new HeadCreator()
{

@Override
public ConflationQueueEntry createHead(final QueueEntryList list)
{
return ((ConflationQueueList)list).createHead();
return ((LastValueQueueList)list).createHead();
}
};

Expand All @@ -53,7 +54,7 @@ public ConflationQueueEntry createHead(final QueueEntryList list)
private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);

public ConflationQueueList(ConflationQueue queue, String conflationKey)
public LastValueQueueList(LastValueQueueImpl queue, String conflationKey)
{
super(queue, HEAD_CREATOR);
_conflationKey = conflationKey;
Expand Down Expand Up @@ -199,12 +200,12 @@ final class ConflationQueueEntry extends OrderedQueueEntry

private AtomicReference<ConflationQueueEntry> _latestValueReference;

private ConflationQueueEntry(final ConflationQueueList queueEntryList)
private ConflationQueueEntry(final LastValueQueueList queueEntryList)
{
super(queueEntryList);
}

public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message)
public ConflationQueueEntry(LastValueQueueList queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
Expand Down Expand Up @@ -264,9 +265,9 @@ static class Factory implements QueueEntryListFactory
}

@Override
public ConflationQueueList createQueueEntryList(final AMQQueue<?> queue)
public LastValueQueueList createQueueEntryList(final AMQQueue<?> queue)
{
return new ConflationQueueList((ConflationQueue)queue, _conflationKey);
return new LastValueQueueList((LastValueQueueImpl)queue, _conflationKey);
}
}
}
Expand Up @@ -20,11 +20,11 @@
*/
package org.apache.qpid.server.queue;

import org.apache.qpid.server.virtualhost.VirtualHostImpl;

import java.util.Map;

public abstract class OutOfOrderQueue extends AbstractQueue
import org.apache.qpid.server.virtualhost.VirtualHostImpl;

public abstract class OutOfOrderQueue<X extends OutOfOrderQueue<X>> extends AbstractQueue<X>
{

protected OutOfOrderQueue(VirtualHostImpl virtualHost,
Expand Down

0 comments on commit 672f3f7

Please sign in to comment.