Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive #3569

Closed
wants to merge 1 commit into from

Conversation

jbertram
Copy link
Contributor

@jbertram jbertram commented May 6, 2021

No description provided.


@Test
public void testMultipleLastValuesCore() throws Exception {
testMultipleLastValues(CoreConnection);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you instead use CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616")

or

CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616")

We should probably remove the Connection supplier. It's a bit simpler to read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the ConnectionSupplier because that's what all the other tests in the class use and I find that consistency helps readability. I wouldn't necessarily mind removing the ConnectionSupplier altogether, but it's used in around 80 other places so that change is beyond the scope of this PR. All things being equal I think the ConnectionSupplier is at least as readable as the alternative you have proposed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather deprecate it. .and move to CFUtil everywhere.

if (tm == null) {
break;
}
System.out.println("Received: " + tm.getStringProperty("data") + "; total: " + ++received);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.debug please

I have cleaned up the test suite recently to not use System.out.prinltn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
dups.put(s, occurrences);
}
System.out.print(s + ",");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use log.info

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private static class HolderReference implements MessageReference {

private final SimpleString prop;

private volatile boolean delivered = false;
private volatile AtomicBoolean delivered = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the AtomicLongFieldUpdater instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell you can't use AtomicLongFieldUpdater on a boolean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theres a way to do something, ill try dig it out.

packetSize +
" from credits, available now is " +
availableCredits);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to ack the message if not?

This will create a big leakage on the journal

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. and paging

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to at least test what would happen on journal... try the issue out, and look at ./artemis print data if there are duplicates on the journal. I believe you would hit worsened issues here.

you need better tests where you would look at the result on the journal I am afraid.

@@ -497,4 +497,8 @@ default void errorProcessing(Consumer consumer, Throwable t, MessageReference me
default QueueConfiguration getQueueConfiguration() {
return null;
}

default boolean canBeDelivered(MessageReference ref) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also breaking the contract on the Queue...

You have Queue::deliver already checking the message on handle(ref, consumer);

Whatever happens here needs to be treated inside handle.

Perhaps having a specialized version of handle would be a better choice.

having this canBeDelivered happened at the proceedDeliver can create a lot of other issues... like I'm predicting with leaving references and message on the journal and paging.

}
Assert.fail("Duplicate messages received " + sb);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the following code here:

      HashMap<Integer, AtomicInteger> map = countJournal(server.getConfiguration());

      map.forEach((a, b) -> System.out.println("record type " + a + ", counts=" + b));

That would be all you need to check on the journal...

Right now the output here shows exactly what I predicted:

record type 32, counts=100
record type 33, counts=97
record type 34, counts=3
record type 45, counts=100

You will need some assertion on those records. You need to ack the previous message that was already delivered.

If you can do that on handle(MessageReference), make it specialized on LastValueQueue would be better than the method you added on the consumer.

@clebertsuconic
Copy link
Contributor

there are also a few failures added on this PR:

org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=false, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=true, scanPeriod=100]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerCoreConsumer[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstAMQP[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveLVQWithConsumerFirstCore[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveCoreProducerAMQPConsumer[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest.testNonDestructiveAMQPProducerCoreConsumer[persistenceEnabled=true, scanPeriod=-1]
org.apache.activemq.artemis.tests.integration.cli.RecoverTest.org.apache.activemq.artemis.tests.integration.cli.RecoverTest
org.apache.activemq.artemis.tests.integration.cluster.failover.ReplicatedMultipleServerFailoverExtraBackupsTest.testStartLiveFirst
org.apache.activemq.artemis.tests.integration.jms.client.LVQTest.testLVQandNonDestructive

@jbertram
Copy link
Contributor Author

I'll go back to the drawing board and rework this.

@jbertram jbertram closed this May 11, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants