# Text classification with Toloka running on Prefect

Toloka offers a library of Prefect-integrated functions to facilitate crowdsourcing. This example shows how to build a whole project from these blocks.

This library provides Prefect tasks for Toloka. You can connect tasks by passing one task's result to another as an argument.
For more details, see the Prefect [docs](https://docs.prefect.io/core/concepts/flows.html#functional-api).

The resulting workflow can be run either locally or in Prefect Cloud. You can also deploy your own Prefect cluster if needed.
We recommend using Prefect Cloud for this tutorial.

## Prefect environment setup

To use Prefect Cloud, log in to (or create an account at) https://cloud.prefect.io.

After logging in, the next step is to create an API key to access the cloud from your local machine.

To do this, open `Team / Service accounts` in the https://cloud.prefect.io/ portal.
<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s01-team-service-accounts.png" alt="Select service accounts page" width="360" align="center">

Create a new service account with the "Administrator" role.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s02-new-service-account.png" alt="Create new administrator service account" width="360" align="center">

Once you have a service account, you can create an API key. Create a new key and copy its value for later use.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s03-new-api-key.png" alt="Create new API key" width="360" align="center">

To start and monitor flow runs, you need a Prefect Agent. For this tutorial, you can create a Local Agent. You can find more agent types in the Prefect docs: https://docs.prefect.io/orchestration/agents/overview.html

To run the local agent you only need the `prefect` Python package installed. Let's install it and other packages required for this tutorial:

In [None]:
!pip install prefect pandas scikit-learn toloka-kit crowd-kit toloka-prefect

The Prefect package also installs the `prefect` command-line tool. Make sure it is present:

In [None]:
!prefect version  # Make sure this binary is installed.

Now open a new terminal so that a Local Agent can run in the background, and run the following commands:
```
# Run this in a new terminal window.
prefect backend cloud
prefect auth login --key <use-the-prefect-api-key-here>
prefect agent local start  # Leave it running in the background...
```

## Store Toloka credentials

To use Toloka from Prefect, we need to pass it a Toloka OAuth token. If you do not have one yet, you can obtain it from your Toloka account on the `Profile / External Services Integration` page by clicking `Get OAuth token`.
<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s04-toloka-secret.png" alt="Obtain Toloka OAuth key" width="720" align="center">

You can store it in Prefect as a secret.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s05-secrets.png" alt="Open Prefect secrets page" width="360" align="center">

Add it as a new secret on the `Team / Secrets` page so that the workflow can refer to it by its name.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s06-add-new-secret.png" alt="Add Toloka OAuth token as Prefect secret" width="360" align="center">

## Text classification project

### Create Prefect project

The Prefect hierarchy contains the following entities:
* **Project** is a top-level entity that groups flows.
* **Flow** defines the DAG topology, default parameters and scheduling settings shared across multiple runs.
* **Flow version**. You may update the same flow several times. Each version is saved.
* **Run** represents a single execution of the flow, with its progress and result.

You may create a new Prefect project in the UI, with the Prefect CLI, or with `prefect.Client`.
For more details see Prefect docs: https://docs.prefect.io/orchestration/concepts/projects.html

In [None]:
from prefect import Client

client = Client()
client.create_project(project_name='Toloka text classification')

### Take a look at data

In this example we have a set of news headlines. We will try to determine which of them are clickbait and which are not.

We use the dataset from "SVM clickbait classifier", distributed under the MIT license.
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

> Abhijnan Chakraborty, Bhargavi Paranjape, Sourya Kakarla, and Niloy Ganguly. "Stop Clickbait: Detecting and Preventing Clickbaits in Online News Media". In Proceedings of the 2016 IEEE/ACM International Conference on Advances in Social Networks Analysis and Mining (ASONAM), San Francisco, US, August 2016.


Let's load the dataset and look at its structure.

In [None]:
import pandas as pd

df_labeled = pd.read_csv(
    'https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/data/known.csv'
)
df_unlabeled = pd.read_csv(
    'https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/data/not_known.csv'
)

In [None]:
df_labeled

In [None]:
df_unlabeled

Since we have some labeled data, we will use it to create an **exam** and **honeypots** (trap questions with known answers, used to filter out untrustworthy workers).

We are not going to label all 30000 headlines here. For the purposes of this example, we will take only 100 of them.
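To make the honeypot idea concrete, here is a minimal sketch in plain Python of how known answers can be used to estimate each worker's accuracy. The worker IDs, headlines, labels, and the `honeypot_accuracy` helper are all made up for illustration. In this tutorial the real filtering is presumably left to Toloka through the pool settings rather than computed by hand.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def honeypot_accuracy(
    answers: List[Tuple[str, str, str]],
    known: Dict[str, str],
) -> Dict[str, float]:
    """Share of correct honeypot answers per worker.

    `answers` holds (worker_id, headline, label) triples; `known` maps
    honeypot headlines to their true labels. Other answers are skipped.
    """
    correct = defaultdict(int)
    total = defaultdict(int)
    for worker_id, headline, label in answers:
        if headline not in known:
            continue  # Regular task: there is no ground truth to compare with.
        total[worker_id] += 1
        correct[worker_id] += int(label == known[headline])
    return {worker: correct[worker] / total[worker] for worker in total}


# Hypothetical data, for illustration only.
known = {
    'You will not believe this trick': 'clickbait',
    'Parliament passes budget bill': 'news',
}
answers = [
    ('w1', 'You will not believe this trick', 'clickbait'),
    ('w1', 'Parliament passes budget bill', 'news'),
    ('w2', 'You will not believe this trick', 'news'),
    ('w2', 'Parliament passes budget bill', 'news'),
    ('w2', 'Some unlabeled headline', 'clickbait'),
]
accuracy = honeypot_accuracy(answers, known)
print(accuracy)
```

A worker whose accuracy on honeypots falls below some threshold can then be excluded from further labeling.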

### Prefect tasks description

The workflow consists of tasks, each representing a unit of work. Each task is performed independently (and possibly on a separate machine).

The only way to connect them is to pass output from one task to another as an argument.

For more details see Prefect docs: https://docs.prefect.io/api/latest/core/task.html

Let's start with the task that loads and transforms the initial dataset.

In [None]:
import pandas as pd
from prefect import task
from sklearn.model_selection import train_test_split
from typing import Dict

@task
def prepare_datasets(
    unlabeled_url: str,
    labeled_url: str,
) -> Dict[str, pd.DataFrame]:

    df_labeled = pd.read_csv(labeled_url)
    # Hold out 10 labeled rows for the exam, keeping the class balance.
    df_labeled, df_exam = train_test_split(df_labeled, test_size=10, stratify=df_labeled.category)
    # Take 20 of the remaining labeled rows as honeypots.
    _, df_honeypots = train_test_split(df_labeled, test_size=20, stratify=df_labeled.category)

    # Label only a random sample of 100 unlabeled headlines.
    df_main = pd.read_csv(unlabeled_url).sample(n=100)

    return {'main': df_main, 'exam': df_exam, 'honeypots': df_honeypots}
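The splitting step relies on two details of `train_test_split` that are easy to miss: an integer `test_size` is an absolute number of rows, and `stratify` keeps the class proportions in both parts. The sketch below shows this on a made-up balanced dataset. The headlines and the `clickbait` / `news` labels are hypothetical, and `random_state` is added here only to make the illustration reproducible.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical balanced toy data: 50 rows per class.
df = pd.DataFrame({
    'headline': [f'headline {i}' for i in range(100)],
    'category': ['clickbait', 'news'] * 50,
})

# An integer `test_size` is an absolute row count, not a fraction.
df_rest, df_exam = train_test_split(
    df, test_size=10, stratify=df['category'], random_state=0,
)

print(len(df_rest), len(df_exam))
print(df_exam['category'].value_counts().to_dict())
```

Because the second split in `prepare_datasets` is taken from the remainder of the first one, the exam rows and the honeypot rows do not overlap.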

We are going to create Toloka tasks based on this data.

In [None]:
from typing import List
from toloka.client import Task  # This is the Toloka `Task` object, not a Prefect one.

@task
def prepare_tasks(df_main: pd.DataFrame) -> List[Task]:
    return [Task(input_values={'headline': headline})
            for headline in df_main['headline']]

@task
def prepare_exam_tasks(df_exam: pd.DataFrame) -> List[Task]:
    return [Task(input_values={'headline': row.headline},
                 known_solutions=[{'output_values': {'category': row.category}}],
                 message_on_unknown_solution=row.category)
            for row in df_exam.itertuples(index=False)]

@task
def prepare_honeypots(df_honeypots: pd.DataFrame) -> List[Task]:
    return [Task(input_values={'headline': row.headline},
                 known_solutions=[{'output_values': {'category': row.category}}])
            for row in df_honeypots.itertuples(index=False)]

We have defined the _Toloka task_ interface and other settings in separate _Toloka project_ and _Toloka pool_ configs.

You can find them here:
* Toloka project config: https://github.com/vlad-mois/toloka-prefect/blob/main/example/configs/project.json
* Toloka pool config: https://github.com/vlad-mois/toloka-prefect/blob/main/example/configs/pool.json
* Exam config: https://github.com/vlad-mois/toloka-prefect/blob/main/example/configs/exam.json

We will load and use them later in separate Prefect tasks.

Now we have to take care of another stage of our process: results aggregation.
We use _overlap_, so we will receive several assignments for each headline.
Let's use the _Dawid-Skene_ method from **crowd-kit**.
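To show what aggregation over overlapping answers means, here is a sketch of the simplest possible baseline, majority vote, in plain Python. This is deliberately not the Dawid-Skene method: Dawid-Skene additionally estimates how reliable each worker is and weights the votes accordingly. The `majority_vote` helper and the sample votes are hypothetical.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple


def majority_vote(votes: List[Tuple[str, str]]) -> Dict[str, str]:
    """Pick the most frequent label for each headline.

    `votes` holds (headline, label) pairs, one per worker answer.
    Ties are not handled specially in this sketch.
    """
    by_headline = defaultdict(list)
    for headline, label in votes:
        by_headline[headline].append(label)
    return {headline: Counter(labels).most_common(1)[0][0]
            for headline, labels in by_headline.items()}


# Hypothetical answers with an overlap of 3.
votes = [
    ('h1', 'clickbait'), ('h1', 'clickbait'), ('h1', 'news'),
    ('h2', 'news'), ('h2', 'news'), ('h2', 'clickbait'),
]
aggregated = majority_vote(votes)
print(aggregated)
```

Majority vote treats all workers as equally trustworthy, which is the main reason to prefer a method such as Dawid-Skene when worker quality varies.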

Since this is the final task in our graph, we are going to store its result. There are different ways to do this (for example, adding a task that puts it to S3). For simplicity, we save it locally using a `LocalResult` object. Our dataframe will be saved in the pickle format. For other options see Prefect docs: https://docs.prefect.io/core/concepts/results.html

For AWS S3 tasks see docs: https://docs.prefect.io/api/latest/tasks/aws.html

In [None]:
from crowdkit.aggregation import DawidSkene
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir='~/Documents/prefect-results/',
                         location='{flow_name}-{date:%Y-%m-%d-%H-%M-%S}.pickle'))
def aggregate_assignments(assignments: pd.DataFrame) -> pd.DataFrame:
    assignments = assignments[assignments['GOLDEN:category'].isna()]  # Ignore honeypots.
    assignments = assignments.rename(columns={'INPUT:headline': 'task',
                                              'OUTPUT:category': 'label',
                                              'ASSIGNMENT:worker_id': 'performer'})
    df = DawidSkene(n_iter=20).fit_predict(assignments).to_frame().reset_index()
    df.columns = ['headline', 'category']
    return df
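The `location` argument above is a template that Prefect fills in at run time. Assuming it follows Python's standard `str.format` rules, the file name it produces can be previewed with plain Python. The flow name and the timestamp below are example values, not something Prefect is guaranteed to produce.

```python
from datetime import datetime

template = '{flow_name}-{date:%Y-%m-%d-%H-%M-%S}.pickle'

# Hypothetical values standing in for what is known at run time.
filename = template.format(
    flow_name='text-classification',
    date=datetime(2021, 12, 10, 0, 14, 29),
)
print(filename)  # text-classification-2021-12-10-00-14-29.pickle
```

Including the timestamp in the name keeps results of different flow runs from overwriting each other.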

The above tasks are all that we have to describe manually. We'll take the rest from the **toloka-prefect** library.

Let's build a DAG. Note that task outputs here are not regular Python values but `prefect.Task` instances.

In [None]:
from datetime import timedelta
from prefect import Flow

import toloka_prefect.operations as tlk
from toloka_prefect.helpers import download_json

with Flow('text-classification') as flow:
    project_conf = download_json('https://raw.githubusercontent.com/Toloka/toloka-prefect/main/example/configs/project.json')
    exam_conf = download_json('https://raw.githubusercontent.com/Toloka/toloka-prefect/main/example/configs/exam.json')
    pool_conf = download_json('https://raw.githubusercontent.com/Toloka/toloka-prefect/main/example/configs/pool.json')

    project = tlk.create_project(project_conf)
    exam = tlk.create_exam_pool(exam_conf, project_id=project)
    pool = tlk.create_pool(pool_conf, project_id=project, exam_pool_id=exam, expiration=timedelta(days=1))

    datasets = prepare_datasets(
        unlabeled_url='https://raw.githubusercontent.com/Toloka/toloka-prefect/main/example/data/not_known.csv',
        labeled_url='https://raw.githubusercontent.com/Toloka/toloka-prefect/main/example/data/known.csv',
    )
    tasks = prepare_tasks(datasets['main'])
    exam_tasks = prepare_exam_tasks(datasets['exam'])
    honeypots = prepare_honeypots(datasets['honeypots'])

    _exam_upload = tlk.create_tasks(exam_tasks, pool_id=exam, open_pool=True, allow_defaults=True)
    _hp_upload = tlk.create_tasks(honeypots, pool_id=pool, allow_defaults=True)
    _tasks_upload = tlk.create_tasks(tasks, pool_id=pool, allow_defaults=True)

    _waiting = tlk.wait_pool(pool, open_pool=True, upstream_tasks=[_exam_upload, _hp_upload, _tasks_upload])
    assignments = tlk.get_assignments_df(pool, upstream_tasks=[_waiting])

    result = aggregate_assignments(assignments)

flow.register(project_name='Toloka text classification')

After registering the flow, you can follow the given link and configure flow runs.

To start a single run, click `Quick run`.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s07-quick-run.png" alt="Click at quick run" width="720" align="center">

After starting the workflow, you can see its progress on the `Overview` tab.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s08-progress.png" alt="Flow run progress" width="720" align="center">

The resulting graph can be viewed on the `Schematic` tab.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s09-dag.png" alt="Flow run schematic" width="720" align="center">
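The same graph can also be written down by hand to check the order in which tasks may run. The sketch below lists the dependencies from the flow definition as a plain mapping and sorts them with the standard library's `graphlib`. It only illustrates the DAG and is not how Prefect schedules tasks. The config downloads are omitted for brevity, and the three `create_tasks` calls are given distinct, made-up labels.

```python
from graphlib import TopologicalSorter  # Standard library, Python 3.9+.

# Each key depends on every task in its set.
dependencies = {
    'create_exam_pool': {'create_project'},
    'create_pool': {'create_project', 'create_exam_pool'},
    'prepare_tasks': {'prepare_datasets'},
    'prepare_exam_tasks': {'prepare_datasets'},
    'prepare_honeypots': {'prepare_datasets'},
    'upload_exam_tasks': {'prepare_exam_tasks', 'create_exam_pool'},
    'upload_honeypots': {'prepare_honeypots', 'create_pool'},
    'upload_tasks': {'prepare_tasks', 'create_pool'},
    # `upstream_tasks` adds ordering-only edges: no data is passed along them.
    'wait_pool': {'create_pool', 'upload_exam_tasks',
                  'upload_honeypots', 'upload_tasks'},
    'get_assignments_df': {'create_pool', 'wait_pool'},
    'aggregate_assignments': {'get_assignments_df'},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Several valid orders exist, since independent branches such as project creation and dataset preparation can run in any order or in parallel.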

### Obtaining the results

When our flow run is completed, we will be able to find the `aggregate_assignments` result in the previously specified directory (the one we passed to the `LocalResult` object).

Alternatively, you can find its exact path on the `aggregate_assignments` task run page.

<img src="https://raw.githubusercontent.com/vlad-mois/toloka-prefect/main/example/images/s10-task-run-result-path-crop.png" alt="Find LocalResult path at the Task Run page" width="720" align="center">

Let's get the dataframe saved at this path.

In [None]:
import pickle

# Change to your path.
with open('/Users/some-user/Documents/prefect-results/text-classification-2021-12-10-00-14-29.pickle', 'rb') as file:
    res = pickle.load(file)

res
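If you run the flow several times, copying the path by hand gets tedious. As a sketch, the newest result can be picked automatically, assuming the file names follow the `{flow_name}-{timestamp}.pickle` pattern configured earlier, so that sorting by name is the same as sorting by time. The `load_latest_result` helper is hypothetical and not part of Prefect or toloka-prefect.

```python
import pickle
from pathlib import Path
from typing import Any


def load_latest_result(results_dir: str, flow_name: str) -> Any:
    """Unpickle the newest result file written for `flow_name`.

    Relies on the timestamp in the file name sorting lexicographically.
    """
    directory = Path(results_dir).expanduser()
    candidates = sorted(directory.glob(f'{flow_name}-*.pickle'))
    if not candidates:
        raise FileNotFoundError(f'No results for {flow_name!r} in {directory}')
    with candidates[-1].open('rb') as file:
        return pickle.load(file)


# Example call (uncomment after at least one flow run has finished):
# res = load_latest_result('~/Documents/prefect-results/', 'text-classification')
```

As with any pickle file, only load results that you produced yourself or otherwise trust.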

## What's next?

This was just one example of building a Prefect workflow with Toloka. It can be extended in the following directions:
* You can supply your input data from [PostgreSQL](https://docs.prefect.io/api/latest/tasks/postgres.html), [Kafka](https://docs.prefect.io/api/latest/tasks/kafka.html), [S3](https://docs.prefect.io/api/latest/tasks/aws.html) and other sources. Prefect has a ready-made collection of tasks for these. The same applies to output data.
* You can schedule your flow runs in the `Flow / Settings / Schedulers` tab.
* You can add notifications to be sent to Slack, email, or any other endpoint when your flow enters a given state in the `Flow / Settings / Cloud hooks` tab.
* You can run Prefect agents in the cloud or deploy a whole Prefect Core in your own cluster. For other Prefect Agent types (not only the Local one), see the agents documentation linked in the environment setup section above.
* And, of course, you can build much more advanced crowdsourcing processes using Toloka.