
MessageGroupId should be included as a variable in kale.publisher #49

Closed
kyleaquino opened this issue Jan 21, 2020 · 5 comments

Comments

@kyleaquino

When trying to send to an AWS SQS queue with content-based deduplication enabled, it returns the error stated below:

ndkale-master/example/src/ndkale/kale/publisher.py", line 60:
botocore.exceptions.ClientError: An error occurred (MissingParameter) when calling the SendMessage operation: The request must contain the parameter MessageGroupId.

This issue can be easily fixed by adding the MessageGroupId parameter to the sqs_queue.send_message() call in publisher.py.
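To illustrate the shape of the fix, here is a minimal sketch (names like `build_send_message_params` are hypothetical, not part of ndkale) of how the SendMessage kwargs could be assembled so that FIFO queues get a MessageGroupId while standard queues keep the optional DelaySeconds. Note that SQS returns queue attribute values as the strings "true"/"false":

```python
def build_send_message_params(queue_attributes, body, msg_group_id='default',
                              delay_sec=None):
    """Hypothetical helper: build kwargs for sqs_queue.send_message()."""
    params = {'MessageBody': body}
    # SQS attribute values are strings, so compare against 'true' explicitly.
    if queue_attributes.get('FifoQueue') == 'true':
        # FIFO queues require MessageGroupId on every SendMessage call.
        params['MessageGroupId'] = msg_group_id
    elif delay_sec is not None:
        # Per-message DelaySeconds is only valid on standard (non-FIFO) queues.
        params['DelaySeconds'] = delay_sec
    return params
```

This is only a sketch of the idea, not the patch itself; the actual change would live inside `Publisher.publish` in publisher.py.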

@kyleaquino
Author

Here's my fix for this issue. Kindly respond. Thank you!

# Added msg_group_id to allow customizing the MessageGroupId
def publish(self, task_class, task_id, payload,
                current_retry_num=None, delay_sec=None, msg_group_id='default'): 
        """Publish the given task type to the queue with the provided payload.

        :param obj task_class: class of the task that we are publishing.
        :param str task_id: unique identifying string for this task.
        :param dict payload: dictionary for the task payload.
        :param int current_retry_num: current task retry count. If 0, this is
            the first attempt to run the task.
        :param int delay_sec: time (in seconds) that a task should stay
                in the queue before being released to consumers.
        :param str msg_group_id: MessageGroupId parameter for queues with
                content-based deduplication enabled.
        :raises: TaskTooChubbyException: This task is outrageously chubby.
                The publisher of the task should handle this exception and
                determine how to proceed.
        """

        if delay_sec is not None and delay_sec > settings.SQS_MAX_TASK_DELAY_SEC:
            raise exceptions.InvalidTaskDelayException(
                'Invalid task delay_sec (%d > %d).' % (
                    delay_sec, settings.SQS_MAX_TASK_DELAY_SEC))

        queue_class = utils.class_import_from_path(settings.QUEUE_CLASS)
        q_info = queue_info.QueueInfo(settings.QUEUE_CONFIG, self, queue_class)
        queue_obj = q_info.get_queue(task_class.queue)

        if task_class.time_limit >= queue_obj.visibility_timeout_sec:
            raise exceptions.InvalidTimeLimitTaskException(
                'Invalid task time limit: %d >= %d from %s' % (
                    task_class.time_limit, queue_obj.visibility_timeout_sec,
                    settings.QUEUE_CONFIG))

        sqs_queue = self._get_or_create_queue(queue_obj.name)

        kale_msg = message.KaleMessage(
            task_class=task_class,
            task_id=task_id,
            payload=payload,
            current_retry_num=current_retry_num)

        # Updated Code Block
        #######################################################
        params = {'MessageBody': kale_msg.encode()}

        # SQS attribute values are the strings 'true'/'false', so compare
        # against 'true' rather than relying on bool() truthiness.
        if sqs_queue.attributes.get('ContentBasedDeduplication') == 'true':
            params['MessageGroupId'] = msg_group_id

        if sqs_queue.attributes.get('FifoQueue') != 'true':
            # Per-message DelaySeconds is only valid on non-FIFO queues.
            params['DelaySeconds'] = delay_sec or 1

        sqs_queue.send_message(**params)
        #######################################################

        logger.debug('Published task. Task id: %s; Task name: %s' % (
            task_id, '%s.%s' % (task_class.__module__, task_class.__name__)))
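One caveat worth calling out in the patch as originally posted: it used `bool(sqs_queue.attributes['ContentBasedDeduplication'])`, but SQS attribute values come back as the strings "true"/"false", and any non-empty string is truthy in Python. A quick illustration of the trap:

```python
# SQS queue attributes are returned as strings, so truthiness checks
# misfire: bool('false') is True because 'false' is a non-empty string.
attrs = {'ContentBasedDeduplication': 'false'}

naive = bool(attrs['ContentBasedDeduplication'])        # True  -- wrong
correct = attrs['ContentBasedDeduplication'] == 'true'  # False -- right
```

Comparing against the literal string 'true' avoids treating a disabled attribute as enabled.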

@kyleaquino
Author

Hello @stlava @timnd @davidkern, what do you think of this fix? Is this okay?

@kyleaquino
Author

Made a PR: #51

@sabw8217
Contributor

Is this still an issue? Your fix looked basically OK to me. I think using .get() on the sqs_queue.attributes dict, instead of indexing into it with [], would be a good idea to handle the possibility that attributes like FifoQueue aren't set (not sure whether elasticmq sets that, for example), but otherwise this seems reasonable.
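A short sketch of the `.get()` variant suggested above, so that a queue that never reports an attribute (as a local emulator might not) defaults to "false" instead of raising KeyError:

```python
# Hypothetical attribute dict from a queue that sets neither attribute,
# e.g. a local emulator; .get() with a default avoids KeyError.
attributes = {}

is_fifo = attributes.get('FifoQueue', 'false') == 'true'
has_dedup = attributes.get('ContentBasedDeduplication', 'false') == 'true'
```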

@kyleaquino
Author

@sabw8217 I closed this for now since I'm still testing this library for our application. I might encounter some other problems in the future that are specific to our use case. Thank you for the response, by the way.
