# Building streaming pipelines in Toloka

Let's solve the following task: find goods in an online store that match a given image, and arrange the found results by relevance.

It can be solved in 3 steps:
* For a given image, find the corresponding goods in the online shop;
* Verify that the selected goods are correct;
* Arrange found goods by relevance using side-by-side comparison.

Each step is represented by a Toloka pool. We also need to connect these pools and move data between them.
<img src="https://avatars.mds.yandex.net/get-direct/4119367/JkCWTSuLpAkabWwEJ2pn-Q/orig" alt="Example pipeline steps" width="600">

### Call to action
If you find a bug or have an idea for a new feature, don't hesitate to [open a new issue on GitHub](https://github.com/Toloka/toloka-kit/issues/new/choose).
Like our library and examples? Star [our repo on GitHub](https://github.com/Toloka/toloka-kit).

In [None]:
%%capture
!pip install toloka-kit==0.1.26
!pip install crowd-kit==1.0.0

In [None]:
import logging
import sys
import getpass
from toloka.client import TolokaClient

# Log everything at INFO level to stdout so pipeline progress shows up in the notebook.
logging.basicConfig(
    format='%(levelname)s - %(asctime)s - %(name)s: %(message)s',
    level=logging.INFO,
    stream=sys.stdout,
)
# Only the `TolokaClient` class is imported in this cell (not the `toloka`
# package itself), so reference it directly; `toloka.TolokaClient` would raise NameError.
client = TolokaClient(getpass.getpass('Enter your OAuth token: '), 'PRODUCTION')  # Or switch to 'SANDBOX'

This example focuses on connecting pools, so we don't pay much attention to project and pool configuration here.
Let's just load configuration from files stored on GitHub.

In [None]:
import datetime
import requests
import os
from toloka.client import Pool, Project, structure

GITHUB_RAW = 'https://raw.githubusercontent.com'
GITHUB_BASE_PATH = 'Toloka/toloka-kit/main/examples/6.streaming_pipelines'

def _load_json_from_github(filename: str):
    """Download a JSON file from this example's folder on GitHub and decode it.

    Args:
        filename: Name of the file inside ``GITHUB_BASE_PATH``.

    Returns:
        The decoded JSON document (``response.json()``).

    Raises:
        requests.HTTPError: If GitHub answers with an error status.
        requests.Timeout: If GitHub does not respond in time.
    """
    # URLs always use '/'; os.path.join would insert '\\' on Windows.
    url = '/'.join((GITHUB_RAW, GITHUB_BASE_PATH, filename))
    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    return response.json()

def create_project(filename: str) -> Project:
    """Create a Toloka project from a JSON configuration stored on GitHub.

    Args:
        filename: Name of the project configuration file on GitHub.

    Returns:
        The project object returned by ``client.create_project``.
    """
    project_config = _load_json_from_github(filename)
    return client.create_project(project_config)

def create_pool(filename: str, project_id: str, reward_per_assignment: float) -> Pool:
    """Create a Toloka pool from a JSON configuration stored on GitHub.

    The loaded configuration is completed with the owning project, the reward
    and an expiration date three days from now before it is sent to Toloka.

    Args:
        filename: Name of the pool configuration file on GitHub.
        project_id: ID of the project the pool belongs to.
        reward_per_assignment: Reward paid for one assignment.

    Returns:
        The pool object returned by ``client.create_pool``.
    """
    new_pool = structure(_load_json_from_github(filename), Pool)
    new_pool.project_id = project_id
    new_pool.reward_per_assignment = reward_per_assignment
    # NOTE(review): datetime.now() is naive local time; confirm this is how
    # Toloka should interpret `will_expire`.
    lifetime = datetime.timedelta(days=3)
    new_pool.will_expire = datetime.datetime.now() + lifetime
    return client.create_pool(new_pool)

# Step 1: workers search the online shop for goods matching an image.
find_items_project = create_project('find_items_project.json')
find_items_pool = create_pool(
    'find_items_pool.json',
    project_id=find_items_project.id,
    reward_per_assignment=0.08,
)

# Step 2: other workers verify that the found goods are correct.
verification_project = create_project('verification_project.json')
verification_pool = create_pool(
    'verification_pool.json',
    project_id=verification_project.id,
    reward_per_assignment=0.02,
)

# Step 3: side-by-side comparison of the accepted goods.
sbs_project = create_project('sbs_project.json')
sbs_pool = create_pool(
    'sbs_pool.json',
    project_id=sbs_project.id,
    reward_per_assignment=0.04,
)


Some data flows can be implemented implicitly using pool quality control rules.

Here, if an assignment is rejected, the overlap of the corresponding tasks increases, which results in new microtasks appearing in the pool.

In [None]:
from toloka.client.actions import ChangeOverlap
from toloka.client.collectors import AssignmentsAssessment
from toloka.client.conditions import AssessmentEvent

# Quality control rule for find_items_pool: whenever an assignment is
# rejected, increase the overlap of its tasks by one and (re)open the pool,
# so that the rejected task is offered again.
find_items_pool.quality_control.add_action(
    collector=AssignmentsAssessment(),
    conditions=[AssessmentEvent == AssessmentEvent.REJECT],
    action=ChangeOverlap(delta=1, open_pool=True),
)
# Send the modified pool (with the new rule) to Toloka. The trailing ';' only
# suppresses the notebook cell's output.
client.update_pool(find_items_pool.id, find_items_pool);

## Connections

Now define each connection as a separate callable.

Entire pipeline will be as follows:
<img src="https://avatars.mds.yandex.net/get-direct/5220563/U4kLMb8FxINJsxXTWeQGew/orig" alt="Example pipeline steps" width="800">

In [None]:
import collections
import itertools
import pandas as pd
from typing import List

from toloka.client.task import Task
from toloka.streaming.event import AssignmentEvent

# Overlap of each find-items task. AcceptedItemsToComparison waits for this
# many accepted links per image before building side-by-side tasks.
OVERLAP_FIND_ITEMS = 12
# Overlap of each verification task. VerificationDoneHandler aggregates an
# assignment once it has collected this many verdicts for it.
OVERLAP_VERIFICATION = 3
# Overlap of each side-by-side task. HandleSbS also uses it as the minimum
# number of answers for an image before scoring it.
OVERLAP_SBS = 3

In [None]:
def handle_found_items(events: List[AssignmentEvent]) -> None:
    """Create verification tasks for submitted find-items answers.

    Data flow: find_items_pool -> verification_pool.

    Every (task, solution) pair of every submitted assignment becomes one
    verification task. It shows the original image, the link the worker
    found, and the ID of the assignment being checked. The assignment's author
    is listed in ``unavailable_for`` so they do not verify their own answer.

    Args:
        events: Submitted assignments from find_items_pool.
    """
    verification_tasks = []
    for event in events:
        assignment = event.assignment
        for source_task, source_solution in zip(assignment.tasks, assignment.solutions):
            verification_tasks.append(
                Task(
                    pool_id=verification_pool.id,
                    unavailable_for=[assignment.user_id],
                    overlap=OVERLAP_VERIFICATION,
                    input_values={
                        'image': source_task.input_values['image'],
                        'found_link': source_solution.output_values['found_link'],
                        'assignment_id': assignment.id,
                    },
                )
            )
    client.create_tasks(verification_tasks, open_pool=True)
    logging.info('Verification tasks created count: %d', len(verification_tasks))

In [None]:
from crowdkit.aggregation import MajorityVote
from toloka.client.exceptions import IncorrectActionsApiError


class VerificationDoneHandler:
    """Accept or reject find-items assignments based on verification verdicts.

    Data flow: verification_pool -> find_items_pool (via a quality control rule).

    Verdicts are buffered per checked assignment. Once an assignment has at
    least ``OVERLAP_VERIFICATION`` verdicts, they are aggregated with majority
    vote. The assignment is accepted on 'Yes' and rejected otherwise. A
    rejection then triggers find_items_pool's ChangeOverlap rule.
    """
    def __init__(self, client: TolokaClient):
        self.client = client
        # checked assignment_id -> list of (verdict, verifier user_id) pairs.
        self.waiting = collections.defaultdict(list)

    def __call__(self, events: List[AssignmentEvent]) -> None:
        """Buffer verdicts from ``events`` and resolve assignments that have enough.

        Args:
            events: Accepted assignments from verification_pool.
        """
        for event in events:
            verifier_id = event.assignment.user_id
            for task, solution in zip(event.assignment.tasks, event.assignment.solutions):
                checked_assignment_id = task.input_values['assignment_id']
                self.waiting[checked_assignment_id].append((solution.output_values['result'], verifier_id))

        # One row per verdict, only for assignments with enough verdicts.
        rows = [
            (checked_assignment_id, verdict, verifier_id)
            for checked_assignment_id, verdicts in self.waiting.items()
            if len(verdicts) >= OVERLAP_VERIFICATION
            for verdict, verifier_id in verdicts
        ]

        if rows:
            verdicts_df = pd.DataFrame(rows, columns=['task', 'label', 'worker'])
            decisions: pd.Series = MajorityVote().fit_predict(verdicts_df)
            logging.info('Statuses to apply count: %s', collections.Counter(decisions.values))

            for checked_assignment_id, decision in decisions.items():
                self._apply_status(checked_assignment_id, decision)
                del self.waiting[checked_assignment_id]

        logging.info('Waiting for verification count: %d', len(self.waiting))

    def _apply_status(self, assignment_id, decision) -> None:
        """Accept the assignment for a 'Yes' decision, reject it for anything else."""
        try:
            if decision == 'Yes':
                self.client.accept_assignment(assignment_id, 'Well done!')
            else:
                self.client.reject_assignment(assignment_id, 'Incorrect object.')
        except IncorrectActionsApiError:  # You could have accepted or rejected it in the UI.
            logging.exception('Can\'t set status %s at %s', decision, assignment_id)

In [None]:
class AcceptedItemsToComparison:
    """Turn accepted find-items answers into side-by-side (SbS) comparison tasks.

    Data flow: find_items_pool -> sbs_pool.

    Found links are buffered per image until ``OVERLAP_FIND_ITEMS`` accepted
    answers have been collected. Then one SbS task is created for every
    pair of *distinct* links found for that image.
    """
    def __init__(self, client: TolokaClient):
        self.client = client
        # image URL -> links found for it in accepted assignments.
        self.waiting = collections.defaultdict(list)

    def __call__(self, events: List[AssignmentEvent]) -> None:
        """Buffer links from ``events`` and create SbS tasks for complete images.

        Args:
            events: Accepted assignments from find_items_pool.
        """
        for event in events:
            for task, solution in zip(event.assignment.tasks, event.assignment.solutions):
                self.waiting[task.input_values['image']].append(solution.output_values['found_link'])

        to_sbs = [(image, found_links)
                  for image, found_links in self.waiting.items()
                  if len(found_links) >= OVERLAP_FIND_ITEMS]

        if to_sbs:
            logging.info('Got images ready for SbS count: %d', len(to_sbs))

            sbs_tasks = []
            for image, found_links in to_sbs:
                # Different workers often find the same product. Deduplicate
                # (keeping first-seen order) so that no task compares a link
                # with itself and no pair of links is paid for twice.
                unique_links = list(dict.fromkeys(found_links))
                if len(unique_links) < 2:
                    logging.info('Nothing to compare for image %s: %d distinct link(s)',
                                 image, len(unique_links))
                for left_link, right_link in itertools.combinations(unique_links, 2):
                    input_values = {'image': image, 'left_link': left_link, 'right_link': right_link}
                    sbs_tasks.append(Task(pool_id=sbs_pool.id, overlap=OVERLAP_SBS, input_values=input_values))

            logging.info('SbS tasks to create count: %d', len(sbs_tasks))
            # After deduplication there may be no pair left to compare at all.
            if sbs_tasks:
                self.client.create_tasks(sbs_tasks, open_pool=True)

        for image, _ in to_sbs:
            del self.waiting[image]
        logging.info('Waiting for SbS count: %d', len(self.waiting))

In [None]:
from crowdkit.aggregation import BradleyTerry


class HandleSbS:
    """Aggregate side-by-side (SbS) comparisons into relevance scores per image.

    Every accepted SbS answer is stored under its image. Once an image has
    at least ``OVERLAP_SBS`` answers, a Bradley-Terry model is fitted on *all*
    answers received so far for that image. The scores are stored in
    ``scores_by_image[image]``, sorted from highest to lowest.
    """
    def __init__(self, client: TolokaClient):
        self.client = client
        # image URL -> every SbS answer collected for that image so far.
        self.waiting = collections.defaultdict(list)
        # image URL -> scores returned by BradleyTerry, sorted descending.
        self.scores_by_image = {}

    def __call__(self, events: List[AssignmentEvent]) -> None:
        """Store answers from ``events`` and refresh the scores of affected images.

        Args:
            events: Accepted assignments from sbs_pool.
        """
        # A dict keeps first-seen order, so images are re-scored deterministically.
        updated_images = {}
        for event in events:
            for task, solution in zip(event.assignment.tasks, event.assignment.solutions):
                image = task.input_values['image']
                # NOTE(review): assumes the SbS template writes the preferred
                # link (equal to `left` or `right`) into `result`; confirm
                # against sbs_project.json.
                answer = {'image': image,
                          'worker': event.assignment.user_id,
                          'left': task.input_values['left_link'],
                          'right': task.input_values['right_link'],
                          'label': solution.output_values['result']}
                self.waiting[image].append(answer)
                updated_images[image] = None

        # Answers for one image arrive spread over many batches. Fitting on the
        # full history is required: discarding answers after each fit would
        # let every later batch overwrite the scores with an estimate built
        # from only a handful of comparisons (or missing some links entirely).
        for image in updated_images:
            answers = self.waiting[image]
            if len(answers) >= OVERLAP_SBS:
                scores = BradleyTerry(n_iter=100).fit_predict(pd.DataFrame(answers))
                self.scores_by_image[image] = scores.sort_values(ascending=False)

        # Images that have not yet collected enough answers to be scored.
        pending = sum(1 for answers in self.waiting.values() if len(answers) < OVERLAP_SBS)
        logging.info('Waiting for SbS aggregation count: %d', pending)

### Putting it all together

In [None]:
from toloka.streaming import AssignmentsObserver, Pipeline

# A single Pipeline runs all observers. Each AssignmentsObserver tracks the
# assignments of one pool; `register` returns the observer so that handlers
# can be attached to it below.
pipeline = Pipeline()
found_items_observer = pipeline.register(AssignmentsObserver(client, find_items_pool.id))
verification_observer = pipeline.register(AssignmentsObserver(client, verification_pool.id))
sbs_observer = pipeline.register(AssignmentsObserver(client, sbs_pool.id))

In [None]:
# Build the SbS handler up front and keep our own reference to it. The
# results cell reads `sbs_handler.scores_by_image`, and this way that does not
# depend on what `on_accepted` happens to return.
sbs_handler = HandleSbS(client)

# Submitted find-items answers go to verification; accepted ones go to SbS.
found_items_observer.on_submitted(handle_found_items)
found_items_observer.on_accepted(AcceptedItemsToComparison(client))
# Verification verdicts accept or reject the original find-items assignments.
verification_observer.on_accepted(VerificationDoneHandler(client))
# Accepted SbS answers are aggregated into per-image scores.
sbs_observer.on_accepted(sbs_handler)

Create tasks for the initial pool.

In [None]:
# Images to search for in the online shop.
images = [
    'https://tlk.s3.yandex.net/wsdm2020/photos/8ca087fe33065d75327cafdb8720204b.jpg',
    'https://tlk.s3.yandex.net/wsdm2020/photos/d0c9eb8737f48df5964d93b08ec0d758.jpg',
    'https://tlk.s3.yandex.net/wsdm2020/photos/9245eed8aa1d1e6f5d5d39d00ab044c6.jpg',
    'https://tlk.s3.yandex.net/wsdm2020/photos/0aff4fc1edbe6096a9a517092902627f.jpg',
    'http://tolokaadmin.s3.yandex.net/demo/abb61898-c886-4e20-b7cd-c0d359ddbb9a',
]
# One find-items task per image, each solved by OVERLAP_FIND_ITEMS workers.
tasks = [
    Task(
        pool_id=find_items_pool.id,
        overlap=OVERLAP_FIND_ITEMS,
        input_values={'image': image_url},
    )
    for image_url in images
]
# The trailing ';' suppresses the notebook cell output.
client.create_tasks(tasks, open_pool=True);

### Run

In [None]:
# Google Colab is using a global event pool,
# so in order to run our pipeline we have to apply nest_asyncio to create an inner pool
# Google Colab already runs a global asyncio event loop, so the pipeline can't
# start its own loop there. nest_asyncio patches asyncio to allow a nested
# run_until_complete() call on the loop that is already running.
if 'google.colab' in str(get_ipython()):
    import nest_asyncio, asyncio
    nest_asyncio.apply()
    asyncio.get_event_loop().run_until_complete(pipeline.run())
else:
    # Top-level `await` relies on IPython/Jupyter autoawait; it is not valid
    # in a plain Python script.
    await pipeline.run()

### Display results

In [None]:
from IPython.display import Image, display

# For every scored image, show the image and print its top-ranked link
# (`nlargest(1)` keeps only the single highest score).
for image, scores in sbs_handler.scores_by_image.items():
    display(Image(url=image, height=200))
    print(f'{scores.nlargest(1)}\n')