<img src="img/celery-logo.png">
https://docs.celeryproject.org/

# Celery

Celery is a library for building applications around a distributed task queue. It lets you register handler functions (tasks) that workers run when matching messages arrive in a queue, and it provides a convenient interface for sending such messages and storing their results.

Celery cannot work on its own: it relies on an external message broker to transport the queued messages. Here we use Redis, which can serve both as the message broker and as the result backend (the database where task results are stored). In this environment Redis is already running and listening on port 6379.

There are also two helper scripts that start the service components in the background. `launch-server.sh` works as in the previous lab and starts the Flask application. `start-worker.sh` works similarly: it expects a `server.py` module that defines a `celery_app` variable holding the Celery application. If that is the case, it starts a worker in the background that listens to the queue.
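To make the division of labour concrete, here is a toy, single-process sketch of the same roles using only the standard library: a `queue.Queue` stands in for the Redis broker, a plain dict for the result backend, and a thread for the worker. It only illustrates the pattern and is not how Celery is implemented; the names `submit`, `worker_loop` and `TASKS` are hypothetical.

```python
import queue
import threading
import uuid

# Hypothetical stand-ins: in the lab, Redis plays both of these roles.
broker = queue.Queue()   # message broker: holds pending task messages
backend = {}             # result backend: task_id -> result

# Registry of handler functions, loosely analogous to @celery_app.task.
TASKS = {"add": lambda x, y: x + y}


def submit(task_name, *args):
    """Producer side: enqueue a message and return its id (like .delay())."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, task_name, args))
    return task_id


def worker_loop():
    """Worker side: take messages off the queue, run handlers, store results."""
    while True:
        message = broker.get()
        if message is None:          # sentinel used to stop the worker
            broker.task_done()
            break
        task_id, task_name, args = message
        backend[task_id] = TASKS[task_name](*args)
        broker.task_done()


worker = threading.Thread(target=worker_loop, daemon=True)
worker.start()

task_id = submit("add", 5, 10)
broker.join()                        # block until every queued message is processed
print(backend[task_id])              # 15

broker.put(None)                     # ask the worker to stop
worker.join(timeout=1.0)
```

The producer never waits for the computation itself; it only keeps the ID, which is exactly what the real service below hands back to its users.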

IMPORTANT: the server writes its logs to `log.txt` and the Celery worker to `log-worker.txt`. If something breaks, check these files first. Restarting the server or the worker can also help: sometimes a component has not finished restarting yet, so requests sent to it fail with an error.
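A small helper for peeking at the end of both log files from inside the notebook; `tail` is a hypothetical name, and the sketch simply returns an empty list when a log file has not been created yet.

```python
from collections import deque
from pathlib import Path


def tail(path, n=20):
    """Return the last n lines of a text file (empty list if it does not exist yet)."""
    p = Path(path)
    if not p.exists():
        return []
    with p.open(encoding="utf-8", errors="replace") as f:
        # A deque with maxlen keeps only the final n lines while streaming the file.
        return list(deque(f, maxlen=n))


for log_name in ("log.txt", "log-worker.txt"):
    print(f"==> {log_name} <==")
    print("".join(tail(log_name)), end="")
```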

Let's start by defining a simple task and running it manually.

In [1]:
%%writefile server.py
from celery import Celery
import time

celery_app = Celery('server', backend='redis://localhost', broker='redis://localhost')  # both database and broker are redis

@celery_app.task
def add(x, y):
    time.sleep(7.0)
    return x + y

Overwriting server.py


Now start the worker with our task handler.

In [2]:
! start-worker.sh

Success!


Now that the worker is running, we can import the task and send work to the queue.

In [3]:
from server import add

In [4]:
r = add.delay(5, 10)

In [5]:
r

<AsyncResult: 7dd50c73-1505-4ad5-837b-d674e75265e5>

In [6]:
r.id

'7dd50c73-1505-4ad5-837b-d674e75265e5'

In [7]:
r.ready()

False

In [8]:
r.result

In [9]:
r.ready()

False

In [10]:
r.result

Excellent! After a while (the task sleeps for 7 seconds), the task finished and we got its result.
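Re-running `r.ready()` by hand gets tedious. Celery's `AsyncResult` already offers a blocking `r.get(timeout=...)`; if you prefer explicit polling (for example, to print progress), a small helper works with any object that has a `ready()` method and a `result` attribute. The helper name `wait_until_ready` and its default timeout and interval are hypothetical choices.

```python
import time


def wait_until_ready(async_result, timeout=30.0, interval=0.5):
    """Poll async_result.ready() until it is True, then return async_result.result.

    Intended for celery.result.AsyncResult, but any object exposing
    .ready() and .result works. Raises TimeoutError if the task is not
    finished within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"result not ready after {timeout} s")
        time.sleep(interval)
    return async_result.result
```

With the worker from above running, `wait_until_ready(add.delay(5, 10), timeout=15)` should return 15 once the 7-second task completes.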

To fetch a result, all you need is the task's identifier. Let's retrieve a result knowing only its ID.

In [11]:
from celery.result import AsyncResult
from server import celery_app

In [12]:
task_id = add.delay(12, 13).id
print(task_id)

72e9bde7-f276-4ab9-8ccb-0e0aeb45af65


In [13]:
r = AsyncResult(task_id, app=celery_app)
print(r.ready())

False


In [14]:
r.result

# Connect the web server
This is enough to combine the task queue with a web server. On receiving a request from the user, the web server creates a new task, sends it to the queue, and returns the task ID to the user.

The user can then come back later and ask the web server for the status of the task.
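The polling protocol boils down to mapping a task's state onto a small JSON reply. The sketch below isolates that mapping as a pure function so it can be tested without Redis. It adds a hypothetical `FAILED` status that the server in this lab does not return; with Celery you would feed it `task.ready()`, `task.successful()` and `task.result`.

```python
import json


def status_reply(ready, successful, result):
    """Build the JSON body for a GET /sum/<task_id>-style status endpoint.

    ready, successful -- booleans as reported by the result backend
    result            -- the return value, or the exception if the task failed
    """
    if not ready:
        return json.dumps({"status": "IN_PROGRESS"})
    if successful:
        return json.dumps({"status": "DONE", "result": result})
    # Exceptions are not JSON-serializable, so send their message instead.
    return json.dumps({"status": "FAILED", "error": str(result)})


print(status_reply(False, False, None))   # {"status": "IN_PROGRESS"}
print(status_reply(True, True, 15))       # {"status": "DONE", "result": 15}
```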

In [15]:
%%writefile server.py
from celery import Celery
from celery.result import AsyncResult
import time
from flask import Flask, request
import json


celery_app = Celery('server', backend='redis://localhost', broker='redis://localhost')
app = Flask(__name__)


@celery_app.task
def add(numbers):
    time.sleep(7.0)
    result = 0
    for n in numbers:
        result += n
    return result


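@app.route('/sum', methods=["POST"])
def sum_handler():
    # Expect a JSON body such as {"numbers": [1, 2, 3]}
    data = request.get_json(force=True)
    numbers = data['numbers']

    # .delay() puts the task on the queue and returns immediately
    task = add.delay(numbers)

    # Return only the task ID; the client polls /sum/<task_id> for the result
    response = {
        "task_id": task.id
    }
    return json.dumps(response)

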
@app.route('/sum/<task_id>')
def sum_check_handler(task_id):
    task = AsyncResult(task_id, app=celery_app)
    if task.ready():
        response = {
            "status": "DONE",
            "result": task.result
        }
    else:
        response = {
            "status": "IN_PROGRESS"
        }
    return json.dumps(response)


if __name__ == "__main__":
    app.run('0.0.0.0', 8000)

Overwriting server.py


In [16]:
! start-worker.sh

Success!


In [17]:
! launch-server.sh server.py

Success!


Let's write a client function that waits for the result.
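A minimal sketch of such a client for the `/sum` endpoint, assuming the server above is listening on `http://localhost:8000`. It uses the standard-library `urllib.request` rather than the `requests` package used later in this lab, and adds a timeout so a stuck task cannot block forever; `remote_sum` and its parameters are hypothetical names.

```python
import json
import time
import urllib.request


def _post_json(url, payload):
    """POST `payload` as JSON and decode the JSON reply."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read().decode("utf-8"))


def _get_json(url):
    """GET `url` and decode the JSON reply."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


def remote_sum(numbers, base_url="http://localhost:8000", interval=2.0, timeout=60.0):
    """Submit `numbers` to POST /sum, then poll GET /sum/<task_id> until DONE."""
    task_id = _post_json(f"{base_url}/sum", {"numbers": numbers})["task_id"]
    deadline = time.monotonic() + timeout
    while True:
        reply = _get_json(f"{base_url}/sum/{task_id}")
        if reply["status"] == "DONE":
            return reply["result"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} not done after {timeout} s")
        time.sleep(interval)
```

With the server and worker running, `remote_sum([1, 2, 3, 4])` should return 10 shortly after the task's 7-second sleep.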

# A model trained on the Iris dataset
Serving a more complex model is practically no different from what we did in the previous lab. Let's reproduce the same service, but now behind a message queue. The model here is a simple word-frequency counter fitted on the text of `iris.csv`.

In [17]:
%%writefile freqmeter.py

import re

class FrequencyMeter:
    def __init__(self):
        self._counter = {}
        self._word_pattern = re.compile(r"[a-z]+")
        
    def fit(self, data):
        for match in self._word_pattern.finditer(data.lower()):
            word = match.group(0)
            if word in self._counter:
                self._counter[word] += 1
            else:
                self._counter[word] = 1
    
    def compute(self, word):
        if word not in self._counter:
            return 0
        return self._counter[word]

Overwriting freqmeter.py
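`FrequencyMeter` is essentially a word counter. For comparison, the same model can be written with `collections.Counter`, which returns 0 for missing keys. This variant (`CounterFrequencyMeter`, a hypothetical name) is an alternative rather than what the lab uses; it keeps the same `fit`/`compute` interface and tokenization.

```python
import re
from collections import Counter


class CounterFrequencyMeter:
    """Same interface as FrequencyMeter, backed by collections.Counter."""

    _word_pattern = re.compile(r"[a-z]+")

    def __init__(self):
        self._counter = Counter()

    def fit(self, data):
        # Lower-case the text and count every run of ASCII letters as a word.
        self._counter.update(self._word_pattern.findall(data.lower()))

    def compute(self, word):
        # Counter returns 0 for unseen words, so no explicit check is needed.
        return self._counter[word]


meter = CounterFrequencyMeter()
meter.fit("Setosa,setosa\nVersicolor")
print(meter.compute("setosa"), meter.compute("virginica"))  # 2 0
```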


In [21]:
from freqmeter import FrequencyMeter
import pickle

fmeter = FrequencyMeter()

with open('iris.csv') as f:
    data = f.read()
    
fmeter.fit(data)

raw_data = pickle.dumps(fmeter)

with open('fmeter-model.pickle', 'wb') as f:
    f.write(raw_data)
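Before pointing the server at `fmeter-model.pickle`, it can be worth checking that a pickled model loads back into an equivalent object. The sketch uses `pickle.dump`/`pickle.load` directly on file objects, which is equivalent to the `dumps`/`loads` pair above; a `collections.Counter` and a temporary file stand in for the real model and path. Keep in mind that unpickling a custom class such as `FrequencyMeter` also requires its module (`freqmeter`) to be importable in every process that loads the file, here both the Flask server and the worker.

```python
import os
import pickle
import tempfile
from collections import Counter

# Stand-in model: a plain Counter of word counts (hypothetical values).
model = Counter({"setosa": 50, "versicolor": 50})

path = os.path.join(tempfile.mkdtemp(), "toy-model.pickle")
with open(path, "wb") as f:
    pickle.dump(model, f)          # serialize straight into the binary file

with open(path, "rb") as f:
    restored = pickle.load(f)      # read it back

print(restored == model, restored["setosa"], restored["rose"])  # True 50 0
```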

In [22]:
%%writefile server.py
from celery import Celery
from celery.result import AsyncResult
import time
from flask import Flask, request
import json
import pickle
import re


celery_app = Celery('server', backend='redis://localhost', broker='redis://localhost')
app = Flask(__name__)


def load_model(pickle_path):
    with open(pickle_path, 'rb') as f:
        raw_data = f.read()
        model = pickle.loads(raw_data)
    return model

model = load_model('fmeter-model.pickle')


@celery_app.task
def freq(sentence):
    result = {}
    word_pattern = re.compile(r"[a-z]+")
    for match in word_pattern.finditer(sentence.lower()):
        word = match.group(0)
        result[word] = model.compute(word)
    return result


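@app.route('/frequency', methods=["POST"])
def frequency_handler():
    # Expect a JSON body such as {"sentence": "Setosa"}
    data = request.get_json(force=True)
    sentence = data['sentence']

    # .delay() puts the task on the queue and returns immediately
    task = freq.delay(sentence)

    # Return only the task ID; the client polls /frequency/<task_id>
    response = {
        "task_id": task.id
    }
    return json.dumps(response)

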
@app.route('/frequency/<task_id>')
def frequency_check_handler(task_id):
    task = AsyncResult(task_id, app=celery_app)
    if task.ready():
        response = {
            "status": "DONE",
            "result": task.result
        }
    else:
        response = {
            "status": "IN_PROGRESS"
        }
    return json.dumps(response)


if __name__ == "__main__":
    app.run('0.0.0.0', 8000)

Overwriting server.py
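Because the body of `freq` is plain Python, its logic can be checked without Redis or a worker. The sketch moves it into a hypothetical helper `word_frequencies(sentence, model)` and exercises it with a stub model; in `server.py` the task could then simply `return word_frequencies(sentence, model)`. (Calling a Celery task object directly, e.g. `freq("Setosa")`, also runs it synchronously in the current process.) Whatever the task returns must be serializable by the result backend; Celery's default serializer is JSON, so a dict of strings to integers is fine.

```python
import re

WORD_PATTERN = re.compile(r"[a-z]+")


def word_frequencies(sentence, model):
    """Map each lower-cased word in `sentence` to model.compute(word)."""
    return {word: model.compute(word) for word in WORD_PATTERN.findall(sentence.lower())}


class StubModel:
    """Hypothetical model with fixed counts, used instead of the pickled one."""

    def __init__(self, counts):
        self.counts = counts

    def compute(self, word):
        return self.counts.get(word, 0)


stub = StubModel({"setosa": 50, "versicolor": 50})
print(word_frequencies("Setosa or Virginica?", stub))
# {'setosa': 50, 'or': 0, 'virginica': 0}
```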


In [23]:
! start-worker.sh

Success!


In [24]:
! launch-server.sh server.py

Success!


In [25]:
import time

import requests


def calc(sentence):
    # Submit the sentence; the server replies immediately with a task ID
    response = requests.post("http://localhost:8000/frequency", json={'sentence': sentence})
    task_id = response.json()['task_id']
    print("Task {}".format(task_id))
    # Poll the status endpoint every 2 seconds until the task is done
    status = "IN_PROGRESS"
    while status != "DONE":
        time.sleep(2.0)
        r = requests.get('http://localhost:8000/frequency/{}'.format(task_id))
        status = r.json()['status']
        print('Status - {}'.format(status))
    return r.json()['result']

## Test on the Iris data with flower varieties

In [28]:
calc("Setosa")

Task 03c90830-40f0-42e7-8882-bfe7a4ceeeb5
Status - DONE


{'setosa': 50}

In [31]:
calc("Versicolor")

Task 2e993655-0d95-46c3-be7f-66904fd98409
Status - DONE


{'versicolor': 50}