queue duplicates #218

Closed
jdamick opened this Issue Feb 22, 2013 · 13 comments

2 participants

@jdamick

Using astyanax:1.56.25, I'm seeing duplicate callbacks to my subscriber on the MessageQueueDispatcher if I set the consumer count > 1 & threads > 1. If I keep the threads the same but limit the consumer count to 1, the issue goes away. The job that I'm running in the callback may take several seconds, and I've configured the lockTtl & lockTimeout to be fairly high. Is this a misconfiguration?

@jdamick

Seems like it might be related to the "reprocessing", as I see this counter is > 0 when the duplicates occur.

@jdamick

Here is a sample of what I'm doing:

    MessageQueueDispatcher subscriber = new MessageQueueDispatcher.Builder()
        .withBatchSize(1)
        .withCallback(new Function<MessageContext, Boolean>() {
            @Override
            public synchronized Boolean apply(MessageContext message) {
                // Return true to 'ack' the message.
                // Return false to not 'ack', which will result in the message timing out.
                // Throw any exception to put the message into a poison queue.
                boolean ack = false;
                try {
                    String body = String.valueOf(message.getMessage().getParameters().get(MSG_BODY));
                    LOGGER.debug("message body: {}", body);

                    MSG obj = ObjectUtils.fromJSON(body, clazz);
                    if (callback.apply(obj)) { // This can take multiple seconds..
                        ack = true;
                    } else {
                        LOGGER.error("Error in callback to subscriber");
                    }
                } catch (Throwable t) {
                    LOGGER.error("Caught exception on handleDelivery", t);
                }

                return ack;
            }
        })
        .withMessageQueue(this.queue)
        .withConsumerCount(1 /* or handlerCount */)
        .withThreadCount(handlerCount)
        .build();

    subscriber.start();

@elandau
Netflix, Inc. member

Can you also provide the producer code? I'm specifically interested in seeing how the message is created. I suspect that the message timeout is too low. The default is 5 seconds, which I need to change to something much higher.

@jdamick
    MessageProducer producer = queue.createProducer();
    try {
        String body = ObjectUtils.toJson(message);
        Message queueMsg = new Message()
                .addParameter(MSG_BODY, body)
                .setPriority(priority);
        if (!Strings.isNullOrEmpty(key)) {
            queueMsg.setKey(key);
        }
        String messageId = producer.sendMessage(queueMsg);

        LOGGER.debug("produced message: {} body: {}", messageId, body);
    } catch (MessageQueueException e) {
        LOGGER.error("Error in produce", e);
        throw new RuntimeException(e);
    }
@elandau
Netflix, Inc. member

Try setting a longer timeout on the message, e.g. new Message().setTimeout(2, TimeUnit.MINUTES)

@jdamick

Will try, but in general, is 1 consumer enough with multiple "threads"?

@elandau
Netflix, Inc. member

It depends on your use case. The difference between consumer count and thread count is that there is an implied reader thread per consumer, while withThreadCount() refers to the processing threads. You would want more consumers if you have a lot of shards and want to parallelize reading from Cassandra.
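
The split described above (an implied reader thread per consumer, plus a separate pool of processing threads) can be sketched in plain Java, independent of Astyanax. The queue below is a stand-in BlockingQueue and every name (run, shard, processors) is illustrative, not part of the Astyanax API:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerVsThreads {
    // Drain a queue with `consumerCount` reader threads handing work off to a
    // pool of `threadCount` processing threads; returns how many messages ran.
    static int run(int consumerCount, int threadCount, int messages) throws InterruptedException {
        BlockingQueue<String> shard = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) shard.add("msg-" + i);

        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        ExecutorService processors = Executors.newFixedThreadPool(threadCount);
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages);

        for (int c = 0; c < consumerCount; c++) {
            consumers.submit(() -> {
                // Reader loop: one implied thread per consumer pulls from the
                // queue and hands each message to the processing pool.
                while (shard.poll() != null) {
                    processors.submit(() -> {
                        processed.incrementAndGet(); // the user's callback would run here
                        done.countDown();
                    });
                }
            });
        }
        done.await();
        consumers.shutdown();
        processors.shutdown();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed=" + run(2, 4, 20)); // prints processed=20
    }
}
```

More consumers parallelize the reads; more processing threads parallelize the callbacks. The duplicate symptom in this issue comes from the real queue redelivering messages whose timeout expired mid-processing, which this local sketch cannot reproduce.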

@jdamick

Thanks, adjusting the message timeout alleviated the duplicate issue. My initial assumption was that the timeout didn't apply unless the ack was false or the process went away. Could this timeout (or a different one) act more like a watchdog, where the timeout is reset while the process is still working on the message? That way the timeout could be kept fairly low while still allowing a variable runtime.

@elandau
Netflix, Inc. member

This may be tricky (and very inefficient) because of how timeouts are handled in the queue data model. Each timeout is a separate message queue item, so a heartbeat would be implemented as a column delete + column insert for the next timeout.

@elandau elandau closed this Mar 27, 2013
@jdamick

Do you have a suggestion for handling jobs with variable running times? My concern is that no matter how high you guess, it may not be enough, or if it's so high that when a job does fail, it takes a very long time to retry..

@elandau
Netflix, Inc. member

How many of these jobs do you expect to have? My thought is to have a specialized timeout queue for a heartbeat mechanism. That way timeouts are handled by the same queue algorithm, but the message size is very small because it won't contain the actual message body.
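
The heartbeat idea can be sketched outside Astyanax with a ScheduledExecutorService: while the long-running callback executes, a background task periodically renews a short lease so the message is not redelivered. Everything here (renewLease, runWithHeartbeat, the TTL values) is hypothetical; in the real queue each renewal would be the column delete + insert for the next timeout mentioned above:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {
    static final AtomicInteger renewals = new AtomicInteger();

    // Stand-in for rescheduling the message's timeout entry. In the queue's
    // data model this would delete the current timeout column and insert a
    // new one `ttlMillis` in the future.
    static void renewLease(String messageId, long ttlMillis) {
        renewals.incrementAndGet();
    }

    // Run the job while a background task keeps renewing a short lease at
    // half the TTL, so the lease never expires while the job is alive.
    static void runWithHeartbeat(String messageId, Runnable job, long ttlMillis) {
        ScheduledExecutorService beat = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> lease = beat.scheduleAtFixedRate(
                () -> renewLease(messageId, ttlMillis),
                ttlMillis / 2, ttlMillis / 2, TimeUnit.MILLISECONDS);
        try {
            job.run();              // the long-running callback
        } finally {
            lease.cancel(false);    // stop heartbeating once the job finishes
            beat.shutdown();        // after this, the lease expires and the
                                    // message times out if it was never acked
        }
    }

    public static void main(String[] args) {
        runWithHeartbeat("msg-1", () -> {
            try { Thread.sleep(300); } catch (InterruptedException ignored) { }
        }, 100);
        System.out.println("lease renewed " + renewals.get() + " times");
    }
}
```

With this shape, the lease TTL can stay small (fast retry after a crash) while a healthy but slow job keeps its message alive, at the cost of the extra writes per renewal discussed above.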

@jdamick

They are created on-demand; most complete quickly, but occasionally they take a while. I could try the special queue that would time out and check whether the job was done. That's a good idea - I'll dig around the code to see how I could pull that off. Is this something you'd want as part of the library, i.e. a pull request?

@elandau
Netflix, Inc. member

If you expect events to be handled quickly and therefore the 'heartbeat' mechanism doesn't get called often it might make sense to just add it to the core queue API instead of complicating it with additional queues. I'd be glad to accept a pull request for this feature.
