# Task Queues

## Why do we need task queues?

When handling web requests, some operations take longer than we want to spend inside the request. We can defer these operations by putting a description of the task on a queue and processing it later. This method of deferring work to a separate task processor is called a task queue.

## Types of queues

- first-in, first-out (FIFO)
- last-in, first-out (LIFO)
- priority queues
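
As a rough illustration of the three orderings, here is a minimal in-memory sketch using only Python's standard library. The task names and priority numbers are made up for the example, and it assumes that a lower number means a higher priority.

```python
import heapq
from collections import deque

tasks = ['a', 'b', 'c']

# FIFO: items leave in the order they arrived.
fifo = deque(tasks)
fifo_order = [fifo.popleft() for _ in range(len(tasks))]

# LIFO: the most recently added item leaves first.
lifo = list(tasks)
lifo_order = [lifo.pop() for _ in range(len(tasks))]

# Priority queue: the item with the smallest priority number leaves first.
heap = []
for priority, task in [(2, 'a'), (0, 'b'), (1, 'c')]:
    heapq.heappush(heap, (priority, task))
priority_order = [heapq.heappop(heap)[1] for _ in range(3)]

print(fifo_order)      # ['a', 'b', 'c']
print(lifo_order)      # ['c', 'b', 'a']
print(priority_order)  # ['b', 'c', 'a']
```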

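```python
import json
import time


def send_sold_email_via_queue(conn, seller, item, price, buyer):
    # Describe the email to send; the worker looks up the details later.
    data = {
        'seller_id': seller,
        'item_id': item,
        'price': price,
        'buyer_id': buyer,
        'time': time.time()
    }
    # Append to the tail of the list so items are processed in FIFO order.
    conn.rpush('queue:email', json.dumps(data))
```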

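```python
import json

# Set to True (for example, from a signal handler) to stop the worker loop.
QUIT = False


def process_sold_email(conn):
    # fetch_data_and_send_sold_email, EmailSendError, log_error and
    # log_success are assumed to be defined elsewhere in the application.
    while not QUIT:
        # Block for up to 30 seconds waiting for an item at the head of the list.
        packed = conn.blpop(['queue:email'], 30)
        if not packed:
            continue

        # blpop returns a (key, value) pair; the value is the JSON payload.
        to_send = json.loads(packed[1])
        try:
            fetch_data_and_send_sold_email(to_send)
        except EmailSendError as err:
            log_error('Failed to send sold email', err, to_send)
        else:
            log_success('Sent sold email', to_send)
```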

## Generic task queue

Instead of writing a separate worker for every kind of task, we can implement a generic worker that reads a task name and its arguments from the queue and dispatches to a registered callback.

```python
def worker_watch_queue(conn, queue, callbacks):
    # callbacks maps a task name to the function that handles it.
    while not QUIT:
        packed = conn.blpop([queue], 30)
        if not packed:
            continue

        # Each item is a JSON-encoded [name, args] pair.
        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error(f'Unknown callback {name}')
            continue
        callbacks[name](*args)
```

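```python
def worker_watch_queues(conn, queues, callbacks):
    # Same as worker_watch_queue, but watches several queues at once.
    while not QUIT:
        # BLPOP checks the keys in the order given, so queues listed
        # earlier are served first and act as higher-priority queues.
        packed = conn.blpop(queues, 30)
        if not packed:
            continue

        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error(f'Unknown callback {name}')
            continue
        callbacks[name](*args)
```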
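
The workers above expect each queue item to be a JSON-encoded `[name, args]` pair, but the producer side is not shown. The following is a minimal sketch of what it might look like. `enqueue_task`, `run_pending`, and `InMemoryConn` are hypothetical names introduced here: `InMemoryConn` is only a stand-in that mimics `rpush` and the key ordering of `blpop` so the example runs without a Redis server, and it never blocks.

```python
import json
from collections import defaultdict, deque


class InMemoryConn:
    """Hypothetical stand-in for a Redis client (rpush and blpop only)."""

    def __init__(self):
        self.lists = defaultdict(deque)

    def rpush(self, key, value):
        self.lists[key].append(value)
        return len(self.lists[key])

    def blpop(self, keys, timeout=0):
        # Like Redis BLPOP, check the keys in the order given. Unlike Redis,
        # return None immediately instead of blocking when all are empty.
        for key in keys:
            if self.lists[key]:
                return key, self.lists[key].popleft()
        return None


def enqueue_task(conn, queue, name, args):
    """Push a [name, args] item in the format the workers unpack."""
    conn.rpush(queue, json.dumps([name, list(args)]))


def run_pending(conn, queues, callbacks):
    """Drain the queues once, dispatching by name as the workers do."""
    results = []
    while True:
        packed = conn.blpop(queues, 1)
        if not packed:
            return results
        name, args = json.loads(packed[1])
        if name in callbacks:
            results.append(callbacks[name](*args))


conn = InMemoryConn()
callbacks = {'add': lambda a, b: a + b, 'shout': lambda s: s.upper()}

enqueue_task(conn, 'queue:low', 'shout', ['hello'])
enqueue_task(conn, 'queue:high', 'add', [2, 3])

# 'queue:high' is listed first, so its item is processed first.
results = run_pending(conn, ['queue:high', 'queue:low'], callbacks)
print(results)  # [5, 'HELLO']
```

With a real Redis client, only `enqueue_task` would be needed; the long-running workers take the place of `run_pending`.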

## Delayed tasks

### Implementation

- We can include an execution time in each queue item. If a worker pops an item whose execution time is still in the future, it waits briefly and then re-enqueues the item.
- We can store delayed items in a sorted set (zset) with the execution timestamp as the score, so the item due soonest is always first. Once an item is due, it is moved to its regular list queue.
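
The first option can be sketched in memory as follows. This is an illustration under stated assumptions rather than a Redis implementation: a `deque` stands in for the Redis list, `process_next` is a hypothetical helper, and the current time is passed in explicitly so the behavior is deterministic.

```python
from collections import deque


def process_next(queue, now):
    """Pop one (run_at, name) item; return its name if due, else re-enqueue it."""
    if not queue:
        return None
    run_at, name = queue.popleft()
    if run_at > now:
        # Not due yet: put the item back at the tail and report nothing run.
        # A real worker would also sleep briefly here to avoid spinning.
        queue.append((run_at, name))
        return None
    return name


queue = deque([(105.0, 'send_reminder'), (100.0, 'send_receipt')])

first = process_next(queue, now=101.0)   # head is not due, so it is re-enqueued
second = process_next(queue, now=101.0)  # 'send_receipt' was due at 100.0
print(first, second, list(queue))
```

The drawback of this approach is that items that are not yet due are popped and pushed again and again, which is the wasted work the sorted-set approach avoids.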


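```python
import json
import time
import uuid


def execute_later(conn, queue, name, args, delay=0):
    # A unique identifier tells apart otherwise identical tasks.
    identifier = str(uuid.uuid4())
    # Note: these items have four fields, so a worker reading these queues
    # must unpack [identifier, queue, name, args], not just [name, args].
    item = json.dumps([identifier, queue, name, args])
    if delay > 0:
        # Delay the item; redis-py 3.0+ takes a {member: score} mapping.
        conn.zadd('delayed:', {item: time.time() + delay})
    else:
        # No delay: push straight onto the regular list queue.
        conn.rpush(f'queue:{queue}', item)
    return identifier
```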

```python
def poll_queue(conn):
    # acquire_lock and release_lock are assumed to be defined elsewhere
    # (a distributed lock keyed on the item identifier).
    while not QUIT:
        # Get the first item in the queue.
        item = conn.zrange('delayed:', 0, 0, withscores=True)

        # No item or the item is still to be executed in the future.
        if not item or item[0][1] > time.time():
            time.sleep(0.01)
            continue

        # Unpack the item so that we know where it should go.
        item = item[0][0]
        identifier, queue, function, args = json.loads(item)

        # Get the lock for the item.
        locked = acquire_lock(conn, identifier)
        if not locked:
            # If we couldn't get the lock, skip and retry again.
            continue

        # Move the item to the proper list queue.
        if conn.zrem('delayed:', item):
            conn.rpush(f'queue:{queue}', item)

        # Release the lock.
        release_lock(conn, identifier, locked)
```