# Text classification process using Toloka running on Airflow

Toloka offers a library of Airflow-integrated functions that facilitate crowdsourcing.
This example shows how to build a complete project from these building blocks.

The library provides Airflow tasks for Toloka.
You can connect tasks by passing one task's result to another as an argument. For more details, see the Airflow docs.

Airflow can run either locally or in Docker, which also makes a distributed setup possible.
We recommend Docker, and this example uses it.

To try Toloka tools for free,
apply the promo code **TOLOKAKIT1** for $20 on your [profile page](https://toloka.yandex.com/requester/profile?utm_source=github&utm_medium=site&utm_campaign=tolokakit) after registration.

## Airflow configuration

First, configure Airflow to run in Docker.

Follow the instructions below. If you run into any trouble, see the [docs](https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html).

1. Install [Docker Community Edition (CE)](https://docs.docker.com/engine/install/) on your workstation.
Depending on the OS, you may need to configure your Docker instance to use 4.00 GB of memory for all containers to run properly.
Please refer to the Resources section if using [Docker for Windows](https://docs.docker.com/desktop/windows/) or [Docker for Mac](https://docs.docker.com/desktop/mac/) for more information.
2. Install [Docker Compose v1.29.1](https://docs.docker.com/compose/install/) or newer on your workstation.
    > The default amount of memory available for Docker on macOS is often not enough to get Airflow up and running.
    **If not enough memory is allocated, the Airflow webserver may restart continuously.
    Allocate at least 4 GB of memory for the Docker Engine (ideally 8 GB). You can check and change the amount of memory in [Resources](https://docs.docker.com/desktop/mac/)**
3. Create an `airflow` folder:
```
mkdir airflow && cd airflow
```
4. Fetch [docker-compose.yaml](https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml)
```
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'
```
5. Create a custom Dockerfile that installs the required packages:
```
FROM apache/airflow:2.2.4-python3.9
# scikit-learn provides train_test_split, which prepare_datasets imports
RUN pip install --no-cache-dir \
    airflow-provider-toloka \
    toloka-kit \
    crowd-kit \
    pandas \
    scikit-learn
```
Alternatively, use the Dockerfile provided with this example.
To build the custom Docker image, run:
```
docker build . -f Dockerfile --tag toloka-airflow-image
```
6. Set the Docker image name from the console:
```
echo "AIRFLOW_IMAGE_NAME=toloka-airflow-image:latest" >> .env
```
or replace it in the docker-compose file:
```
✕ image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.4}
✓ image: ${AIRFLOW_IMAGE_NAME:-toloka-airflow-image:latest}
```
7. On Linux, the quick-start needs to know your host user ID and needs the group ID to be set to 0.
Otherwise, the files created in `dags`, `logs`, and `plugins` will be owned by the root user. Make sure to configure them for docker-compose:
```
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" >> .env
```
On other operating systems, you will get a warning that `AIRFLOW_UID` is not set, but you can safely ignore it. To get rid of the warning, add the following line to the `.env` file in the same folder as your docker-compose.yaml (create the file if it does not exist):
```
AIRFLOW_UID=50000
```

8. Place the `text_classification.py` file in the `dags/` folder so that Airflow can find it.

9. On all operating systems, you need to run database migrations and create the first user account. To do so, run:
```
docker-compose up airflow-init
```

10. Now you can start all services:
```
docker-compose up
```
Airflow will be available at http://localhost:8080/home.
The default login and password are `airflow` and `airflow`.
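To check from a script that the services are up, you could poll the webserver's `/health` endpoint, which in Airflow 2 returns JSON with the status of the metadata database and the scheduler. The sketch below assumes the default `http://localhost:8080` address; `fetch_health` and `is_healthy` are hypothetical helper names, not part of Airflow.

```python
import json
from urllib.request import urlopen


def fetch_health(base_url="http://localhost:8080"):
    """Fetch and parse the JSON returned by the webserver's /health endpoint."""
    # Assumes the webserver from `docker-compose up` listens on base_url.
    with urlopen(f"{base_url}/health", timeout=10) as response:
        return json.load(response)


def is_healthy(payload):
    """Return True only if both the metadata database and the scheduler report "healthy"."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )
```

For example, `is_healthy(fetch_health())` should return `True` once all services have started; while the scheduler is still starting, it returns `False`.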

### Store Toloka Credentials

To use Toloka from Airflow, you need to pass it a Toloka OAuth token.
If you don't have one yet, you can get it in your Toloka account on the `Profile / External Services Integration` page by clicking `Get OAuth token`.

![Account information](./images/some_account.png)

You can store it in Airflow as a connection.

![Variable location](./images/connections.png)

Add a new connection on the `Admin / Connections` page. Name it `toloka_default` and put the Toloka token in the `Password` field.
For proper security, enable Fernet encryption of connection passwords; see the [docs](https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html).

![Token variable](./images/toloka_connection.png)

If the `toloka` connection type is not available, you can choose any other connection type.
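As an alternative to the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (conn ID in upper case) whose value is a connection URI. Such connections do not appear on the `Admin / Connections` page. The sketch below builds that variable for `toloka_default`; the helper name `toloka_conn_env` and the `http` connection type are assumptions (as with the UI, the type does not matter here), and the token is URL-encoded because it becomes part of a URI.

```python
from urllib.parse import quote


def toloka_conn_env(token, conn_id="toloka_default", conn_type="http"):
    """Return (variable name, URI) for a connection that stores `token` as its password.

    Hypothetical helper; conn_type="http" is an arbitrary placeholder type.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # Empty login and host; quote(..., safe="") escapes '/', ':', '@' and similar characters.
    uri = f"{conn_type}://:{quote(token, safe='')}@"
    return name, uri


name, uri = toloka_conn_env("my-secret/token")
print(f"{name}='{uri}'")
```

You would then expose the variable to all Airflow containers, for example in the shared `environment` section of docker-compose.yaml. Keep in mind that the token is then stored in plain text.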

## Text classification project

Configuration is complete. Here is the full `text_classification.py`:

```python
from datetime import timedelta
import json

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

import toloka_provider.tasks.toloka as tlk_tasks

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(5),
    'retries': 0,
}


@dag(default_args=default_args, schedule_interval=None, catchup=False, tags=['example'])
def text_classification():

    @task
    def download_json(url):
        """Download and parse json config stored at given url."""
        import requests

        response = requests.get(url)
        response.raise_for_status()
        return response.json()

    @task(multiple_outputs=True)
    def prepare_datasets(unlabeled_url: str, labeled_url: str):
        from sklearn.model_selection import train_test_split
        import pandas as pd

        labeled = pd.read_csv(labeled_url)
        labeled, exam_tasks = train_test_split(labeled, test_size=10, stratify=labeled.category)
        _, honeypots = train_test_split(labeled, test_size=20, stratify=labeled.category)

        main_tasks = pd.read_csv(unlabeled_url).sample(n=100)

        return {
            'main_tasks': main_tasks.to_json(),
            'exam_tasks': exam_tasks.to_json(),
            'honeypots': honeypots.to_json()
        }

    @task
    def prepare_tasks(main_tasks):
        main_tasks = json.loads(main_tasks)
        return [{'input_values': {'headline': headline}}
                for headline in main_tasks['headline'].values()]

    @task
    def prepare_exam_tasks(exam_tasks):
        exam_tasks = json.loads(exam_tasks)
        return [{'input_values': {'headline': headline},
                 'known_solutions': [{'output_values': {'category': category}}],
                 'message_on_unknown_solution': category}
                for headline, category in zip(exam_tasks['headline'].values(), exam_tasks['category'].values())]

    @task
    def prepare_honeypots(honeypots):
        honeypots = json.loads(honeypots)
        return [{'input_values': {'headline': headline},
                 'known_solutions': [{'output_values': {'category': category}}]}
                for headline, category in zip(honeypots['headline'].values(), honeypots['category'].values())]

    @task
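    def aggregate_assignments(assignments):
        """Aggregate the collected labels with Dawid-Skene and print the result to the task log."""
        from crowdkit.aggregation import DawidSkene
        from toloka.client import Assignment
        import pandas as pd

        assignments = [Assignment.from_json(assignment) for assignment in assignments]
        tasks = []
        labels = []
        performers = []
        # Build one row per (task, performer, label), the long format crowd-kit aggregators take.
        # The loop variable is named toloka_task so it does not shadow the @task decorator.
        for assignment in assignments:
            for toloka_task, solution in zip(assignment.tasks, assignment.solutions):
                tasks.append(toloka_task.input_values['headline'])
                labels.append(solution.output_values['category'])
                performers.append(assignment.user_id)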
        assignments = {
            'task': tasks,
            'performer': performers,
            'label': labels
        }
        assignments = pd.DataFrame.from_dict(assignments)

        df = DawidSkene(n_iter=20).fit_predict(assignments).to_frame().reset_index()
        df.columns = ['headline', 'category']

        print('RESULT', df)

    project_conf = download_json(
        'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/project.json')
    exam_conf = download_json(
        'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/exam.json')
    pool_conf = download_json(
        'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/pool.json')

    project = tlk_tasks.create_project(project_conf)
    exam = tlk_tasks.create_exam_pool(exam_conf, project=project)
    pool = tlk_tasks.create_pool(pool_conf, project=project, exam_pool=exam, expiration=timedelta(days=1))

    dataset = prepare_datasets(
        unlabeled_url='https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/data/not_known.csv',
        labeled_url='https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/data/known.csv',
    )
    main_tasks, exam_tasks, honeypots = dataset['main_tasks'], dataset['exam_tasks'], dataset['honeypots']
    tasks = prepare_tasks(main_tasks)
    exam_tasks = prepare_exam_tasks(exam_tasks)
    honeypots = prepare_honeypots(honeypots)

    _exam_upload = tlk_tasks.create_tasks(exam_tasks, pool=exam, additional_args={'open_pool': True, 'allow_defaults': True})
    _honeypots_upload = tlk_tasks.create_tasks(honeypots, pool=pool, additional_args={'allow_defaults': True})
    _tasks_upload = tlk_tasks.create_tasks(tasks, pool=pool, additional_args={'allow_defaults': True})

    opened_pool = tlk_tasks.open_pool(pool)
    _waiting = tlk_tasks.wait_pool(opened_pool)

    assignments = tlk_tasks.get_assignments(pool)
    aggregate_assignments(assignments)

    [_exam_upload, _honeypots_upload, _tasks_upload] >> opened_pool
    _waiting >> assignments


dag = text_classification()
```

If you did everything correctly and Docker is running,
go to http://localhost:8080/home, where you will see the `text_classification` DAG ready to be run.

![run button location](./images/run_button.png)

You can then follow the status of each task in the `Graph` tab.

![dag representation](./images/dag_representation.png)

When `aggregate_assignments` turns green, the pipeline has finished.
Click `aggregate_assignments`, then `Log`, to see the aggregation results at the bottom of the log.

### Step-by-step explanation

Basic configuration. The `dag` decorator turns the `text_classification` function into an Airflow DAG.

```python
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(5),
    'retries': 0,
}

@dag(default_args=default_args, schedule_interval=None, catchup=False, tags=['example'])
def text_classification():
    ...
```
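`days_ago(5)` evaluates to midnight UTC five days before the moment the DAG file is parsed. With `schedule_interval=None`, Airflow never schedules runs on its own and the DAG runs only when triggered, so the start date just has to lie in the past. The standard-library sketch below mirrors that computation; `days_ago_utc` is a hypothetical helper for illustration, not an Airflow API.

```python
from datetime import datetime, timedelta, timezone


def days_ago_utc(n, now=None):
    """Midnight of the day `n` days before `now`; `now` defaults to the current UTC time.

    Hypothetical helper mirroring airflow.utils.dates.days_ago(n).
    """
    if now is None:
        now = datetime.now(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


start = days_ago_utc(5, now=datetime(2022, 3, 10, 15, 30, tzinfo=timezone.utc))
print(start)  # 2022-03-05 00:00:00+00:00
```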

This task downloads the JSON configurations of the project and pools,
so we don't have to write them ourselves.

```python
@task
def download_json(url):
    """Download and parse json config stored at given url."""
    import requests

    response = requests.get(url)
    response.raise_for_status()
    return response.json()
```

Here are the user-defined tasks that prepare the input data (their bodies are in the full listing above).

```python
@task(multiple_outputs=True)
def prepare_datasets(unlabeled_url: str, labeled_url: str):
    ...

@task
def prepare_tasks(main_tasks):
    ...

@task
def prepare_exam_tasks(exam_tasks):
    ...

@task
def prepare_honeypots(honeypots):
    ...
```
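The three `prepare_*` tasks turn the JSON produced by `DataFrame.to_json()` (default `orient='columns'`, that is `{column: {row_index: value}}`) into lists of task dictionaries for `create_tasks`. The sketch below reproduces that transformation in plain Python so it runs without Airflow; `to_main_tasks`, `to_exam_tasks`, and `to_honeypots` are hypothetical stand-ins for the decorated tasks, and the sample headlines and categories are invented for illustration.

```python
import json

# A payload shaped like DataFrame.to_json() output: {column: {row_index: value}}.
# The headlines and categories are invented for illustration.
sample = json.dumps({
    "headline": {"0": "Stocks rally after rate decision", "1": "Local team wins the cup"},
    "category": {"0": "business", "1": "sports"},
})


def to_main_tasks(payload):
    """Main tasks: only input values, no known answer."""
    data = json.loads(payload)
    return [{"input_values": {"headline": h}} for h in data["headline"].values()]


def to_exam_tasks(payload):
    """Exam tasks: the known answer plus a hint message (message_on_unknown_solution)."""
    data = json.loads(payload)
    return [
        {
            "input_values": {"headline": h},
            "known_solutions": [{"output_values": {"category": c}}],
            "message_on_unknown_solution": c,
        }
        for h, c in zip(data["headline"].values(), data["category"].values())
    ]


def to_honeypots(payload):
    """Honeypots (control tasks): the known answer, but no hint message."""
    data = json.loads(payload)
    return [
        {"input_values": {"headline": h}, "known_solutions": [{"output_values": {"category": c}}]}
        for h, c in zip(data["headline"].values(), data["category"].values())
    ]


print(to_honeypots(sample)[0])
```

Zipping the two column dictionaries keeps headlines and categories aligned because `to_json()` writes the same row indices, in the same order, for every column.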

The `aggregate_assignments` task aggregates the results with the Dawid-Skene method
and prints them to the Airflow task log.

To aggregate the results, we recommend the methods from the [crowd-kit](https://github.com/Toloka/crowd-kit) package.
```python
@task
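def aggregate_assignments(assignments):
    """Aggregate the collected labels with Dawid-Skene and print the result to the task log."""
    from crowdkit.aggregation import DawidSkene
    from toloka.client import Assignment
    import pandas as pd

    assignments = [Assignment.from_json(assignment) for assignment in assignments]
    tasks = []
    labels = []
    performers = []
    # Build one row per (task, performer, label), the long format crowd-kit aggregators take.
    # The loop variable is named toloka_task so it does not shadow the @task decorator.
    for assignment in assignments:
        for toloka_task, solution in zip(assignment.tasks, assignment.solutions):
            tasks.append(toloka_task.input_values['headline'])
            labels.append(solution.output_values['category'])
            performers.append(assignment.user_id)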
    assignments = {
        'task': tasks,
        'performer': performers,
        'label': labels
    }
    assignments = pd.DataFrame.from_dict(assignments)

    df = DawidSkene(n_iter=20).fit_predict(assignments).to_frame().reset_index()
    df.columns = ['headline', 'category']

    print('RESULT', df)
```
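`DawidSkene` consumes a long-format table with one row per (task, performer, label), which is what the loop above builds. To see what aggregation does with such rows without installing crowd-kit, the sketch below applies plain majority vote, a simpler baseline than Dawid-Skene (which additionally estimates a confusion matrix for each performer and weights their answers accordingly). The helper `majority_vote` and the sample rows are hypothetical; crowd-kit itself also provides a `MajorityVote` aggregator.

```python
from collections import Counter, defaultdict


def majority_vote(rows):
    """Aggregate (task, performer, label) rows into one label per task by majority vote.

    Ties go to the label seen first for that task, because Counter.most_common
    orders equal counts by first insertion.
    """
    votes = defaultdict(Counter)
    for task, _performer, label in rows:
        votes[task][label] += 1
    return {task: counter.most_common(1)[0][0] for task, counter in votes.items()}


rows = [
    ("Stocks rally after rate decision", "p1", "business"),
    ("Stocks rally after rate decision", "p2", "business"),
    ("Stocks rally after rate decision", "p3", "politics"),
    ("Local team wins the cup", "p1", "sports"),
    ("Local team wins the cup", "p2", "sports"),
]
print(majority_vote(rows))
# {'Stocks rally after rate decision': 'business', 'Local team wins the cup': 'sports'}
```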

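Here we define the topology of our pipeline. Passing one task's output to another makes Airflow run them in that order. The two `>>` statements at the end add dependencies that carry no data: all tasks must be uploaded before the pool is opened, and assignments are fetched only after `wait_pool` has finished.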

```python
project_conf = download_json(
    'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/project.json')
exam_conf = download_json(
    'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/exam.json')
pool_conf = download_json(
    'https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/configs/pool.json')

project = tlk_tasks.create_project(project_conf)
exam = tlk_tasks.create_exam_pool(exam_conf, project=project)
pool = tlk_tasks.create_pool(pool_conf, project=project, exam_pool=exam, expiration=timedelta(days=1))

dataset = prepare_datasets(
    unlabeled_url='https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/data/not_known.csv',
    labeled_url='https://raw.githubusercontent.com/Toloka/toloka-airflow/main/example/data/known.csv',
)
main_tasks, exam_tasks, honeypots = dataset['main_tasks'], dataset['exam_tasks'], dataset['honeypots']
tasks = prepare_tasks(main_tasks)
exam_tasks = prepare_exam_tasks(exam_tasks)
honeypots = prepare_honeypots(honeypots)

_exam_upload = tlk_tasks.create_tasks(exam_tasks, pool=exam, additional_args={'open_pool': True, 'allow_defaults': True})
_honeypots_upload = tlk_tasks.create_tasks(honeypots, pool=pool, additional_args={'allow_defaults': True})
_tasks_upload = tlk_tasks.create_tasks(tasks, pool=pool, additional_args={'allow_defaults': True})

opened_pool = tlk_tasks.open_pool(pool)
_waiting = tlk_tasks.wait_pool(opened_pool)

assignments = tlk_tasks.get_assignments(pool)
aggregate_assignments(assignments)

[_exam_upload, _honeypots_upload, _tasks_upload] >> opened_pool
_waiting >> assignments
```
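The ordering constraints above can be written out as a plain dependency graph. The sketch below lists each step with its upstream steps (the node names are hypothetical labels for the task calls above) and uses `graphlib.TopologicalSorter` from the standard library (Python 3.9+) to produce one valid execution order. It only illustrates the constraints; it is not how Airflow schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (its upstream steps).
pipeline = {
    "create_project": {"project_conf"},
    "create_exam_pool": {"exam_conf", "create_project"},
    "create_pool": {"pool_conf", "create_project", "create_exam_pool"},
    "prepare_tasks": {"prepare_datasets"},
    "prepare_exam_tasks": {"prepare_datasets"},
    "prepare_honeypots": {"prepare_datasets"},
    "exam_upload": {"prepare_exam_tasks", "create_exam_pool"},
    "honeypots_upload": {"prepare_honeypots", "create_pool"},
    "tasks_upload": {"prepare_tasks", "create_pool"},
    # Explicit dependency: [_exam_upload, _honeypots_upload, _tasks_upload] >> opened_pool
    "open_pool": {"create_pool", "exam_upload", "honeypots_upload", "tasks_upload"},
    "wait_pool": {"open_pool"},
    # Explicit dependency: _waiting >> assignments
    "get_assignments": {"create_pool", "wait_pool"},
    "aggregate_assignments": {"get_assignments"},
}

# static_order() yields every node after all of its upstream nodes.
order = list(TopologicalSorter(pipeline).static_order())
print(order)
```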

### What's next?

This was just an example of building a Toloka workflow on Airflow.
It can be extended in the following directions:

* For a production version, you can configure a custom XCom backend to pass large data between tasks. See the example in `custom_xcom_backend.py`.
* You can [schedule your flow runs with Timetables](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html).
* You can configure [Logging and Monitoring](https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-architecture.html).
* Apache Airflow aims to be Kubernetes-friendly: you can run Airflow inside
a Kubernetes cluster to take advantage of the increased stability and autoscaling options that Kubernetes provides; see the [docs](https://airflow.apache.org/docs/apache-airflow/stable/kubernetes.html).
* And, of course, you can build much more advanced crowdsourcing processes with Toloka.