# TODO:
* Add some pictures!
* How to install wimq?

# WIMQ

### Szymon 

- Date: Thursday, 9 July 2015, day 4.
- Duration: 1h

### Agenda:
- 11:00 - 11:20 - Intro, our use cases and design.
- 11:20 - 11:50 - Your own worker.
- 11:50 - 12:00 - Conclusion.


### Objectives:
- understand wimq's rationale, structure and common usage
- know how to change worker's behaviour via parameters
- get hands-on experience creating your own worker

### URLs:
- http://doc.devcompany.com/wi-mq/master/index.html - official documentation
- https://gitlab.devcompany.com/libraries/wi-mq - wimq source


# Intro

* What is wimq?
    * Messaging library for company apps.

* What is a producer and consumer?
    * Producer creates messages and publishes them to a queue. 
    * Consumer takes messages from the queue it is subscribed to and performs the task.
    * Two main worker types: producer/consumer, or consumer only.

* Use cases
    * Background tasks -> sending emails, tasks that don't get executed immediately.
    * Concurrency -> multiple threads and processes consuming from a queue.
    * Pipes -> connection between two projects that don't share codebase.

* Task structure
    * Queue name (= worker name) -> producer knows where to publish
    * Data (kwargs)
    * TTL

* Locking
    * An MD5 hash is built from the task and used as the lock key in memcached
    * Prevents duplicates
    * Less db access
    * Lock -> Publish -> Queue -> Consume -> Unlock
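
The Lock -> Publish -> Queue -> Consume -> Unlock flow can be sketched with the standard library only. This is not wimq code: the in-memory `locks` set stands in for memcached, `queue.Queue` stands in for the broker, and the names `task_key`, `publish` and `consume_one` are hypothetical. The lock TTL is left out to keep the sketch short.

```python
import hashlib
import json
import queue

# Stand-in for memcached: the set of lock keys currently held (assumption).
locks = set()
# Stand-in for the broker queue (assumption).
task_queue = queue.Queue()


def task_key(queue_name, kwargs):
    """Build a deterministic MD5 key from the queue name and the task data."""
    payload = json.dumps([queue_name, kwargs], sort_keys=True)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


def publish(queue_name, **kwargs):
    """Lock, then publish. Returns False if an identical task is pending."""
    key = task_key(queue_name, kwargs)
    if key in locks:
        return False  # duplicate: the same task is already queued
    locks.add(key)
    task_queue.put((queue_name, kwargs))
    return True


def consume_one(handler):
    """Take one task, run the handler, then release the lock."""
    queue_name, kwargs = task_queue.get()
    try:
        handler(**kwargs)
    finally:
        locks.discard(task_key(queue_name, kwargs))


handled = []
first = publish("send_email", email_id=42)
second = publish("send_email", email_id=42)   # rejected while locked
consume_one(lambda **kw: handled.append(kw))
third = publish("send_email", email_id=42)    # accepted again after unlock
print(first, second, third, handled)
```

Because the second publish is rejected before it reaches the queue, the duplicate check needs no database access, which matches the "less db access" point above.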

# WI Example:

* eBay requests translation of `OriginalItem`
* `OI` gets translated
* Translator changes item state to 'T'
* Producer periodically fetches all items with STATE = 'T', publishes their ids to a queue
* Items from the queue are consumed by multiple consumers that list the item and set its state to 'L'
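
A toy version of this flow, assuming an in-memory dict in place of the database and threads in place of separate consumer processes. The item ids, the extra state `'N'` and the function names are made up for illustration. Only the states `'T'` and `'L'` come from the example above.

```python
import queue
import threading

# Hypothetical in-memory "table" of items: id -> state.
items = {1: "T", 2: "N", 3: "T", 4: "T"}
items_lock = threading.Lock()
id_queue = queue.Queue()


def produce():
    """Publish the ids of all translated items (state 'T')."""
    with items_lock:
        ids = [item_id for item_id, state in items.items() if state == "T"]
    for item_id in ids:
        id_queue.put(item_id)
    return ids


def consume():
    """Process ids until the queue is empty, marking each item as 'L'."""
    while True:
        try:
            item_id = id_queue.get_nowait()
        except queue.Empty:
            return
        with items_lock:
            if items[item_id] == "T":   # re-check: state may have changed
                items[item_id] = "L"    # a real worker would list the item here


published = produce()
consumers = [threading.Thread(target=consume) for _ in range(2)]
for t in consumers:
    t.start()
for t in consumers:
    t.join()
print(sorted(published), items)
```

Only ids travel through the queue. Each consumer re-reads the item and checks its state before acting, so a stale message does no harm.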


In [None]:
#Sample worker

# -*- coding: utf-8 -*-
import datetime
import logging

from portal.lib import mq
from portal.models.user import User
from portal.models.ebay.account import EbayAccount
from portal.services import TokenEnabler


log = logging.getLogger(__name__)


class AccountsReestablisherWorker(mq.BaseDBWorker):
    run_each = datetime.timedelta(hours=1)
    class_to_consume = EbayAccount
    GROUP = mq.BaseDBWorker.WORKER_GROUP_LIGHT

    TOKEN_ENABLER = TokenEnabler()

    def _get_query(self, session, o, id=None):
        filters = [
            User.ebay_state != User.ES_DELETED,
            EbayAccount.token_enabled == False,
            EbayAccount.enabled == True,
        ]

        if id:
            filters.append(EbayAccount.id == id)
        q = session().query(o).join(User).filter(*filters).distinct()
        return q

    def consume(self, **kwargs):
        account = kwargs.pop('_object_to_consume')
        try:
            account.ebay._get_user(False)
            self.TOKEN_ENABLER.enable(account)
            log.debug("Account %s was reestablished", account.id)
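        except Exception:
            # Best effort: log the failure instead of silently swallowing it.
            log.exception("Account %s could not be reestablished", account.id)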

mq.app.register_worker(AccountsReestablisherWorker)


# More:

* How to customize worker behaviour?

    * Queue definition:
        * Name
        * Durable (keep messages after a RabbitMQ server restart)
        * DeadLetter
    * Lock config:
        * TTL
        * Lock name
    * Producer:
        * run_each
        * Working hours
        * Callable
    * Consumer:
        * Threads
        * Callable
    * Serializer:
        * Serialize
        * Deserialize

* Adding tasks to queue

    * Produce data from db, publish afterwards
    * Publish straight to queue (called directly or after Produce)

* Handling failures
        
    * Producer -> Dead Letter Queue -> Retry -> Archive(Finalize)
    * Consumer -> Fallback (for example save to db for later) -> Republish


In [None]:
#Install and setup

$cd summercamp-notebooks/wimq_extra_code

#In virtualenv:
$pip install ~/workspace/wimq-1.4.0.tar.gz
$python setup.py install

#Rabbit setup?
$. ~/workspace/wi_set_local_rabbit NAME

#main.py
wimq.user = NAME
wimq.password = NAME
wimq.vhost = NAME


In [None]:
# Exercise 1: create a super basic worker

import wimq

class WorkerX(wimq.BaseWorker):

    # need self.make_response(number=x)
    def produce(self):
        pass
    
    def consume(self, **kwargs):
        pass


In [None]:
#Running workers locally:

$wimq sample_project list_workers
$wimq sample_project run_producers
$wimq sample_project run_consumers

#Monitor worker behaviour
http://localhost:15672/

In [None]:
# Exercise 2: Complicate things

run_each = timedelta(seconds=x)
threads = 2
USE_DEAD_LETTER = True

In [None]:
# Exercise 3: What does this worker do?

import datetime
import logging

from portal.lib import mq
from portal.lib.locking.semaphore import AbortableSemaphore
from portal.models import Email
from portal.utils.email_smtp_sender import (
    PortalEmailSender,
    smtp_server_factory,
)


log = logging.getLogger(__name__)


class PortalEmailSenderWorker(mq.BaseDBWorker):
    run_each = datetime.timedelta(minutes=5)
    class_to_consume = Email
    GROUP = mq.BaseDBWorker.WORKER_GROUP_LIGHT
    threads = 1
    sender = PortalEmailSender(smtp_server_factory)

    def __init__(self, app):
        super(PortalEmailSenderWorker, self).__init__(app)
        config_provider = app.get_config_provider(self.__class__)
        self.lock = AbortableSemaphore(
            'portal_email_sender',
            hosts=[config_provider['locker_host']],
        )

    def _get_query(self, session, entity, id=None):
        filters = [
            self.class_to_consume.sent == False,
            self.class_to_consume.tries <= Email.MAX_TRIES_PER_EMAIL,
            self.class_to_consume.ets_id.is_(None),
            self.class_to_consume.dont_send == False,
        ]
        if id:
            filters.append(self.class_to_consume.id == id)
        return session().query(entity).filter(*filters)

    def consume(self, **kwargs):
        email = kwargs.pop('_object_to_consume')
        log.debug('Sending mail %s', email.id)
        with self.lock:
            self.sender.log_in()
            email.try_to_send_using_sender(self.sender)
            self.sender.quit()


mq.app.register_worker(PortalEmailSenderWorker)


# todo

# Conclusion