# Sprytny sposób na prokrastynację
## Praca z asynchroniczną kolejką zadań Celery
### Jakub Szponder
#### Python Level Up, 19.04.2018

![Logo kursu Python Level Up](https://raw.githubusercontent.com/daftcode/python_levelup_2018/master/logo.png)

![Plan zajęć](https://raw.githubusercontent.com/daftcode/python_levelup_2018/master/plan_zajec.png)

# http://praktyki.daftcode.pl/
Czekamy do 20 kwietnia!

# Aplikacja webowa
![Schemat prostej aplikacji webowej](./web_application_flow.png)

W aplikacjach webowych zależy nam na skracaniu czasu obsługi requestu, dzięki temu:
- użytkownicy nas bardziej lubią
- nie blokujemy workerów aplikacyjnych, przez co jesteśmy w stanie obsługiwać więcej requestów

# Aplikacja webowa wykorzystująca kolejkę tasków
![Schemat aplikacji webowej z wykorzystaniem Celery](./web_application_flow_celery.png)

Serwer zleca czasochłonne zadania (__Taski__) do wykonania __Workerowi__, czyli procesowi, którego zadaniem jest wykonywanie tego typu zadań.
Do komunikacji pomiędzy __Aplikacją__ a __Workerem__ używamy __Brokera__

Co może być dobrym __Taskiem__?
- zadanie, które możemy odłożyć na później (nie musi być zrobione w trakcie zwracania Responsu)
- czasochłonne zadanie, np. wysyłanie requestu na zewnętrzny serwer

# Przykłady z życia
- wysyłka maila po stworzeniu konta użytkownikowi
- generowanie raportu w PDFie
- import/eksport dużych plików
- serwis do skalowania obrazów

# Trochę większy przykład z życia - Twitter

![Przykład Twittera](./twitter_use_case.png)

# Celery
- najpopularniejsza kolejka tasków w Pythonie
- minimalny przykład jest bardzo krótki i łatwy do stworzenia
- pod spodem dzieje się trochę magii

# Architektura
- __Klient__
- __Broker__
- __Worker__

## Klient
Aplikacja, która korzysta wykorzystuje asynchroniczne zadania

## Broker
- Kolejka na której odkładane są zadania do wykonania
- Celery umożliwia wybranie jednego z wielu typów brokerów
- Narzędzia, które mogą być brokerem:
  * __Redis__
  * RabbitMQ
  * zwykła relacyjna baza danych (SQLite / PostgreSQL)    
- argumenty przekazywane do taska są serializowane (domyślnie do JSONa) i zapisywane przy pomocy brokera razem z taskiem 

## Worker
Proces, który pobiera zadania z kolejki, a następnie je wykonuje

# Redis
- struktura danych, która umożliwia trzymanie danych w pamięci operacyjnej
- używane do cachowania, jako broker do kolejek zadań, a także jako baza danych
- trzyma dane w postaci klucz-wartość
- jest interaktywny tutorial online: http://try.redis.io/
- a tutaj informacje o instalowaniu: https://redis.io/topics/quickstart

## Instalacja Celery
```bash
pip install celery
pip install -U "celery[redis]"
```

# Pierwszy kod!
1. Tworzymy aplikację Celery i definiujemy taska
2. Uruchamiamy Redisa
3. Startujemy Workera
4. Kolejkujemy taska

## Definicja aplikacji Celery
```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost')

@app.task
def add(x, y):
    return x + y
```

## Starowanie Workera
```bash
celery -A tasks worker --loglevel=info
```

## Uruchamianie funkcji taska
- w dalszym ciągu można normalnie wywoływać funkcję taska
```python
add(3, 5)
```
- a tak się wywołuje taska z wykorzystaniem Celery
```python
add.delay(3, 5)
```

Wynik wywołania taska za pomocą delay to obiekt klasy __AsyncResult__
http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult

- __delay__ to "skrót", który pod spodem wywołuje bardziej potężną metodę __apply_async__
- __apply_async__ pozwala nam np. określić za ile sekund chcemy uruchomić taska (_countdown_)
```python
add.apply_async(args=(3, 5), countdown=10)
```
- więcej: http://docs.celeryproject.org/en/latest/userguide/calling.html#basics

# Używanie Celery z aplikacji we Flasku
```python
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task()
def add(x, y):
    return x + y
```

# Przykład 1
Stworzyć endpointy:
- `/sync_request`, który wykona synchronicznie GET na adres `http://httpbin.org/delay/5`
- `/async_request`, który wykona request pod ten sam adres, ale z wykorzystaniem __Celery__

In [None]:
import requests

@celery.task()
def make_request():
    requests.get('http://httpbin.org/delay/5')


@app.route('/sync_request')
def make_sync_request():
    start = time.time()
    make_request()  # bez delay
    end = time.time()
    return str(end - start)

    
@app.route('/async_request')
def make_async_request():
    start = time.time()
    make_request.delay()  # z delay
    end = time.time()
    return str(end - start)

## Uruchamianie

```bash
celery -A app.celery worker --loglevel=info
```

# Przykład 2
Obsłużyć endpoint `/users`
- metoda `GET` - wyświetla formularz tworzenia użytkownika z jednym polem - `email`, po submicie idzie `POST` na `/users/`
- metoda `POST` - wysyła powitalnego maila z tematem `Python Level Up`, o treści `Welcome to Python Level Up Sample Website!` na podany adres

In [None]:
import smtplib

EMAIL_SENDER = os.environ.get('MAIL_USERNAME')
EMAIL_SENDER_PASSWD = os.environ.get('MAIL_PASSWORD')

@celery.task()
def send_invitation_email(email):
    server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
    server.login(EMAIL_SENDER, EMAIL_SENDER_PASSWD)
    subject = 'Python Level Up'
    text = 'Welcome to Python Level Up Sample Website!'
    body = '\r\n'.join(
        [
            'To: %s' % email,
            'From: %s' % EMAIL_SENDER,
            'Subject: %s' % subject,
            '', text
        ]
    )
    server.sendmail(EMAIL_SENDER, [email], body)

In [None]:
@app.route('/users', methods=['GET', 'POST'])
def users():
    if request.method == 'POST':
        email = request.form['email']
        send_invitation_email.delay(email)
        return 'Konto założone'
    else:
        return render_template('add_user.html')

# Deploy na heroku
- https://devcenter.heroku.com/articles/celery-heroku

### Procfile
```
web: gunicorn app:app
worker: celery worker -A app.celery --loglevel=info
```

### Instalacja addona do Redisa
```
heroku addons:create heroku-redis -a nazwa_aplikacji
```

### requirements.txt

```
amqp==2.2.2
billiard==3.5.0.3
blinker==1.4
celery==4.1.0
certifi==2018.4.16
chardet==3.0.4
click==6.7
Flask==0.12.2
gunicorn==19.7.1
idna==2.6
itsdangerous==0.24
Jinja2==2.10
kombu==4.1.0
MarkupSafe==1.0
pytz==2018.4
redis==2.10.6
requests==2.18.4
urllib3==1.22
vine==1.1.4
Werkzeug==0.14.1
```

### Nie działa?
Po deployu może być wymagane ręczne włączenie dyna `worker`

# Powtarzanie tasków
- zdarzają się czasami sytuacje, w których nie udaje się wykonać taska i należy przełożyć go na później
- służy do tego metoda __retry__ dostępna na obiekcie tasku

## Jak dobrać się do obiektu taska?
- należy "zbindować" obiekt taska (`bind=True`), dzięki czemu jako pierwszy argument do funkcji zostanie przekazany obiekt taska
```python
@task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)
```

## task.retry()

```python
import random

@app.task(bind=True)
def fails_sometimes(self):
    if random.random < 0.5:
        self.retry()
    return 'success'
```

- `Task.retry` pod spodem rzuca specjalny błąd typu `Retry`
- możliwe jest nadpisanie domyślnych wartości odpowiedzialnych za powtarzanie tasków:
```
@app.task(bind=True, default_retry_delay=30, max_retries=5) 
```

# Zapisywanie wyników wywołań poszczególnych tasków
- należy zdefiniować w instancji __Celery__ `backend`, czyli określić w jakim miejscu chcemy przechowywać rezultaty tasków

```python
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
```
- dzięki temu możemy dostawać się do wybiku wywołania taska

```python
task_result = add.delay(3, 4)
print(task_result.result)  # wyświetla wynik taska

result2 = add.delay(4, 5).get() # czeka na wynik i zwraca go
```

Nie polecam korzystania z AsyncResult.get() - tracimy w ten sposób korzyść z asynchronicznego wykonywania zadania i wymuszmay wykonanie synchroniczne.

# Taski cykliczne 
- dzięki schedulerowi __celery beat__ można definiować cykliczne taski, tzn. takie, które mają wykonywać się np. co 30 sekund, w każdy czwartek o 17 itp.
- ważne jest określenie strefy czasowej z jakiej ma korzystać __Scheduler__ (domyślnie __UTC__)

```python
timezone = 'Europe/Warsaw'
```


http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

## Startowanie schedulera
```bash
celery -A module_name beat
```

- można także wystartowac __Schedulera__ razem z __Workerem__

```bash
celery -A module_name worker -B
```

- __Celery Beat__ zapisuje czasy ostatnich wywołań __Tasków__ w lokalnym pliku (domyślnie _celerybeat-schedule_), więc musi mieć możliwość zapisu w katalogu, w którym ma zapisywać ten plik

- wybranie innej nazwy/ścieżki do pliku z historią:
```bash
celery -A module_name beat -s /different/directory/celerybeat-schedule
```

In [None]:
# Przyklad z http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

Możliwe jest też definiowanie tasków w konfiguracji, np:

```python
# Przyklad z http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
```

# DZIĘKUJEMY!!!!

https://medium.freecodecamp.org/python-collection-of-my-favorite-articles-8469b8455939