# ASCII Art from an Image (Part 2)

## Task Queue
Learn more about task queues here: [Celery Intro](http://docs.celeryproject.org/en/latest/getting-started/introduction.html)

* Celery is a task queue: it distributes units of work (tasks) across threads, processes, or machines
* Celery communicates via messages, usually using a broker to mediate between clients and workers

## Here's a "simple" diagram of how Celery works:
![](./celery_architecture.jpg)
---

### In this case
- This notebook is the **Client**.
- RabbitMQ is the **Message transport/broker**.
- We'll set up the **Workers** below.
- RabbitMQ will also act as the **result backend** for now, but we'll extend this to DynamoDB later.
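To make the four roles concrete, here is a toy model built only from the Python standard library. It is not Celery and not how Celery is implemented: a `queue.Queue` stands in for the broker, threads stand in for workers, and a plain dict stands in for the result backend. The names `broker`, `worker`, `result_backend`, and `square` are hypothetical. One real difference worth noting: Celery sends the task *name* and JSON-serialized arguments over the wire, not Python function objects.

```python
import queue
import threading

# Toy stand-ins (not Celery): queue = broker, threads = workers,
# dict = result backend.
broker = queue.Queue()
result_backend = {}
backend_lock = threading.Lock()


def worker():
    """Take task messages off the broker until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            broker.task_done()
            break
        task_id, func, args = message
        result = func(*args)
        with backend_lock:
            result_backend[task_id] = result
        broker.task_done()


def square(x):
    """Hypothetical task."""
    return x * x


# Start a small pool of workers.
workers = [threading.Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()

# The client publishes messages (task id, function, arguments) to the broker...
for task_id in range(10):
    broker.put((task_id, square, (task_id,)))

# ...then sends one stop sentinel per worker and waits for them all to exit.
for _ in workers:
    broker.put(None)
for w in workers:
    w.join()

print(result_backend[9])  # 81
```

The client never talks to a worker directly; it only writes to the broker and later reads from the result store, which is the same decoupling the diagram shows.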

## Message Broker
Learn more about using RabbitMQ as the broker: [First Steps with Celery](http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#rabbitmq)

Then follow these docs to set up a RabbitMQ user, virtual host, and permissions: [Using RabbitMQ with Celery](http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html)

#### I save my RabbitMQ config as environment variables in the environment I'm working in.
This prevents accidentally pushing credentials to the cloud (for example, in a committed config file) and lets me keep separate settings for each project.

* ```export CELERY_BROKER_USERNAME='username'```
* ```export CELERY_BROKER_PASSWORD='password'```
* ```export CELERY_BROKER_SERVER='localhost'```
* ```export CELERY_BROKER_PORT='5672'```
* ```export CELERY_BROKER_VHOST='host'```
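The ```celery.py``` below builds the broker URL with a plain ```str.format()```, so a missing variable silently becomes the text ```None``` and a password containing ```@``` or ```/``` produces a broken URL. A slightly more defensive sketch follows; ```build_broker_url``` is a hypothetical helper, not part of Celery. It assumes (as I understand kombu's URL parsing) that percent-encoded credentials are decoded again when the URL is parsed.

```python
import os
from urllib.parse import quote


def build_broker_url(env=os.environ):
    """Assemble an amqp:// URL from the CELERY_BROKER_* variables.

    Hypothetical helper: fails loudly on missing variables and
    percent-encodes the credentials and vhost.
    """
    values = {}
    for name in ['USERNAME', 'PASSWORD', 'SERVER', 'PORT', 'VHOST']:
        key = 'CELERY_BROKER_' + name
        if not env.get(key):
            raise KeyError(f'{key} is not set')
        values[name] = env[key]
    # safe='' makes quote() encode '/' as well as '@' and ':'.
    user = quote(values['USERNAME'], safe='')
    password = quote(values['PASSWORD'], safe='')
    vhost = quote(values['VHOST'], safe='')
    return f"amqp://{user}:{password}@{values['SERVER']}:{values['PORT']}/{vhost}"


example_env = {
    'CELERY_BROKER_USERNAME': 'username',
    'CELERY_BROKER_PASSWORD': 'p@ss/word',
    'CELERY_BROKER_SERVER': 'localhost',
    'CELERY_BROKER_PORT': '5672',
    'CELERY_BROKER_VHOST': 'host',
}
print(build_broker_url(example_env))
# amqp://username:p%40ss%2Fword@localhost:5672/host
```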

---

## Set up Celery worker
Create the Celery application in ```proj/celery.py```, inside the project package. The worker loads this module when it starts.

In [None]:
import os

from celery import Celery


CELERY_MAIN = 'tasks'

# Broker settings come from the environment variables exported above
BROKER_DETAILS = dict(user=os.environ.get('CELERY_BROKER_USERNAME'),
                      password=os.environ.get('CELERY_BROKER_PASSWORD'),
                      server=os.environ.get('CELERY_BROKER_SERVER'),
                      port=os.environ.get('CELERY_BROKER_PORT'),
                      vhost=os.environ.get('CELERY_BROKER_VHOST'))
CELERY_BROKER_URL = 'amqp://{user}:{password}@{server}:{port}/{vhost}'.format(**BROKER_DETAILS)

CELERY_CONFIG = dict(
    # Store results in RabbitMQ as well (for now). Note: the 'amqp' result
    # backend is deprecated in Celery 4 (use 'rpc://') and removed in Celery 5.
    CELERY_RESULT_BACKEND=CELERY_BROKER_URL,
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    # Report a STARTED state when a worker picks up a task
    CELERY_TRACK_STARTED=True,
    CELERY_TIMEZONE='US/Pacific',
)

# include= lists the task modules the worker imports at startup
celery = Celery(CELERY_MAIN,
                broker=CELERY_BROKER_URL,
                include=['proj.tasks.ascii'])
celery.conf.update(CELERY_CONFIG)

Note the ```include``` argument: it makes the worker import the tasks defined in ```proj.tasks.ascii```.

**To start the worker, run** ```celery -A proj worker -l info``` **from the directory that contains** ```proj```.

---

## Set up the Celery task
Define the task in a module inside the project package.

See ```proj/tasks/ascii.py```
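That file isn't reproduced here, so the following is only a hypothetical sketch of the kind of work such a task does, not the actual contents of ```proj/tasks/ascii.py```. To stay dependency-free it takes a grayscale grid (rows of 0-255 integers) instead of an image file. Two things are inferred from the notebook: the task returns a dict whose ```'result'``` key holds the ASCII lines, and it reports progress while it runs. Here progress is a plain callback; in a real bound Celery task it could be forwarded to ```self.update_state(...)```. The character ramp, the nearest-neighbour sampling, and the halving of rows (characters are taller than they are wide) are all assumptions.

```python
# Hypothetical conversion core, not the original task's implementation.
ASCII_RAMP = '@%#*+=-:. '  # dark -> light


def grid_to_ascii(pixels, column_count, on_progress=None):
    """Downsample a grayscale grid to column_count columns of characters.

    Returns {'result': [line, ...]}, the shape the notebook reads with
    task_result.get('result').
    """
    height = len(pixels)
    width = len(pixels[0])
    # Characters are roughly twice as tall as wide, so use about half the rows.
    row_count = max(1, round(height / width * column_count / 2))
    lines = []
    for r in range(row_count):
        y = r * height // row_count  # nearest-neighbour row
        chars = []
        for c in range(column_count):
            x = c * width // column_count  # nearest-neighbour column
            index = pixels[y][x] * (len(ASCII_RAMP) - 1) // 255
            chars.append(ASCII_RAMP[index])
        lines.append(''.join(chars))
        if on_progress is not None:
            on_progress(r + 1, row_count)  # e.g. forward to update_state
    return {'result': lines}


# A left-to-right gradient from black to white.
gradient = [[x * 255 // 9 for x in range(10)] for _ in range(10)]
out = grid_to_ascii(gradient, column_count=10)
print('\n'.join(out['result']))
```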

In [None]:
from proj.tasks.ascii import image_to_ascii_task

## Import helper functions
Only ```display_progress_bar_until_completed``` is new.

In [None]:
from proj.helpers import print_ascii_html, display_progress_bar_until_completed
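The helper's source isn't shown here. A polling loop in the same spirit might look like the sketch below. ```poll_until_completed``` is a hypothetical stand-in, not the real helper. It relies only on ```AsyncResult.ready()```, ```AsyncResult.info```, and ```AsyncResult.state```, and it assumes the task stores its progress as a ```{'current': n, 'total': m}``` dict while running. While the task is pending, ```info``` is ```None```; with ```CELERY_TRACK_STARTED``` it holds worker details. So both cases are skipped rather than treated as progress.

```python
import time


def poll_until_completed(async_result, interval=0.5, report=print):
    """Poll a Celery-style AsyncResult until it finishes; return its state.

    Hypothetical stand-in for display_progress_bar_until_completed.
    Assumes progress metadata of the form {'current': n, 'total': m}.
    """
    while not async_result.ready():
        meta = async_result.info
        if isinstance(meta, dict) and 'current' in meta and 'total' in meta:
            percent = 100 * meta['current'] // max(meta['total'], 1)
            report(f"{percent}% ({meta['current']}/{meta['total']})")
        time.sleep(interval)
    return async_result.state
```

In this notebook it would be called as ```poll_until_completed(task_400)```; a real helper would redraw a progress bar instead of printing a line per poll.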

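### Let's run the task synchronously

Calling the task like a regular function runs it right here in the notebook process; the broker and the workers are not involved.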

In [None]:
cat_image_filename = 'samples/images/cat01.jpg'
column_count = 200
# Calling the task directly executes it in this process and returns a dict
task_result = image_to_ascii_task(cat_image_filename, column_count)
ascii_image = task_result.get('result')  # the ASCII image, one string per line
ascii_image_text = '\n'.join(ascii_image)
print_ascii_html(ascii_image_text, font_size_pct=30, line_height_pct=100)

### Now let's try an asynchronous call

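* Use ```apply_async``` with ```args``` to queue the task; it returns immediately with an ```AsyncResult``` (```image_to_ascii_task.delay(image_filename, column_count)``` is a shorthand for the same call)
* An ```AsyncResult``` can also be rebuilt from a task id with ```image_to_ascii_task.AsyncResult(task_id)```, which is what the cells below do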
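The synchronous/asynchronous distinction maps closely onto the standard library's ```concurrent.futures```, which makes a handy mental model. This is a swapped-in technique, not Celery: the work here stays inside one Python process. ```slow_square``` is a hypothetical stand-in for a long-running task. ```submit()``` returns a handle right away, as ```apply_async()``` returns an ```AsyncResult```, and ```result()``` blocks until the value is ready, much like ```AsyncResult.get()```.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(x):
    """Hypothetical stand-in for a long-running task."""
    time.sleep(0.1)
    return x * x


# Synchronous: the caller blocks until the answer is ready,
# like calling image_to_ascii_task(...) directly.
print(slow_square(3))  # 9

# Asynchronous: submit() hands back a future immediately, so the caller
# can keep working and collect the value later.
with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(slow_square, 4)
    print('queued; free to do other work')
    print(future.result(timeout=5))  # 16
```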

#### First, 400 columns

In [None]:
image_filename = 'samples/images/cat01.jpg'
column_count = 400

task = image_to_ascii_task.apply_async(args=[image_filename, column_count])
task_400 = image_to_ascii_task.AsyncResult(task.id)

In [None]:
# Continue polling the task every 0.5s until the task is complete
display_progress_bar_until_completed(task_400)

# Then display the results
ascii_image = task_400.info.get('result')
ascii_image_text = '\n'.join(ascii_image)
print_ascii_html(ascii_image_text, font_size_pct=30, line_height_pct=100)

#### Then, 800 columns

In [None]:
image_filename = 'samples/images/cat01.jpg'
column_count = 800

task = image_to_ascii_task.apply_async(args=[image_filename, column_count])
task_800 = image_to_ascii_task.AsyncResult(task.id)

In [None]:
# Continue polling the task every 0.5s until the task is complete
display_progress_bar_until_completed(task_800)

# Then display the results
ascii_image = task_800.info.get('result')
ascii_image_text = '\n'.join(ascii_image)
print_ascii_html(ascii_image_text, font_size_pct=15, line_height_pct=100)

### Now let's queue a bunch of them

In [None]:
%%time
image_filename = 'samples/images/cat01.jpg'

# Queue all five tasks first so the workers can run them in parallel...
tasks = {column_count: image_to_ascii_task.apply_async(args=[image_filename, column_count])
         for column_count in [100, 200, 400, 800, 1600]}

# ...then wait for each one in turn
for column_count, task in tasks.items():
    display_progress_bar_until_completed(task)

## Two immediate benefits:
* First, you can **queue several tasks** and **keep working** in the same notebook while they run
* Second, since I'm running 4 Celery workers, the **total run time is shorter**: 35 s versus 45-50 s
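Why isn't four workers four times faster? A small scheduling sketch helps. It is a simplified model: each queued task goes, in order, to whichever worker frees up first, and prefetching and messaging overhead are ignored. The durations are hypothetical and were not measured. They assume cost grows with the number of characters, roughly ```column_count ** 2```, because rows scale with columns. Under that assumption the 1600-column task dominates, and no number of workers can finish the batch sooner than that single task takes.

```python
import heapq


def makespan(durations, worker_count):
    """Wall-clock time when queued tasks are taken in order by the
    first worker to become free (simplified model)."""
    free_at = [0.0] * worker_count  # time at which each worker is free
    heapq.heapify(free_at)
    finish = 0.0
    for duration in durations:
        start = heapq.heappop(free_at)
        end = start + duration
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


# Assumption: cost ~ column_count ** 2, in arbitrary units.
column_counts = [100, 200, 400, 800, 1600]
durations = [(c / 100) ** 2 for c in column_counts]  # 1, 4, 16, 64, 256

sequential = makespan(durations, 1)  # 341.0: one worker runs everything
parallel = makespan(durations, 4)    # 257.0: the 256-unit task waits 1 unit
print(sequential, parallel, round(sequential / parallel, 2))  # 341.0 257.0 1.33
```

In this model, queuing the largest job first (```sorted(durations, reverse=True)```) brings the 4-worker time down to 256.0, the length of the longest task.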

---

## Bonus Rounds:

- [>> Flower](image_conversion_bonus_flower.ipynb)
- [>> DynamoDB](image_conversion_bonus_dynamodb.ipynb)